Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

Hello Sachin,

I just read John's and Bill's comments on that ticket (I was not watching
KAFKA-9533 before, so this was new to me). Besides John's rationale, which I
agree with (for a KStream, a null value with a non-null key can still have a
valid meaning), the behavior so far has actually been to forward the null
value downstream. We do, however, need to fix the javadoc
if we believe that should be the right semantics.
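
To make "forwarding null" concrete, here is a minimal plain-Java model. It does not use Kafka Streams: the `KV` record and the `transformValues`/`filter` helpers are hypothetical stand-ins that only mimic the behavior described above. A value-only transform emits one record per input even when the new value is `null`, so it is a separate filter that actually drops the record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

// Hypothetical stand-in for a Kafka record: a key and a (possibly null) value.
record KV(String key, String value) {}

// Plain-Java model, NOT the Kafka Streams API: it only mimics the forwarding
// behavior discussed in this thread.
final class NullForwardingModel {
    // Value-only transform: keeps the key and emits exactly one output per input,
    // even when the new value is null (the "forwarding null" behavior).
    static List<KV> transformValues(List<KV> in, BiFunction<String, String, String> f) {
        List<KV> out = new ArrayList<>();
        for (KV r : in) {
            out.add(new KV(r.key(), f.apply(r.key(), r.value())));
        }
        return out;
    }

    // filter(): records that fail the predicate are dropped entirely.
    static List<KV> filter(List<KV> in, BiPredicate<String, String> keep) {
        List<KV> out = new ArrayList<>();
        for (KV r : in) {
            if (keep.test(r.key(), r.value())) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV> input = List.of(
                new KV("a", "failure"), new KV("a", "success"), new KV("b", "failure"));
        // Return null for events that should be "ignored".
        List<KV> transformed =
                transformValues(input, (k, v) -> "failure".equals(v) ? null : v);
        System.out.println(transformed.size()); // 3: null-valued records are still forwarded
        List<KV> filtered = filter(transformed, (k, v) -> v != null);
        System.out.println(filtered); // [KV[key=a, value=success]]
    }
}
```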

Guozhang

On Tue, Feb 25, 2020 at 10:18 AM Bruno Cadonna <bruno@confluent.io> wrote:

> Hi Sachin,
>
> I am afraid I cannot follow your point.
>
> You can still use a filter if you do not want to emit records
> downstream, and that does not trigger any repartitioning.
>
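
A toy model of why a value-only filter avoids repartitioning. The class and method names are hypothetical, and this is not how Kafka Streams is implemented internally; it assumes the DSL rule as I understand it: operators that may change the key, such as `transform()`, mark the stream so that a later `groupByKey()` aggregation repartitions it, while value-only operators such as `filter()` and `transformValues()` do not set that mark.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Kafka Streams code: it tracks only whether an upstream operator
// may have changed the key, which decides whether a later groupByKey()
// aggregation needs a repartition step.
final class RepartitionModel {
    private final boolean repartitionRequired;
    private final List<String> ops;

    private RepartitionModel(boolean repartitionRequired, List<String> ops) {
        this.repartitionRequired = repartitionRequired;
        this.ops = ops;
    }

    static RepartitionModel source() {
        return new RepartitionModel(false, List.of("source"));
    }

    // Appends an operator; once the key may have changed, the flag stays set.
    private RepartitionModel then(String op, boolean mayChangeKey) {
        List<String> next = new ArrayList<>(ops);
        next.add(op);
        return new RepartitionModel(repartitionRequired || mayChangeKey, next);
    }

    RepartitionModel filter() {
        return then("filter", false);
    }

    RepartitionModel transformValues() {
        return then("transformValues", false);
    }

    RepartitionModel transform() {
        return then("transform", true);
    }

    // Returns the resulting plan: a repartition step is inserted before grouping
    // only when an upstream operator may have changed the key.
    List<String> groupByKey() {
        List<String> plan = new ArrayList<>(ops);
        if (repartitionRequired) {
            plan.add("repartition");
        }
        plan.add("groupByKey");
        return plan;
    }

    public static void main(String[] args) {
        // [source, transformValues, filter, groupByKey]
        System.out.println(source().transformValues().filter().groupByKey());
        // [source, transform, repartition, groupByKey]
        System.out.println(source().transform().groupByKey());
    }
}
```

In this model `filter()` does not clear the mark either: `transform().filter().groupByKey()` still gets a repartition step.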
> Best,
> Bruno
>
> On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > Hi,
> > This is really getting interesting.
> > Now, if we don't want a record to be emitted downstream, the only way to
> > do it is via transform (or flatTransform).
> >
> > Since we are now reverting the fix for null records in transformValues and
> > changing the docs instead, doesn't this add a bit of confusion for users?
> > The Confluent docs say:
> > transformValues is preferable to transform because it will not cause data
> > re-partitioning.
> >
> > So in many cases where the record's value alone is sufficient to
> > determine whether we should emit it downstream, we would still be forced
> > to use transform and unnecessarily cause data re-partitioning. Won't this
> > be inefficient?
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Feb 25, 2020 at 10:52 PM Bruno Cadonna <bruno@confluent.io> wrote:
> >
> > > Hello Guozhang and Adam,
> > >
> > > Regarding Guozhang's proposal, please see the recent discussion about
> > > `transformValues()` and returning `null` from the transformer:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> > >
> > > With the current behavior, the chain should be:
> > >
> > > `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate(...)`
> > >
> > > Best,
> > > Bruno
> > >
> > > On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >
> > > > Hello Adam,
> > > >
> > > > It seems your intention is not to "avoid emitting if the new aggregation
> > > > result is the same as the old aggregation" but to "avoid processing the
> > > > aggregation at all if its state is already a certain value", right?
> > > >
> > > > In this case I think you can try something like this:
> > > >
> > > > `stream.transformValues().groupByKey().aggregate()`
> > > >
> > > > where transformValues is used as a slightly more complicated "filter"
> > > > operation: in it you can access the state store that `aggregate` is
> > > > connected to, check whether the corresponding entry is already
> > > > `success`, and if so let `transformValues` return `null`, which means
> > > > forwarding nothing downstream.
> > > >
> > > > The reason to use transformValues instead of transform is to make sure
> > > > you do not introduce unnecessary repartitioning here.
> > > >
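
The pre-check idea, combined with the `filter` step from the corrected chain, can be modeled in plain Java. This is a hypothetical sketch without any Kafka dependency: a `HashMap` stands in for the aggregate's state store, the String values "success"/"failure"/"none" stand in for the real aggregate type, and record caching is ignored, so every record that reaches the aggregation emits an update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of
//   stream.transformValues(check).filter((k, v) -> v != null).groupByKey().aggregate(...)
// ignoring record caching, so every record reaching the aggregation emits an update.
final class PreCheckPipeline {
    // Stands in for the aggregate's state store (a hypothetical HashMap, not a Kafka store).
    private final Map<String, String> store = new HashMap<>();
    // Updates forwarded by the aggregation, formatted as "key=value".
    final List<String> emitted = new ArrayList<>();

    // Initializer value for keys seen for the first time (hypothetical).
    private static final String INITIAL = "none";

    // The "slightly complicated filter": reads the aggregate's store and
    // returns null once the stored aggregate is already "success".
    private String check(String key, String event) {
        return "success".equals(store.get(key)) ? null : event;
    }

    // Aggregator: "success" latches; otherwise the latest event wins.
    private static String aggregate(String key, String event, String agg) {
        return "success".equals(agg) ? agg : event;
    }

    void process(String key, String event) {
        String value = check(key, event); // transformValues: one output per input
        if (value == null) {
            return; // filter((k, v) -> v != null) drops it
        }
        String updated = aggregate(key, value, store.getOrDefault(key, INITIAL));
        store.put(key, updated); // aggregate() updates its store...
        emitted.add(key + "=" + updated); // ...and forwards the new result
    }

    public static void main(String[] args) {
        PreCheckPipeline p = new PreCheckPipeline();
        p.process("order-1", "failure");
        p.process("order-1", "success");
        p.process("order-1", "failure"); // dropped before reaching the aggregation
        System.out.println(p.emitted); // [order-1=failure, order-1=success]
    }
}
```

Without the check, the last "failure" would still reach the aggregator, which would return the latched "success" and emit it a second time.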
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart <adam.rinehart@gmail.com> wrote:
> > > >
> > > > > So I am trying to process incoming events that may or may not actually
> > > > > update the state of my output object. Originally I was doing this with
> > > > > a KStream/KTable join, until I saw the discussion about "KTable in
> > > > > Compact Topic takes too long to be updated", at which point I switched
> > > > > to groupByKey().aggregate().
> > > > >
> > > > > Some events may not result in a state change. For example, once I have
> > > > > an incoming success event, I emit a success output, and future
> > > > > incoming failure events will be ignored.
> > > > >
> > > > > My intention is to emit a record from the aggregate KTable only if the
> > > > > aggregate record actually changed. But I can't figure out how to do
> > > > > that within the aggregator interface. I've tried returning the
> > > > > incoming aggregate object when nothing changes, but I still get a
> > > > > record emitted from the table.
> > > > >
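
The behavior described in the question can be modeled in plain Java: the aggregation forwards its result on every input, even when the aggregator hands back the unchanged aggregate, so suppressing no-op updates needs an explicit comparison of the old and new values. This sketch is hypothetical (the class, the `process` method, and the String "success"/"failure" aggregate are illustrative assumptions, not Kafka Streams APIs), assumes the aggregate type has a meaningful `equals`, and ignores record caching.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Plain-Java model (no Kafka dependency) contrasting a plain aggregation, which
// forwards its result on every input, with an "emit on change" variant that
// compares the old and new aggregate before forwarding.
final class EmitOnChangeModel {
    private final Map<String, String> store = new HashMap<>();
    final List<String> alwaysEmitted = new ArrayList<>(); // what a plain aggregation forwards
    final List<String> changedOnly = new ArrayList<>(); // what an emit-on-change variant forwards

    // Hypothetical aggregator like the one described: "success" latches,
    // later events are ignored.
    static String aggregate(String event, String agg) {
        return "success".equals(agg) ? agg : event;
    }

    void process(String key, String event) {
        String old = store.get(key); // null for a key seen for the first time
        String updated = aggregate(event, old);
        store.put(key, updated);
        // Returning the unchanged aggregate does not stop the forward here...
        alwaysEmitted.add(key + "=" + updated);
        // ...so "emit only on change" needs an explicit old-vs-new comparison.
        if (!Objects.equals(old, updated)) {
            changedOnly.add(key + "=" + updated);
        }
    }

    public static void main(String[] args) {
        EmitOnChangeModel m = new EmitOnChangeModel();
        for (String e : new String[] {"failure", "failure", "success", "failure"}) {
            m.process("order-1", e);
        }
        // [order-1=failure, order-1=failure, order-1=success, order-1=success]
        System.out.println(m.alwaysEmitted);
        // [order-1=failure, order-1=success]
        System.out.println(m.changedOnly);
    }
}
```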
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > >
>


--
-- Guozhang
