In Lenses it is not possible to directly edit a SQL Processor, for example changing the query or changing some of its deployment configuration.
There is a reason behind this: a change in a processor may result in a new processor that is incompatible with the previous one, producing incorrect or duplicated results.
But if you know what we are doing, and you are confident that the change will be a compatible one, there is a way to achieve this.
In this article we will first see how to update a processor, and then we will briefly analyse when it is safe to do so.
To emulate the editing of a processor, we will create a new one sharing the same Processor ID as the previous one.
Processor ID
The Processor ID will determine the consumer group of the underlying Kafka Streams application.
Sharing the same consumer group, will allow the processor to start exactly from where the previous one left off.
We will now set up a simple example and see, step by step, how to apply this strategy.
We will assume we have a simple topic called player_results, storing the result of chess games. Each record will contain information about the outcome of a game for a single player.
player_results
The _key is an identifier identifying the result is referring to.
_key
The result can be one of Win, Loss, and Draw.
result
Win
Loss
Draw
From SQL Studio, create the topic and insert some data into it:
create table player_results(playerId string, result string) FORMAT(long, protobuf); INSERT INTO player_results(_key, playerId, result) values (0, "Player 1", "Win"), (0, "Player 2", "Loss"), (1, "Player 1", "Loss"), (1, "Player 2", "Win"), (2, "Player 1", "Draw"), (2, "Player 2", "Draw"), (3, "Player 3", "Win"), (3, "Player 1", "Loss"), (4, "Player 2", "Draw"), (4, "Player 1", "Draw");
Here we used PROTOBUF as a serialisation format, but that is not relevant for the sake of this example.
PROTOBUF
We want to compute the total number of points for each player, initially following these rules
2
1
0
To do that, create the following processor:
SET defaults.topic.autocreate=true; insert into player_stats select stream SUM(case when result = "Win" then 2 when result = "Draw" then 1 else 0 end) as points from player_results group by playerId
Save the processor and start it.
After a while (it may take some time because the processor is stateful), you will see the following results in the output topic (you can check them in SQL Studio):
KEY: "Player 3" points: 2 KEY: "Player 2" points: 4 KEY: "Player 1" points: 4
That is consistent with our data: Player 3 scored a single Win, for a total of 2 points, while Player 1 and Player 2 scored both a win and two draws, for a total of 4 points.
Player 3
2 points
Player 1
Player 2
4 points
At some point we decide to change our scoring algorithm, assigning 3 points in case of victory, instead of 2. We decided that the change should not affect historical data, but only new games.
3 points
Since we are going to delete the processor, we need first to copy all the information we need from it, in order to be used in the new processor.
We need to stop the processor in order to avoid to compute new data with the old point algorithm. You can stop it clicking on Actions > Stop in the processor page.
Actions > Stop
This is a mandatory step, since without Processor ID we will lose the ability to start from where the old processor stopped.
You can find the Processor ID under the Configuration tab of the processor page.
Configuration
Copy any other detail you may need for the creation of the new processor. In our case, we are just going to copy the SQL query.
Make sure you copied all you need, and delete the processor.
This step is necessary, since Lenses does not allow the creation of multiple processors with the same Processor ID.
Create a new processor with the following data:
SQL Query
3
SET defaults.topic.autocreate=true; insert into player_stats select stream SUM(case when result = "Win" then 3 when result = "Draw" then 1 else 0 end) as points from player_results group by playerId
In SQL Studio, insert the results for a new game between Player 1 and Player 2:
INSERT INTO player_results(gameId, playerId, result) values (5, "Player 1", "Win"), (5, "Player 2", "Loss");
After a while, we should see the following data in player_stats:
player_stats
KEY: "Player 3" points: 2 KEY: "Player 2" points: 4 KEY: "Player 1" points: 4 KEY: "Player 1" points: 7 KEY: "Player 2" points: 4
As expected, the points of Player 2 and Player 3 did not change.
This means that the data was not-recomputed from scratch, since in that case, according to the new rules, they should have been assigned 5 points each.
5
On the other hand, Player 1 scored a victory when the new scoring system was in place, getting three points more, raising its total to 7.
7
The new version of the processor has been created, and we verified that it produced the desired results.
We saw that updating a processor, although not directly supported by Lenses, is quite an easy task.
But updating a processor is also a very delicate process. For this reason it should be done with the maximal care, only when you are 100% sure that the change is compatible with the previous version.
Particular attention has to go to stateful processors, i.e. processors that make use of one of the following:
JOIN
GROUP BY
SELECT TABLE
In such cases, the processor will internally build names for its state-related internal topics. If something significant is changed in the processor query, those names may change in the new version, causing the loss of the old state.
Stateless processors: It is safe to update a non-stateful processor, for example adding a WHERE statement. Since the processor is not stateful, no internal topic names are involved.
WHERE
Stateful processors: In a stateful processor, the only safe change is to replace an expression with another one, like we did in our example, since such a change will not affect the name of internal topics.
Almost everything else is an unsafe change, unless you know what are the consequences of the change and you are OK with that.
Even adding a simple WHERE statement to a stateful processor may cause some internal topics to change name, causing the loss of state described above.
On this page