
Re: Picking up the thread on emit-on-change and at-least-once

Hey,

The problem you describe is not totally fundamental, but there is currently
no workaround in KS... So right now, only EOS can fix it.

In the end, what you would need is a way to first write to the output
topic (and flush the write), before you update the state store.

Using the PAPI, you could conceptually call `context.forward()` before
`store.put(...)`, but the KS runtime hides too many low-level controls
to guarantee that `context.forward(...)` resulted in a successful write
into the downstream sink before it returns and `store.put(...)` is
executed... There is a missing `producer.flush()` step in between that
you cannot perform.
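To make the ordering problem concrete, here is a toy plain-Java model. It is not Kafka Streams code, and the class and method names are made up for illustration. It assumes that a store update survives a task failure (for example because the local store or its changelog already holds it), while an output record still sitting in an unflushed producer buffer is lost. After the failure it replays from the last committed input offset, once with the store-first order and once with "forward and flush, then put".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model of an at-least-once, emit-on-change task (not Kafka Streams code).
// Assumption: a store write survives a failure, while unflushed output records are lost.
class AlosOrderingModel {

    static class Crash extends RuntimeException {}

    final Map<String, Integer> store = new HashMap<>();      // survives the failure
    final List<String> producerBuffer = new ArrayList<>();   // sent but not yet flushed
    final List<String> output = new ArrayList<>();           // what downstream actually sees
    int committedOffset = 0;                                 // replay restarts here

    void flush() {
        output.addAll(producerBuffer);
        producerBuffer.clear();
    }

    // Processes input from committedOffset; fails while handling offset crashAt (-1 = never).
    void run(int[] input, String key, int crashAt, boolean forwardAndFlushFirst) {
        for (int i = committedOffset; i < input.length; i++) {
            int value = input[i];
            boolean changed = !Objects.equals(store.get(key), value);
            if (forwardAndFlushFirst) {
                if (changed) {
                    producerBuffer.add(key + "=" + value);
                    flush();                        // output is durable before the store moves
                }
                if (i == crashAt) throw new Crash();
                store.put(key, value);
            } else {
                store.put(key, value);              // state moves "into the future" first
                if (changed) producerBuffer.add(key + "=" + value);
                if (i == crashAt) {
                    producerBuffer.clear();         // the unflushed send is lost
                    throw new Crash();
                }
            }
            flush();
            committedOffset = i + 1;                // commit the input offset after the flush
        }
    }

    static List<String> simulate(boolean forwardAndFlushFirst) {
        AlosOrderingModel task = new AlosOrderingModel();
        int[] input = {1, 1, 2};
        try {
            task.run(input, "k", 2, forwardAndFlushFirst); // fail while processing the 2
        } catch (Crash e) {
            // restart below: same store, replay from the last committed offset
        }
        task.run(input, "k", -1, forwardAndFlushFirst);
        return task.output;
    }

    public static void main(String[] args) {
        System.out.println("store first:         " + simulate(false)); // prints [k=1]
        System.out.println("forward+flush first: " + simulate(true));  // prints [k=1, k=2, k=2]
    }
}
```

With the store-first order, the replayed record looks unchanged, so `k=2` never reaches downstream. With forward-and-flush first, the worst case is a duplicate `k=2`, which at-least-once permits.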

Does this make sense?

Down the line though, I think that TX state stores can fix it (KIP-892).
I believe that we can (and should) use them even in ALOS mode,
allowing us to roll back state on error, i.e., all writes to the store
are "pending" until we commit. For this case, we would first flush all
pending producer writes before we "commit the store changes", and the
KS runtime can perform the required flush step.
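A similar toy model can sketch the transactional-store idea: store writes stay pending until a commit, and the commit flushes the producer before applying them, so a failure discards both together. This is a hypothetical illustration of the ordering described above, not KIP-892's actual interfaces; the names and the commit interval are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model of a transactional store under at-least-once (not KIP-892's real API).
// Store writes stay pending until commit(); commit() flushes output before applying them.
class TxStoreModel {

    static class Crash extends RuntimeException {}

    final Map<String, Integer> committed = new HashMap<>();  // committed store contents
    final Map<String, Integer> pending = new HashMap<>();    // uncommitted store writes
    final List<String> producerBuffer = new ArrayList<>();   // unflushed output records
    final List<String> output = new ArrayList<>();           // what downstream sees
    int committedOffset = 0;

    Integer get(String key) {
        // The task reads its own pending writes first.
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    void commit(int nextOffset) {
        output.addAll(producerBuffer);   // 1. flush pending producer writes
        producerBuffer.clear();
        committed.putAll(pending);       // 2. then commit the store changes
        pending.clear();
        committedOffset = nextOffset;    // 3. then commit the input offset
    }

    void abort() {
        // On failure, unflushed output and uncommitted store writes are both dropped.
        producerBuffer.clear();
        pending.clear();
    }

    void run(int[] input, String key, int crashAt, int commitEvery) {
        for (int i = committedOffset; i < input.length; i++) {
            int value = input[i];
            if (!Objects.equals(get(key), value)) {
                producerBuffer.add(key + "=" + value);   // emit on change
            }
            pending.put(key, value);
            if (i == crashAt) {
                abort();
                throw new Crash();
            }
            if ((i + 1) % commitEvery == 0 || i == input.length - 1) {
                commit(i + 1);
            }
        }
    }

    static List<String> simulate() {
        TxStoreModel task = new TxStoreModel();
        int[] input = {1, 1, 2, 3};
        try {
            task.run(input, "k", 2, 2);    // commit every 2 records; fail on the 2
        } catch (Crash e) {
            // restart below: the store is rolled back to the last commit
        }
        task.run(input, "k", -1, 2);
        return task.output;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [k=1, k=2, k=3]
    }
}
```

If the failure instead hit between the flush and the store commit, the replay would emit `k=2` a second time: a duplicate rather than a lost update.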

Thus, while TX state stores themselves do not give you EOS, they should
address the issue at hand, and thus also unblock KIP-557 down the line.


Hope this helps,
-Matthias

On 2/25/25 4:22 PM, Steven Schlansker wrote:
> Hi kafka-users,
>
> We are implementing a Kafka Streams app that computes various
> streaming statistics over a corpus of data stored in Kafka topics.
> While some aggregates update often, others like 'min', 'max', or
> histogram buckets could have relatively few distinct updates compared
> to the volume of input data.
>
> With our first implementation, we discovered that the majority of our
> time during catch-up processing is spent computing the same data
> redundantly, over and over, at various stages of our pipeline. We then
> found that KIP-557, Emit-on-change support
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> was reverted and doesn't seem to be coming back in the short term.
>
> So, we've implemented a work-around, where we trade additional storage
> to suppress redundant updates.
>
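> record Holder<V>(V val, boolean changed) {}
>
> inputStream
>     .mapValues(v -> new Holder<>(v, true))
>     .groupByKey() // aggregate() is defined on KGroupedStream, not KStream
>     .aggregate(
>         () -> null,
>         // the first value for a key, or a value that differs from the stored one, is "changed"
>         (k, v, a) -> a == null
>             ? new Holder<>(v.val(), true)
>             : new Holder<>(v.val(), !Objects.equals(a.val(), v.val())),
>         aggMaterialized) // illustrative name: Materialized for the Holder<V> store
>     .toStream()
>     // forward only changed values; unchanged ones become zero records
>     .flatMapValues((k, v) -> v.changed()
>         ? Collections.singletonList(v.val())
>         : Collections.emptyList())
>     // illustrative name: a separate Materialized, since the value type (V) and store name differ
>     .toTable(tableMaterialized)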
>
> This seems to mostly give us the semantics we want. The aggregation
> step statefully tracks whether the incoming value matches the previous
> state and, if not, forwards the new value.
>
> However, we subsequently seem to run into the same bug as described
> in https://issues.apache.org/jira/browse/KAFKA-12508 - in at-least-once
> configuration, a failure / retry will observe the source topic
> rolling back but not the state store, causing downstream updates to be
> lost.
>
> This makes me wonder - is KAFKA-12508 a more general bug with
> at-least-once mode and any state store, where you can observe state in a
> store from a failed thread that is in the "future" relative to your
> source topic, since it was already written to the changelog before the
> failure? KIP-557 is long reverted and we're able to observe similar
> behavior just with the DSL.
>
> I can't quite put my finger on the risk, but with a bunch of
> aggregates all through our app, it makes me a bit uneasy about
> at-least-once mode. I'm sure at least some of the stateful aggregate
> authors (myself included) did not consider this sort of failure case.
> We will probably try exactly-once mode for peace of mind.
>
> From there, back to emit on change - if there is no other path forward
> envisioned, perhaps we could consider re-enabling KIP-557 only for
> exactly-once streams apps? Then at least some users could benefit from
> it.
>
> Failing that, our hack workaround is somewhat complicated by the need
> to toStream / flatMap / toTable just to suppress "no change". It would
> be nicer if the aggregate could directly return "no update" to the
> KStreamAggregateProcessor, which would then know to skip forwarding
> the record. Null couldn't be used, but we could define a dedicated
> sentinel value or some other signal.
> If we can't get emit-on-change back into Kafka Streams soon, we'd be
> interested in sponsoring a change like this if the community thinks it
> could be helpful.
>
> Sorry for the meandering train of thought, hopefully others are also
> interested in getting emit-on-change working still :)
>
> Best,
> Steven
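The "no update" signal proposed in the quoted message could look roughly like the following plain-Java model. `SuppressingAggregator`, `process`, and `latestOnChange` are hypothetical names, not an existing Kafka Streams API, and returning `Optional.empty()` is just one possible choice of signal standing in for a dedicated sentinel. The model skips both the store write and the forward when the aggregator reports no change, which removes the need for the `Holder` / `flatMapValues` / `toTable` detour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical "no update" signal for an aggregator; NOT an existing Kafka Streams API.
class NoUpdateAggregateModel {

    // Returning Optional.empty() means "unchanged: skip the store write and the forward".
    @FunctionalInterface
    interface SuppressingAggregator<K, V, A> {
        Optional<A> apply(K key, V value, A aggregate);
    }

    // Stand-in for the aggregate processor: a map as the store, a list as the downstream.
    static <K, V, A> List<Map.Entry<K, A>> process(
            List<Map.Entry<K, V>> records, SuppressingAggregator<K, V, A> aggregator) {
        Map<K, A> store = new HashMap<>();
        List<Map.Entry<K, A>> forwarded = new ArrayList<>();
        for (Map.Entry<K, V> entry : records) {
            K key = entry.getKey();
            Optional<A> result = aggregator.apply(key, entry.getValue(), store.get(key));
            result.ifPresent(agg -> {
                store.put(key, agg);
                forwarded.add(Map.entry(key, agg));
            });
        }
        return forwarded;
    }

    // Emit-on-change as a single aggregator.
    // Assumes non-null values (Optional.of rejects null).
    static <K, V> SuppressingAggregator<K, V, V> latestOnChange() {
        return (key, value, previous) ->
                Objects.equals(previous, value) ? Optional.empty() : Optional.of(value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("a", 1), Map.entry("a", 1), Map.entry("b", 5),
                Map.entry("a", 2), Map.entry("b", 5));
        System.out.println(process(input, NoUpdateAggregateModel.<String, Integer>latestOnChange()));
        // prints [a=1, b=5, a=2]
    }
}
```

Note that this only addresses the ergonomics; the at-least-once rollback issue discussed above would still apply to any real implementation of such a signal.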
