From the source code in KGroupedTableImpl, the subtractor is always called
before the adder. By not guaranteeing the order, I think the devs meant
that it might change in future versions of Kafka Streams (although I'd
think that's unlikely).
I have use cases similar to your example, and that sentence worries me a
bit too. :)
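
To make the concern concrete, here is a minimal sketch in plain Java. It does
not use Kafka Streams at all: the class name and the add/subtract helpers are
made up for illustration and only mimic the adder and subtractor from the Set
example quoted below. It applies a duplicate update (old value equal to new
value) in both orders:

```java
import java.util.HashSet;
import java.util.Set;

public class AggregationOrderSketch {
    // Hypothetical stand-in for the adder: returns a copy of the set with the value added.
    static Set<String> add(Set<String> aggregate, String value) {
        Set<String> next = new HashSet<>(aggregate);
        next.add(value);
        return next;
    }

    // Hypothetical stand-in for the subtractor: returns a copy of the set with the value removed.
    static Set<String> subtract(Set<String> aggregate, String value) {
        Set<String> next = new HashSet<>(aggregate);
        next.remove(value);
        return next;
    }

    public static void main(String[] args) {
        // Current aggregate for one zoo key already contains "lion".
        Set<String> zoo = add(new HashSet<>(), "lion");

        // Duplicate update: the old value and the new value are both "lion".
        Set<String> subtractorFirst = add(subtract(zoo, "lion"), "lion");
        Set<String> adderFirst = subtract(add(zoo, "lion"), "lion");

        System.out.println("subtractor first: " + subtractorFirst);
        System.out.println("adder first: " + adderFirst);
    }
}
```

With the subtractor applied first, the set still contains "lion"; with the
adder applied first, the set ends up empty, which is the failure mode
described in the original question.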
On Thu, Jan 28, 2021, 22:31 Fq Public <fq.public5@gmail.com> wrote:
> Hi everyone! I posted this same question on stackoverflow
> <
> https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined
> >
> a few days ago but didn't get any responses. Was hoping someone here might
> be able to help clarify this part of the documentation for me :)
>
> On Thu, 28 Jan 2021 at 19:50, Fq Public <fq.public5@gmail.com> wrote:
>
> > The Streams DSL documentation
> > <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> > includes a caveat about using the aggregate method to transform a
> > KGroupedTable → KTable, as follows (emphasis mine):
> >
> > When subsequent non-null values are received for a key (e.g., UPDATE),
> > then (1) the subtractor is called with the old value as stored in the
> > table and (2) the adder is called with the new value of the input record
> > that was just received. *The order of execution for the subtractor and
> > adder is not defined.*
> >
> > My interpretation of that last line implies that one of three things can
> > happen:
> >
> > 1. subtractor can be called before adder
> > 2. adder can be called before subtractor
> > 3. adder and subtractor could be called at the same time
> >
> > Here is the question I'm looking to get answered:
> > *Are all 3 scenarios above actually possible when using the aggregate
> > method on a KGroupedTable?*
> > Or am I misinterpreting the documentation? For my use-case (detailed
> > below), it would be ideal if the subtractor was always called before the
> > adder.
> > ------------------------------
> >
> > *Why is this question important?*
> >
> > If the adder and subtractor are non-commutative operations and the order
> > in which they are executed can vary, you can end up with different
> > results depending on that order. A useful example of a non-commutative
> > operation is aggregating records into a Set:
> >
> > .aggregate[Set[Animal]](Set.empty)(
> >   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> > )
> >
> > In this example, for duplicated events, if the adder is called before the
> > subtractor you would end up removing the value entirely from the set
> > (which would be problematic for most use-cases, I imagine).
> > ------------------------------
> >
> > *Why am I doubting the documentation (assuming my interpretation of it is
> > correct)?*
> >
> > 1. Seems like an unusual design choice
> > 2. When I've run unit tests (using TopologyTestDriver and
> > EmbeddedKafka), I always see the subtractor called before the adder.
> > Unfortunately, if there is some kind of race condition involved, it's
> > entirely possible that I would never hit the other scenarios.
> > 3. I did try looking into the kafka-streams codebase as well. The
> > KTableProcessorSupplier that calls the user-supplied adder/subtractor
> > functions appears to be this one:
> > https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> > and on line 92, you can even see a comment saying "first try to remove
> > the old value". Unfortunately, during my own testing, I saw that the
> > process function itself is called twice: first with a Change<V> value
> > that includes only the old value, and then again with a Change<V> value
> > that includes only the new value. I haven't been able to dig deep enough
> > to find the internal code that generates the old-value record and the
> > new-value record (upon receiving an update) to determine whether it
> > actually produces those records in that order.
> >
> >
>