
Re: kafka stream ktable with suppress operator

Just a follow up: currently, suppress() only supports in-memory stores
(note that `suppress()` has its own store).

For the actual `reduce()` store, you can pick between RocksDB and
in-memory (the default is RocksDB). Hence, if you restart an application
on the same host, it should not be necessary to reload the state from
the changelog topic if you use RocksDB.
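As a hedged sketch of that choice (the store name, retention, and window size below are placeholders taken from the thread's example, not anyone's actual configuration), the aggregation store can be pinned to RocksDB or in-memory via `Stores`:

```java
import java.time.Duration;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

public class StoreChoice {
    // Persistent RocksDB window store (the default): state survives a
    // restart on the same host, so no changelog replay should be needed.
    static WindowBytesStoreSupplier persistent() {
        return Stores.persistentWindowStore(
            "aggregated-stream21-store",  // placeholder store name
            Duration.ofMinutes(5),        // retention
            Duration.ofSeconds(30),       // window size
            false);                       // do not retain duplicates
    }

    // In-memory window store: must be restored from the changelog
    // topic on every restart.
    static WindowBytesStoreSupplier inMemory() {
        return Stores.inMemoryWindowStore(
            "aggregated-stream21-store",
            Duration.ofMinutes(5),
            Duration.ofSeconds(30),
            false);
    }
}
```

Either supplier can then be passed to `Materialized.as(...)` in the `aggregate()` call.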

However, at the moment the suppress buffer must be recreated from the
suppress-changelog topic on restart.

Originally, `suppress()` was intended to support persistent stores as
well, but this has not been implemented yet. We hope to close this gap
in the future.

>>> I haven't figured out the reason, but after restart, the app will keep
>>> resetting the changelog topic offset to ZERO and triggering a rebalance.

Resetting to zero would happen if the full state needs to be recovered.
However, this should not result in a rebalance. Can you elaborate on the
rebalancing issue you described?



-Matthias

On 10/28/19 5:54 PM, Alex Brekken wrote:
> I assume you're using RocksDB for your state stores... The bytes out you're
> seeing on the changelog topics is probably because they are restoring your
> state stores. If your state stores are in-memory, then on every
> application startup they're going to be restored from the changelog
> topics. If your state stores are persistent (saved to disk), then a
> restore can still happen if you've lost your filesystem (maybe you're
> doing a state store cleanup on startup/shutdown, or have ephemeral
> storage such as an emptyDir in k8s, for example). So *I think* what
> you're seeing is normal, though if you want to dig deeper there are
> RocksDB metrics that can be exposed and will show restore-related info.
> Additionally, there is a StateRestoreListener interface that you can
> implement if you'd like to log some of the state store restoration
> details.
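A minimal sketch of such a listener (method signatures per the Kafka Streams 2.x API; the log format here is purely illustrative):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Logs restoration progress for every state store. Register it once per
// KafkaStreams instance, before calling start():
//   streams.setGlobalStateRestoreListener(new LoggingRestoreListener());
public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("Restore start for %s (%s): offsets %d..%d%n",
            storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("Restored batch of %d records for %s up to offset %d%n",
            numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        System.out.printf("Restore done for %s: %d records total%n",
            storeName, totalRestored);
    }
}
```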
>
> Alex
>
> On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
>
>> Hi,
>> I'm using 2.3.1 now and having the same issue. During restart, I
>> noticed a lot of logging like the below:
>> Seeking to EARLIEST offset of partition
>> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
>> Seeking to EARLIEST offset of partition
>> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
>>
>> After restarting, the bytes out of the changelog topic is as high as
>> 800-900MB/s while normally it has zero bytes out. Is this expected?
>> I haven't figured out the reason, but after restart, the app will keep
>> resetting the changelog topic offset to ZERO and triggering a rebalance.
>> It seems like a dead loop?
>> Rebalance -> reset to ZERO -> rebalance
>>
>> Is there any config I should set?
>>
>> Thanks!
>>
>> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <matthias@confluent.io>
>> wrote:
>>>
>>> What version are you using? We fixed a couple of bugs in `suppress()` --
>>> I would recommend using the latest 2.3.1 bug-fix release.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/25/19 9:12 AM, Tao Wang wrote:
>>>> When using the suppress operator with a windowed KTable, it looks like
>>>> restarting the Kafka Streams app causes the aggregated messages from the
>>>> SUPPRESS-STATE-STORE to be published again.
>>>>
>>>> Here is the pseudo code... is there anything I am missing, or anything
>>>> that can be done to avoid this?
>>>>
>>>>
>>>> KTable<Windowed<String>, String> test = <KStreamObject>
>>>>     .groupByKey()
>>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
>>>>     .aggregate(
>>>>         ....
>>>>         ,
>>>>         Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
>>>>             .withRetention(Duration.ofMinutes(5))
>>>>             .withKeySerde(Serdes.String())
>>>>             .withValueSerde(Serdes.String())
>>>>     )
>>>>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>>>     .toStream()
>>>>     .to("<topic_out>")
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> So when restarting the stream app, <topic_out> will have duplicated
>>>> messages from a while back... is this expected behavior?
>>>>
>>>> Thanks,
>>>> Tao Wang
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
