Re: kafka stream ktable with suppress operator

Hi Matthias,

When I redeploy the application with the same application ID, it
causes a rebalance loop: partition revoked -> rebalance -> offset
reset to zero -> partition assigned -> partition revoked.

The app was running fine before the redeployment, but once redeployed
it keeps rebalancing for hours, and I have to switch to a new
application ID to stop it.

You mentioned that reduce() uses RocksDB stores by default while
suppress() is in-memory only. Is that the reason that reduce() has
both -repartition and -changelog topics while suppress() only has a
-changelog topic?

And could that be related to the shutdown hook? If I don't provide a
shutdown hook and perform a redeployment, could that cause the above issue?
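
For reference, by "shutdown hook" I mean something roughly like this
(just a sketch; topology and props here stand in for the real ones):

    import java.time.Duration;
    import org.apache.kafka.streams.KafkaStreams;

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();

    // On SIGTERM, close the Streams instance cleanly so tasks can commit
    // and release their state stores before the process exits.
    Runtime.getRuntime().addShutdownHook(new Thread(() ->
        streams.close(Duration.ofSeconds(30))
    ));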

Thanks!

On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <matthias@confluent.io> wrote:
>
> Just a follow up: currently, suppress() only supports in-memory stores
> (note that `suppress()` has its own store).
>
> For the actual `reduce()` store, you can pick between RocksDB and
> in-memory (default is RocksDB). Hence, if you restart an application on
> the same host, it should not be necessary to reload the state from the
> changelog topic if you use RocksDB.
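>
> For example, picking the store type for reduce() looks roughly like this
> (just a sketch, simplified to a non-windowed reduce(); the store name,
> serdes, and reducer are placeholders):
>
>     import org.apache.kafka.common.serialization.Serdes;
>     import org.apache.kafka.streams.kstream.Materialized;
>     import org.apache.kafka.streams.state.Stores;
>
>     // default behavior: a persistent RocksDB store
>     groupedStream.reduce(
>         (v1, v2) -> v2,
>         Materialized.<String, String>as(Stores.persistentKeyValueStore("reduce-store"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(Serdes.String()));
>
>     // alternative: an in-memory store
>     groupedStream.reduce(
>         (v1, v2) -> v2,
>         Materialized.<String, String>as(Stores.inMemoryKeyValueStore("reduce-store"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(Serdes.String()));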
>
> However, the suppress buffer must be recreated from the
> suppress-changelog topics on restart atm.
>
> Originally, `suppress()` was intended to support persistent stores as well,
> but this has not been implemented yet. We hope to close this gap in the future.
>
> >>> I haven't figured out the reason, but after the restart, the app keeps
> >>> resetting the changelog topic offset to ZERO and triggering rebalances.
>
> Resetting to zero would happen if the full state needs to be recovered.
> However, this should not result in a rebalance. Can you elaborate on the
> rebalancing issue you described?
>
>
>
> -Matthias
>
> On 10/28/19 5:54 PM, Alex Brekken wrote:
> > I assume you're using RocksDB as your state stores... The bytes out you're
> > seeing on the changelog topics are probably because they are restoring your
> > state stores. If your state stores are in-memory, then on every
> > application startup they're going to be restored from the changelog
> > topics. If your state stores are persistent (saved to disk), then a
> > restore can still happen if you've lost your filesystem. (maybe you're
> > doing a state store cleanup on startup/shutdown, or have ephemeral storage
> > such as emptyDir in k8s, for example) So *I think* what you're seeing is
> > normal, though if you want to dig deeper there are RocksDB metrics that can
> > be exposed and will show restore-related info. Additionally, there is a
> > StateRestoreListener interface that you can implement if you'd like to log
> > some of the state store restoration details.
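> >
> > Something along these lines, for example (just a sketch, and what you log
> > is up to you):
> >
> >     import org.apache.kafka.common.TopicPartition;
> >     import org.apache.kafka.streams.processor.StateRestoreListener;
> >
> >     public class LoggingRestoreListener implements StateRestoreListener {
> >         @Override
> >         public void onRestoreStart(TopicPartition partition, String storeName,
> >                                    long startingOffset, long endingOffset) {
> >             System.out.printf("Restore of %s (%s) started, offsets %d to %d%n",
> >                     storeName, partition, startingOffset, endingOffset);
> >         }
> >
> >         @Override
> >         public void onBatchRestored(TopicPartition partition, String storeName,
> >                                     long batchEndOffset, long numRestored) {
> >             // Called after every restored batch -- keep this cheap.
> >         }
> >
> >         @Override
> >         public void onRestoreEnd(TopicPartition partition, String storeName,
> >                                  long totalRestored) {
> >             System.out.printf("Restore of %s (%s) finished, %d records restored%n",
> >                     storeName, partition, totalRestored);
> >         }
> >     }
> >
> >     // Register it on your KafkaStreams instance before start():
> >     // streams.setGlobalStateRestoreListener(new LoggingRestoreListener());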
> >
> > Alex
> >
> > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
> >
> >> Hi,
> >> I'm using 2.3.1 now and am having the same issue. During the restart, I
> >> noticed a lot of log messages like the following:
> >> Seeking to EARLIEST offset of partition
> >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> >> Seeking to EARLIEST offset of partition
> >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> >>
> >> After restarting, the bytes out of the changelog topic is as high as
> >> 800-900MB/s, while normally it has zero bytes out. Is this expected?
> >> I haven't figured out the reason, but after the restart, the app keeps
> >> resetting the changelog topic offset to ZERO and triggering rebalances. It
> >> seems like an endless loop:
> >> Rebalance -> reset to ZERO -> rebalance
> >>
> >> Is there any config I should set?
> >>
> >> Thanks!
> >>
> >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <matthias@confluent.io>
> >> wrote:
> >>>
> >>> What version are you using? We fixed a couple of bugs in `suppress()` -- I
> >>> would recommend using the latest 2.3.1 bug-fix release.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> >>>> When using the suppress operator with a windowed KTable, it looks like
> >>>> restarting the Kafka Streams app causes the aggregated messages from the
> >>>> SUPPRESS-STATE-STORE to be published again.
> >>>>
> >>>> Here is the pseudo code .. is there anything I am missing, or anything
> >>>> that can be done to avoid this?
> >>>>
> >>>>
> >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> >>>>     .groupByKey()
> >>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> >>>>     .aggregate(
> >>>>         ....
> >>>>         ,
> >>>>         Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
> >>>>             .withRetention(Duration.ofMinutes(5))
> >>>>             .withKeySerde(Serdes.String())
> >>>>             .withValueSerde(Serdes.String())
> >>>>     )
> >>>>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> >>>>
> >>>> test.toStream()
> >>>>     .to("<topic_out>");
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> So when restarting the stream app, the <topic_out> topic will have
> >>>> duplicated messages from a while back ... is this expected behavior?
> >>>>
> >>>> Thanks,
> >>>> Tao Wang
> >>>>
> >>>
> >>
> >
>
