Skip to main content

Re: Clarify “the order of execution for the subtractor and adder is not defined”

What Alex says.

The order is hard-coded (ie not race condition), but there is no
guarantee that we won't change the order in future releases without
notice (ie, it's not a public contract and we don't need to do a KIP to
change it). I guess there would be a Jira about it... But as a matter of
fact, it does not really matter (detail below).

For the three scenarios you mentioned, the 3rd one cannot happen though:
We execute an aggregator in a single thread (per shard) and thus we
either call the adder or subtractor first.



> 1. Seems like an unusual design choice

Why do you think so?



> first with a Change<V> value that includes only
> the old value and then the process function is called again with a Change<V>
> value that includes only the new value.

In general, both records might be processed by different threads and
thus we cannot only send one record. It's just that the TTD simulates a
single threaded execution thus both records always end up in the same
processor.

Cf
https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations

However, the order actually only matters if both records really end up
in the same processor (if the grouping key did not change during the
upstream update).

Furthermore, the order actually depends not on the downstream aggregate
implementation, but on the order of writes into the repartitions topic
of the `groupBy()` and with multiple parallel upstream processor, those
writes are interleaved anyway. Thus, in general, you should think of the
"add" and "subtract" part as independent entities and not make any
assumption about their order (also, even if the key did not change, both
records might be interleaved by other records...)

The only guarantee we can provide is (given that you configured the
producer correctly to avoid re-ordring during send()), that if the
grouping key does not change, the send of the old and new value will not
be re-ordered relative to each other. The order of the send is
hard-coded in the upstream processor though:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99

Thus, the order of the downstream aggregate processor is actually
meaningless.



-Matthias

On 1/29/21 6:50 AM, Alexandre Brasil wrote:
> From the source code in KGroupedTableImpl, the subtractor is always called
> before the adder. By not guaranteeing the order, I think the devs meant
> that it might change on future versions of Kafka Streams (although I'd
> think it's unlikely to).
>
> I have use cases similars with your example, and that phrase worries me a
> bit too. :)
>
> On Thu, Jan 28, 2021, 22:31 Fq Public <fq.public5@gmail.com> wrote:
>
>> Hi everyone! I posted this same question on stackoverflow
>> <
>> https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined
>>>
>> a few days ago but didn't get any responses. Was hoping someone here might
>> be able to help clarify this part of the documentation for me :)
>>
>> On Thu, 28 Jan 2021 at 19:50, Fq Public <fq.public5@gmail.com> wrote:
>>
>>> The Streams DSL documentation
>>> <
>> https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
>> includes
>>> a caveat about using the aggregate method to transform a KGroupedTable →
>>> KTable, as follows (emphasis mine):
>>>
>>> When subsequent non-null values are received for a key (e.g., UPDATE),
>>> then (1) the subtractor is called with the old value as stored in the
>> table
>>> and (2) the adder is called with the new value of the input record that
>> was
>>> just received. *The order of execution for the subtractor and adder is
>>> not defined.*
>>>
>>> My interpretation of that last line implies that one of three things can
>>> happen:
>>>
>>> 1. subtractor can be called before adder
>>> 2. adder can be called before subtractor
>>> 3. adder and subtractor could be called at the same time
>>>
>>> Here is the question I'm looking to get answered:
>>> *Are all 3 scenarios above actually possible when using the aggregate
>>> method on a KGroupedTable?*
>>> Or am I misinterpreting the documentation? For my use-case (detailed
>>> below), it would be ideal if the subtractor was always called before the
>>> adder.
>>> ------------------------------
>>>
>>> *Why is this question important?*
>>>
>>> If the adder and subtractor are non-commutative operations and the order
>>> in which they are executed can vary, you can end up with different
>> results
>>> depending on the order of execution of adder and subtractor. An example
>> of
>>> a useful non-commutative operation would be something like if we're
>>> aggregating records into a Set:
>>>
>>> .aggregate[Set[Animal]](Set.empty)(
>>> adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals +
>> animalValue,
>>> subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals -
>> animalValue
>>> )
>>>
>>> In this example, for duplicated events, if the adder is called before the
>>> subtractor you would end up removing the value entirely from the set
>> (which
>>> would be problematic for most use-cases I imagine).
>>> ------------------------------
>>>
>>> *Why am I doubting the documentation (assuming my interpretation of it is
>>> correct)?*
>>>
>>> 1. Seems like an unusual design choice
>>> 2. When I've run unit tests (using TopologyTestDriver and
>>> EmbeddedKafka), I always see the subtractor is called before the
>> adder.
>>> Unfortunately, if there is some kind of race condition involved, it's
>>> entirely possible that I would never hit the other scenarios.
>>> 3. I did try looking into the kafka-streams codebase as well. The
>>> KTableProcessorSupplier that calls the user-supplied adder/subtracter
>>> functions appears to be this one:
>>>
>> https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
>> and
>>> on line 92, you can even see a comment saying "first try to remove
>> the old
>>> value". Unfortunately, during my own testing, I saw was that the
>>> process function itself is called twice; first with a Change<V> value
>> that
>>> includes only the old value and then the process function is called
>>> again with a Change<V> value that includes only the new value. I
>>> haven't been able to dig deep enough to find the internal code that is
>>> generating the old value record and the new value record (upon
>> receiving an
>>> update) to determine if it actually produces those records in that
>> order.
>>>
>>>
>>
>

Comments