Hi everyone! I posted this same question on Stack Overflow
<https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
a few days ago but didn't get any responses. Was hoping someone here might
be able to help clarify this part of the documentation for me :)
On Thu, 28 Jan 2021 at 19:50, Fq Public <fq.public5@gmail.com> wrote:
> The Streams DSL documentation
> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating> includes
> a caveat about using the aggregate method to transform a KGroupedTable →
> KTable, as follows (emphasis mine):
>
> When subsequent non-null values are received for a key (e.g., UPDATE),
> then (1) the subtractor is called with the old value as stored in the table
> and (2) the adder is called with the new value of the input record that was
> just received. *The order of execution for the subtractor and adder is
> not defined.*
>
> My interpretation of that last line is that one of three things can
> happen:
>
> 1. subtractor can be called before adder
> 2. adder can be called before subtractor
> 3. adder and subtractor could be called at the same time
>
> Here is the question I'm looking to get answered:
> *Are all 3 scenarios above actually possible when using the aggregate
> method on a KGroupedTable?*
> Or am I misinterpreting the documentation? For my use-case (detailed
> below), it would be ideal if the subtractor was always called before the
> adder.
> ------------------------------
>
> *Why is this question important?*
>
> If the adder and subtractor are non-commutative operations and the order
> in which they are executed can vary, you can end up with different results
> depending on which one runs first. A useful example of a non-commutative
> pair of operations is aggregating records into a Set:
>
> .aggregate[Set[Animal]](Set.empty)(
>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> )
>
> In this example, for duplicated events, if the adder is called before the
> subtractor you would end up removing the value entirely from the set (which
> would be problematic for most use-cases I imagine).
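
To make the scenario above concrete, here is a minimal sketch in plain Scala with no Kafka Streams dependency. The adder and subtractor mirror the snippet above; the Animal case class, the "zoo-1" key, and the OrderDemo object are hypothetical names used only for illustration. It applies the two functions by hand in both orders to a duplicated event (an update whose old and new values are equal):

```scala
// Plain Scala sketch; it does not touch Kafka Streams at all.
// Animal, OrderDemo, and the "zoo-1" key are hypothetical illustration names.
final case class Animal(name: String)

object OrderDemo {
  type Agg = Set[Animal]

  // Same shape as the adder/subtractor passed to aggregate in the snippet above.
  val adder: (String, Animal, Agg) => Agg = (_, animal, agg) => agg + animal
  val subtractor: (String, Animal, Agg) => Agg = (_, animal, agg) => agg - animal

  // Aggregate state before a duplicate update for the same animal arrives.
  val lion: Animal = Animal("lion")
  val before: Agg = Set(lion)

  // Subtract the old value, then add the new value: the lion stays in the set.
  val subtractThenAdd: Agg = adder("zoo-1", lion, subtractor("zoo-1", lion, before))

  // Add the new value, then subtract the old value: the lion is dropped.
  val addThenSubtract: Agg = subtractor("zoo-1", lion, adder("zoo-1", lion, before))

  def main(args: Array[String]): Unit = {
    println(s"subtract then add: $subtractThenAdd")
    println(s"add then subtract: $addThenSubtract")
  }
}
```

This only shows that the two orders give different results for a Set. It says nothing about which order Kafka Streams actually uses.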
> ------------------------------
>
> *Why am I doubting the documentation (assuming my interpretation of it is
> correct)?*
>
> 1. Seems like an unusual design choice
> 2. When I've run unit tests (using TopologyTestDriver and
> EmbeddedKafka), I always see the subtractor is called before the adder.
> Unfortunately, if there is some kind of race condition involved, it's
> entirely possible that I would never hit the other scenarios.
> 3. I did try looking into the kafka-streams codebase as well. The
> KTableProcessorSupplier that calls the user-supplied adder/subtractor
> functions appears to be this one:
> https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81 and
> on line 92, you can even see a comment saying "first try to remove the old
> value". Unfortunately, during my own testing, I saw that the
> process function itself is called twice: first with a Change<V> value that
> includes only the old value, and then again with a Change<V> value that
> includes only the new value. I
> haven't been able to dig deep enough to find the internal code that
> generates the old-value record and the new-value record (upon receiving an
> update) to determine whether it actually produces those records in that order.
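
The concern in point 3 can be sketched as a simplified model in plain Scala. This is not the Kafka Streams source. The Change case class with Option fields, the process function, and the TwoCallModel object are hypothetical stand-ins that only follow the behaviour described above: each call removes the old value if present and then adds the new value if present, and an update arrives as two separate calls.

```scala
// Simplified model of the behaviour described above; NOT the Kafka Streams source.
// Change, process, and TwoCallModel are hypothetical stand-ins for illustration.
final case class Change[V](newValue: Option[V], oldValue: Option[V])

object TwoCallModel {
  type Agg = Set[String]

  // Within a single call: first try to remove the old value, then add the new one.
  def process(change: Change[String], agg: Agg): Agg = {
    val afterRemove = change.oldValue.fold(agg)(agg - _)
    change.newValue.fold(afterRemove)(afterRemove + _)
  }

  // A duplicate update for "lion", split into an old-only and a new-only record.
  val before: Agg = Set("lion")
  val oldOnly: Change[String] = Change(newValue = None, oldValue = Some("lion"))
  val newOnly: Change[String] = Change(newValue = Some("lion"), oldValue = None)

  // Arrival order seen in the tests described above: old-value record first.
  val oldThenNew: Agg = process(newOnly, process(oldOnly, before))

  // Hypothetical reverse arrival order: new-value record first.
  val newThenOld: Agg = process(oldOnly, process(newOnly, before))
}
```

In this model the remove-then-add order inside one call does not settle the question: the result depends on the order in which the two records reach the function, which is exactly the part I could not trace in the codebase.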
>
>