The Streams DSL documentation
<https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
includes a caveat about using the aggregate method to transform a KGroupedTable →
KTable, as follows (emphasis mine):
When subsequent non-null values are received for a key (e.g., UPDATE), then
(1) the subtractor is called with the old value as stored in the table and
(2) the adder is called with the new value of the input record that was
just received. *The order of execution for the subtractor and adder is not
defined.*
My reading of that last line is that any one of three things can
happen:
1. subtractor can be called before adder
2. adder can be called before subtractor
3. adder and subtractor could be called at the same time
Here is the question I'm looking to get answered:
*Are all 3 scenarios above actually possible when using the aggregate
method on a KGroupedTable?*
Or am I misinterpreting the documentation? For my use case (detailed
below), it would be ideal if the subtractor were always called before the
adder.
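For a commutative aggregate the order should not matter. Below is a minimal pure-Scala model (no Kafka dependency) of the two sequential orders for an UPDATE against a running sum. `SumOrderDemo`, `subtractThenAdd` and `addThenSubtract` are hypothetical names, and the functions only mimic the three-argument (key, value, aggregate) shape of the adder/subtractor lambdas used later in this question. It does not model scenario 3 or anything Kafka Streams does internally.

```scala
// Pure-Scala model of an UPDATE (oldValue -> newValue) applied to a running sum.
// Hypothetical names; this does not call Kafka Streams.
object SumOrderDemo {
  // Same (key, value, aggregate) => aggregate shape as the question's lambdas.
  val adder: (String, Long, Long) => Long = (_, value, agg) => agg + value
  val subtractor: (String, Long, Long) => Long = (_, value, agg) => agg - value

  // Scenario 1: remove the old value first, then add the new one.
  def subtractThenAdd(key: String, oldValue: Long, newValue: Long, agg: Long): Long =
    adder(key, newValue, subtractor(key, oldValue, agg))

  // Scenario 2: add the new value first, then remove the old one.
  def addThenSubtract(key: String, oldValue: Long, newValue: Long, agg: Long): Long =
    subtractor(key, oldValue, adder(key, newValue, agg))

  def main(args: Array[String]): Unit = {
    // The aggregate holds 10 (which includes oldValue = 3); the update replaces 3 with 7.
    println(subtractThenAdd("zoo-1", 3L, 7L, 10L)) // 14
    println(addThenSubtract("zoo-1", 3L, 7L, 10L)) // 14
  }
}
```

Both calls print 14 (10 - 3 + 7 and 10 + 7 - 3), which is why the undefined order only becomes a concern for non-commutative aggregates.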
------------------------------
*Why is this question important?*
If the adder and subtractor are non-commutative operations and the order in
which they are executed can vary, you can end up with different results
depending on that order. One useful non-commutative aggregation is
collecting records into a Set:
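```scala
.aggregate[Set[Animal]](Set.empty)(
  // adder: called with the new value; adds it to the set
  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
  // subtractor: called with the old value; removes it from the set
  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
)
```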
In this example, for a duplicated event (an update whose new value equals
the old value), if the adder is called before the subtractor, the value ends
up removed from the set entirely (which would be problematic for most use
cases, I imagine).
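A minimal pure-Scala sketch of the duplicated-event case, assuming a hypothetical `Animal` case class as a stand-in for the real value type and replaying the Set-based adder/subtractor directly (no Kafka Streams involved). For contrast it also includes a counting (multiset) variant, a swapped-in technique not taken from the question, whose adder and subtractor commute because each only increments or decrements a per-animal count.

```scala
// Hypothetical stand-in for the question's value type.
final case class Animal(name: String)

object SetOrderDemo {
  type Agg = Set[Animal]

  // The Set-based adder/subtractor from the question.
  val adder: (String, Animal, Agg) => Agg = (_, animal, set) => set + animal
  val subtractor: (String, Animal, Agg) => Agg = (_, animal, set) => set - animal

  def subtractThenAdd(key: String, oldV: Animal, newV: Animal, agg: Agg): Agg =
    adder(key, newV, subtractor(key, oldV, agg))

  def addThenSubtract(key: String, oldV: Animal, newV: Animal, agg: Agg): Agg =
    subtractor(key, oldV, adder(key, newV, agg))

  // Contrast: a counting variant whose adder and subtractor commute,
  // because each only adds +1 or -1 to an integer count.
  type Counts = Map[Animal, Int]

  val countAdder: (String, Animal, Counts) => Counts =
    (_, animal, counts) => bump(counts, animal, 1)
  val countSubtractor: (String, Animal, Counts) => Counts =
    (_, animal, counts) => bump(counts, animal, -1)

  // Change the count for `animal` by `delta`, dropping entries that reach zero.
  private def bump(counts: Counts, animal: Animal, delta: Int): Counts = {
    val updated = counts.getOrElse(animal, 0) + delta
    if (updated == 0) counts - animal else counts.updated(animal, updated)
  }

  def main(args: Array[String]): Unit = {
    val lion = Animal("lion")
    val current: Agg = Set(lion)
    // Duplicate update: old value == new value == lion.
    println(subtractThenAdd("zoo-1", lion, lion, current)) // Set(Animal(lion))
    println(addThenSubtract("zoo-1", lion, lion, current)) // Set()

    val counts: Counts = Map(lion -> 1)
    val a = countAdder("zoo-1", lion, countSubtractor("zoo-1", lion, counts))
    val b = countSubtractor("zoo-1", lion, countAdder("zoo-1", lion, counts))
    println(a == b) // true
  }
}
```

The first two lines show the order dependence (the lion survives only when the subtractor runs first); the last line shows both orders producing the same counts for the counting variant.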
------------------------------
*Why am I doubting the documentation (assuming my interpretation of it is
correct)?*
1. It seems like an unusual design choice.
2. When I've run unit tests (using TopologyTestDriver and
EmbeddedKafka), I always see the subtractor called before the adder.
Unfortunately, if there is some kind of race condition involved, it's
entirely possible that I would never hit the other scenarios.
3. I did try looking into the kafka-streams codebase as well. The
KTableProcessorSupplier that calls the user-supplied adder/subtractor
functions appears to be this one:
https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
and on line 92 you can even see a comment saying "first try to remove the old
value". Unfortunately, what I saw during my own testing was that the process
function itself is called twice: first with a Change<V> value that includes
only the old value, and then again with a Change<V> value that includes only
the new value. I haven't been able to dig deep enough to find the internal
code that generates the old-value record and the new-value record (upon
receiving an update) to determine whether it actually produces those records
in that order.
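To reason about point 3, here is a simplified model (not the actual `KTableAggregate` source) of the behavior described above: one `process` call per record, subtracting the old value first if present and then adding the new value if present, matching the "first try to remove the old value" comment. `Change`, `AggregateModel` and `ProcessOrderDemo` are hypothetical names; `Change` only loosely mirrors the internal `Change<V>`. The adder and subtractor are wrapped so that every call is recorded, which is one way to assert call order in a unit test. The model only replays records in the order observed in the question; it says nothing about what order Kafka Streams guarantees.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-in for the internal Change<V>:
// a record may carry a new value, an old value, or both.
final case class Change[V](newValue: Option[V], oldValue: Option[V])

// Model of the per-record logic: subtract the old value (if any), then add
// the new value (if any). A reasoning aid, not the real KTableAggregate.
final class AggregateModel[K, V, VR](
    initializer: () => VR,
    adder: (K, V, VR) => VR,
    subtractor: (K, V, VR) => VR
) {
  private val store = scala.collection.mutable.Map.empty[K, VR]

  def process(key: K, change: Change[V]): VR = {
    var agg = store.getOrElse(key, initializer())
    change.oldValue.foreach(old => agg = subtractor(key, old, agg))
    change.newValue.foreach(nv => agg = adder(key, nv, agg))
    store.update(key, agg)
    agg
  }
}

object ProcessOrderDemo {
  def main(args: Array[String]): Unit = {
    val calls = ListBuffer.empty[String]
    // Wrap the adder/subtractor so each invocation is recorded.
    val model = new AggregateModel[String, String, Set[String]](
      () => Set.empty,
      (_, v, agg) => { calls += s"add($v)"; agg + v },
      (_, v, agg) => { calls += s"subtract($v)"; agg - v }
    )

    model.process("zoo-1", Change(Some("lion"), None)) // initial insert
    // The UPDATE as observed in the question: two separate records,
    // old-value-only first, then new-value-only.
    model.process("zoo-1", Change(None, Some("lion")))
    val result = model.process("zoo-1", Change(Some("lion"), None))

    println(calls.mkString(", ")) // add(lion), subtract(lion), add(lion)
    println(result)               // Set(lion)
  }
}
```

Within a single record the model always subtracts before adding; once an update is split into two records, as observed in the question, the call order is simply the order in which those records arrive, which is exactly the part the question is trying to pin down.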