Hi,
Yes, using filter with transformValues would also work.
I have a question out of curiosity: which one would be more efficient?
stream.transform(/* return null for records that don't need to be forwarded
downstream */)
or
stream.transformValues(/* return null for values that don't need to be
forwarded downstream */).filter((k, v) -> v != null)
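
To make the second option concrete, here is a minimal plain-Java model of what each stage passes on. It is not Kafka Streams code: the class NullFilterSketch and its helpers rec, transformValues and dropNullValues are hypothetical names that only mimic the behavior described in this thread, where a null returned from transformValues is still forwarded as a record with a null value and has to be removed by a separate filter. It says nothing about which option is faster.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the semantics only; none of this is the Kafka Streams API.
class NullFilterSketch {

    // Hypothetical helper: builds a key/value record (null values are allowed).
    static <K, V> Map.Entry<K, V> rec(K key, V value) {
        return new SimpleEntry<>(key, value);
    }

    // Models transformValues(): the key is untouched, and a null result
    // is still passed on as a record with a null value.
    static <K, V, R> List<Map.Entry<K, R>> transformValues(
            List<Map.Entry<K, V>> records, Function<V, R> transformer) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            out.add(rec(r.getKey(), transformer.apply(r.getValue())));
        }
        return out;
    }

    // Models filter((k, v) -> v != null): drops the null-valued records.
    static <K, V> List<Map.Entry<K, V>> dropNullValues(List<Map.Entry<K, V>> records) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            if (r.getValue() != null) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                rec("a", "success"), rec("b", "skip"), rec("c", "failure"));

        // Returns null for values that should not go downstream.
        Function<String, String> transformer =
                v -> v.equals("skip") ? null : v.toUpperCase();

        List<Map.Entry<String, String>> mapped = transformValues(input, transformer);
        List<Map.Entry<String, String>> kept = dropNullValues(mapped);

        System.out.println("after transformValues: " + mapped.size() + " records");
        System.out.println("after filter: " + kept.size() + " records");
    }
}
```

In this model the keys never change in either stage, which is the property that lets the real transformValues().filter() chain avoid repartitioning before groupByKey().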
Thanks
Sachin
On Tue, Feb 25, 2020 at 11:48 PM Bruno Cadonna <bruno@confluent.io> wrote:
> Hi Sachin,
>
> I am afraid I cannot follow your point.
>
> You can still use a filter if you do not want to emit records
> downstream w/o triggering any repartitioning.
>
> Best,
> Bruno
>
> On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > Hi,
> > This is really getting interesting.
> > Now if we don't want a record to be emitted downstream, the only way we
> > can do that is via transform (or flatTransform).
> >
> > Since we are now reverting the fix for null records in transformValues and
> > changing the docs instead, doesn't this add a bit of confusion for users?
> > The Confluent docs say that:
> > transformValues is preferable to transform because it will not cause data
> > re-partitioning.
> >
> > So in many cases, if just the record's value structure is sufficient to
> > determine whether we should emit it downstream or not, we would still be
> > forced to use transform and unnecessarily cause data re-partitioning.
> > Won't this be inefficient?
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Feb 25, 2020 at 10:52 PM Bruno Cadonna <bruno@confluent.io> wrote:
> >
> > > Hello Guozhang and Adam,
> > >
> > > Regarding Guozhang's proposal please see recent discussions about
> > > `transformValues()` and returning `null` from the transformer:
> > >
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> > >
> > > With the current behavior, the commands should be:
> > >
> > > `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
> > >
> > > Best,
> > > Bruno
> > >
> > > On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >
> > > > Hello Adam,
> > > >
> > > > It seems your intention is not to "avoid emitting if the new aggregation
> > > > result is the same as the old aggregation" but to "avoid processing the
> > > > aggregation at all if its state is already some certain value", right?
> > > >
> > > > In this case I think you can try sth. like this:
> > > >
> > > > *stream.transformValues().groupByKey().aggregate()*
> > > >
> > > > where transformValues is just used as a slightly complicated "filter"
> > > > operation, in which you can access the state store that "aggregate" is
> > > > connected to, and read / check if the corresponding entry is already
> > > > `success`; if yes, let `transformValues` return `null`, which means
> > > > forward nothing to the downstream.
> > > >
> > > > The reason to use transformValues instead of transform is to make sure
> > > > you do not introduce unnecessary repartitioning here.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart <adam.rinehart@gmail.com> wrote:
> > > >
> > > > > So I am trying to process incoming events that may or may not actually
> > > > > update the state of my output object. Originally I was doing this with a
> > > > > KStream/KTable join, until I saw the discussion about "KTable in Compact
> > > > > Topic takes too long to be updated", when I switched to
> > > > > groupByKey().aggregate().
> > > > >
> > > > > Some events may not result in a state change. For example, once I have an
> > > > > incoming success event, I emit a success output and future incoming failure
> > > > > events will be ignored.
> > > > >
> > > > > My intention is to only emit a record from the aggregate KTable if the
> > > > > aggregate record actually changed. But I can't figure out how to do that
> > > > > within the aggregator interface. I've tried returning the incoming
> > > > > aggregate object when nothing changes, but I still get a record emitted
> > > > > from the table.
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > >
>