
Re: Clarify “the order of execution for the subtractor and adder is not defined”

From the source code in KGroupedTableImpl, the subtractor is always called
before the adder. By not guaranteeing the order, I think the devs meant
that it might change in future versions of Kafka Streams (although I'd
think that's unlikely).

I have use cases similar to your example, and that phrase worries me a
bit too. :)
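As a rough illustration of the order the reply describes, the plain-Scala sketch below (no Kafka dependency) models an update as two records, an old-value-only record followed by a new-value-only record, and folds them into one key's aggregate so the subtractor runs on the old value before the adder runs on the new one. The `Change` case class only mimics the shape of Kafka Streams' internal `Change<V>`; it, `UpdateOrderSketch`, and `applyUpdate` are hypothetical names for illustration, not Kafka Streams API, and the sketch says nothing about what ordering Kafka Streams actually guarantees.

```scala
// Hypothetical stand-in for Kafka Streams' internal Change<V> (not the real class).
final case class Change[V](newValue: Option[V], oldValue: Option[V])

object UpdateOrderSketch {
  // Folds each Change into the aggregate in arrival order. Within one Change,
  // the old value (if any) is subtracted before the new value (if any) is added.
  def applyUpdate[V, A](
      initial: A,
      changes: Seq[Change[V]],
      adder: (V, A) => A,
      subtractor: (V, A) => A
  ): A =
    changes.foldLeft(initial) { (agg, change) =>
      val afterSubtract = change.oldValue.fold(agg)(old => subtractor(old, agg))
      change.newValue.fold(afterSubtract)(nv => adder(nv, afterSubtract))
    }

  // An UPDATE from 5 to 7, delivered as old-only then new-only records,
  // applied to a running sum whose current value is 5.
  val updateRecords: Seq[Change[Int]] =
    Seq(Change(None, Some(5)), Change(Some(7), None))

  val result: Int =
    applyUpdate[Int, Int](5, updateRecords, (v, agg) => agg + v, (v, agg) => agg - v)

  def main(args: Array[String]): Unit = println(result) // prints 7
}
```

A running sum is commutative, so here the order of the two records would not change the result; the question's Set aggregate is where it starts to matter.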

On Thu, Jan 28, 2021, 22:31 Fq Public <fq.public5@gmail.com> wrote:

> Hi everyone! I posted this same question on Stack Overflow
> (https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined)
> a few days ago but didn't get any responses. I was hoping someone here might
> be able to help clarify this part of the documentation for me :)
>
> On Thu, 28 Jan 2021 at 19:50, Fq Public <fq.public5@gmail.com> wrote:
>
> > The Streams DSL documentation
> > (https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating)
> > includes a caveat about using the aggregate method to transform a
> > KGroupedTable → KTable, as follows (emphasis mine):
> >
> > When subsequent non-null values are received for a key (e.g., UPDATE),
> > then (1) the subtractor is called with the old value as stored in the
> > table and (2) the adder is called with the new value of the input record
> > that was just received. *The order of execution for the subtractor and
> > adder is not defined.*
> >
> > I interpret that last line to mean that any of three things can happen:
> >
> > 1. the subtractor can be called before the adder
> > 2. the adder can be called before the subtractor
> > 3. the adder and subtractor could be called at the same time
> >
> > Here is the question I'm looking to get answered:
> > *Are all 3 scenarios above actually possible when using the aggregate
> > method on a KGroupedTable?*
> > Or am I misinterpreting the documentation? For my use-case (detailed
> > below), it would be ideal if the subtractor was always called before the
> > adder.
> > ------------------------------
> >
> > *Why is this question important?*
> >
> > If the adder and subtractor are non-commutative operations and the order
> > in which they are executed can vary, you can end up with different
> > results depending on the order of execution of the adder and subtractor.
> > An example of a useful non-commutative operation is aggregating records
> > into a Set:
> >
> > .aggregate[Set[Animal]](Set.empty)(
> >   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> > )
> >
> > In this example, for duplicated events (an update whose old and new values
> > are the same animal), if the adder is called before the subtractor you
> > would end up removing the value entirely from the set (which would be
> > problematic for most use cases, I imagine).
> > ------------------------------
> >
> > *Why am I doubting the documentation (assuming my interpretation of it is
> > correct)?*
> >
> > 1. It seems like an unusual design choice.
> > 2. When I've run unit tests (using TopologyTestDriver and
> > EmbeddedKafka), I always see the subtractor called before the adder.
> > Unfortunately, if there is some kind of race condition involved, it's
> > entirely possible that I would never hit the other scenarios.
> > 3. I did try looking into the kafka-streams codebase as well. The
> > KTableProcessorSupplier that calls the user-supplied adder/subtractor
> > functions appears to be this one:
> > https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> > and on line 92, you can even see a comment saying "first try to remove
> > the old value". Unfortunately, what I saw during my own testing was that
> > the process function itself is called twice: first with a Change<V>
> > value that includes only the old value, and then again with a Change<V>
> > value that includes only the new value. I haven't been able to dig deep
> > enough to find the internal code that generates the old-value record
> > and the new-value record (upon receiving an update) to determine whether
> > it actually produces those records in that order.
> >
> >
>
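The non-commutativity described in the quoted question can be checked without Kafka. The plain-Scala sketch below applies the question's Set adder and subtractor to a duplicate update (old and new value are both `lion`) in both orders. `Animal` is a hypothetical case class standing in for the question's type, `SetOrderSketch` is a hypothetical name, and the zoo key argument is dropped because neither function uses it.

```scala
// Hypothetical value type standing in for the question's Animal.
final case class Animal(name: String)

object SetOrderSketch {
  // The question's adder and subtractor, minus the unused zoo key.
  val adder: (Animal, Set[Animal]) => Set[Animal] = (animal, set) => set + animal
  val subtractor: (Animal, Set[Animal]) => Set[Animal] = (animal, set) => set - animal

  val lion: Animal = Animal("lion")

  // Aggregate state before the duplicate update: the lion is already present.
  val before: Set[Animal] = Set(lion)

  // Duplicate update: the old value and the new value are both `lion`.
  val subtractThenAdd: Set[Animal] = adder(lion, subtractor(lion, before))
  val addThenSubtract: Set[Animal] = subtractor(lion, adder(lion, before))

  def main(args: Array[String]): Unit = {
    println(subtractThenAdd) // prints Set(Animal(lion))
    println(addThenSubtract) // prints Set()
  }
}
```

Only the subtract-then-add order keeps the animal in the set, which is the behaviour the question's use case relies on.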
