
Re: Custom Kafka Streams State Restore Logic

Thanks.

I agree. Seems your options are limited. The API is not really a good
fit for what you want to do... Sorry.

-Matthias

On 1/18/23 7:48 AM, Upesh Desai wrote:
> Hi Matthias, thanks for your reply! Sure, so the use case is as follows.
>
> We currently store some time series data in the state store, and it is
> also written to a changelog. The time series data is bucketed (5
> minutes, 1 hour, and 1 day). Our goal was to never have more than 2
> time buckets in the store at once. As we receive new time series data,
> we figure out which time bucket it belongs to and add it to that
> bucket. We have a "grace period" that allows late-arriving data to be
> processed even after a time bucket has ended. That's why we have this
> constraint of at most 2 time buckets in the store: 1 for the previous
> bucket during its grace period, and 1 for the current bucket.
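The bucketing rule described above can be sketched in plain Java. This is a minimal sketch under assumptions the thread does not state: buckets are aligned to the Unix epoch (so daily buckets start at UTC midnight), "stream time" is the largest record timestamp seen so far (including the current record), and the grace period is no longer than one bucket. The class and method names (`TimeBuckets`, `bucketStart`, `accepts`) are hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper: epoch-aligned bucketing plus a grace-period check. */
final class TimeBuckets {

    private TimeBuckets() { }

    /** Start (epoch millis, inclusive) of the bucket containing timestampMs. */
    static long bucketStart(long timestampMs, Duration bucketSize) {
        long size = bucketSize.toMillis();
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, size);
    }

    /**
     * Accept a record if it belongs to the current bucket, or to the previous
     * bucket while that bucket is still inside its grace period.
     * Assumes streamTimeMs >= recordTsMs and grace <= bucketSize.
     */
    static boolean accepts(long recordTsMs, long streamTimeMs,
                           Duration bucketSize, Duration grace) {
        long current = bucketStart(streamTimeMs, bucketSize);
        long previous = current - bucketSize.toMillis();
        long recordBucket = bucketStart(recordTsMs, bucketSize);
        if (recordBucket == current) {
            return true;
        }
        // The previous bucket ended at `current`; its grace period runs from there.
        return recordBucket == previous && streamTimeMs < current + grace.toMillis();
    }
}
```

With a grace period no longer than the bucket size, only the current and the previous bucket can ever accept records, which is why two buckets per time series are enough.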
>
> So we wanted to extend the base state store and add a simple in-memory
> map to track the 2 time buckets per time series (the time series is the
> store key). A couple of reasons why we don't want to add this as a
> separate state store, or to the existing store, are:
> 1. There is a ton of serialization / deserialization that happens behind
> the scenes
>
> 2. This new time bucket tracking map would only be updated a couple
> times per time bucket, and does not need to be updated on every message
> read.
>
> 3. There's no API on the included stores that allows us to do so
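A side map like the one described in points 1 and 2 could look roughly like this. It is a hypothetical sketch (`BucketTracker` and `observe` are illustrative names, not Kafka API): it holds plain Java longs, so no serialization is involved, and it is only modified when a record opens a newer bucket for its series.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory tracker of the (at most) two live buckets per series. */
final class BucketTracker<K> {

    /** Bucket starts for one series; Long.MIN_VALUE means "no previous bucket yet". */
    static final class Buckets {
        final long previousStart;
        final long currentStart;

        Buckets(long previousStart, long currentStart) {
            this.previousStart = previousStart;
            this.currentStart = currentStart;
        }
    }

    private final Map<K, Buckets> bySeries = new HashMap<>();
    private long writes; // how many times the map was actually modified

    /**
     * Records that seriesKey received data in the bucket starting at bucketStart.
     * Returns the start of the bucket that fell out of the two-bucket window, or null.
     */
    Long observe(K seriesKey, long bucketStart) {
        Buckets b = bySeries.get(seriesKey);
        if (b == null) {
            bySeries.put(seriesKey, new Buckets(Long.MIN_VALUE, bucketStart));
            writes++;
            return null;
        }
        if (bucketStart <= b.currentStart) {
            // Same or older bucket: the common case, no map update needed.
            return null;
        }
        // A newer bucket opened: current becomes previous, old previous is evicted.
        bySeries.put(seriesKey, new Buckets(b.currentStart, bucketStart));
        writes++;
        return b.previousStart == Long.MIN_VALUE ? null : b.previousStart;
    }

    Buckets get(K seriesKey) {
        return bySeries.get(seriesKey);
    }

    long writes() {
        return writes;
    }
}
```

A non-null return value identifies the bucket that just left the two-bucket window, so the caller could delete that bucket's entries from the underlying store.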
>
> Therefore, I thought it best to reuse the existing store functionality:
> create a "new state store" that really just wraps one of the included
> stores, add this in-memory map, and then plug into/alter/extend the
> restore functionality to populate the time bucket tracking map at
> restore time.
>
> It sounds like I will either have to 1) create a custom state store from
> scratch, or 2) see if there is a post-restore hook that can then call a
> method to scan the whole store and build up the time bucket map before
> starting to process.
>
> Any advice on Kafka Streams / state store logic would be appreciated!
>
> -Upesh
>
> Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
>
> www.itrsgroup.com
>
> *From: *Matthias J. Sax <mjsax@apache.org>
> *Date: *Wednesday, January 18, 2023 at 12:50 AM
> *To: *users@kafka.apache.org <users@kafka.apache.org>
> *Subject: *Re: Custom Kafka Streams State Restore Logic
>
> Guess it depends on what you actually want to achieve?
>
> Also note: `InMemoryWindowStore` is an internal class, and thus might
> change at any point, and it was never designed to be extended...
>
>
> -Matthias
>
> On 1/13/23 2:55 PM, Upesh Desai wrote:
>> Hello all,
>>
>> I am currently working on creating a new InMemoryWindowStore by
>> extending the default in-memory window store. One of the roadblocks I've
>> run into is finding a way to add some custom logic when the state store
>> is being restored from the changelog. I know that this is possible if I
>> write the store logic completely from scratch, but we really only want
>> to add a tiny bit of custom logic, and do not want to have to replicate
>> all the existing logic.
>>
>> Is there a simple way to do this? I see the default
>> implementation in the InMemoryWindowStore:
>>
>> context.register(
>>     root,
>>     (RecordBatchingStateRestoreCallback) records -> {
>>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>>             // Re-insert each changelog record into the in-memory store,
>>             // decoding the key bytes and window timestamp from the binary key.
>>             put(
>>                 Bytes.wrap(extractStoreKeyBytes(record.key())),
>>                 record.value(),
>>                 extractStoreTimestamp(record.key())
>>             );
>>             // Apply the record's consistency checks and update the store's position.
>>             ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
>>                 record,
>>                 consistencyEnabled,
>>                 position
>>             );
>>         }
>>     }
>> );
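If the wrapper can get hold of the restore callback, the extra logic can be layered on with a decorator so the inner store's restore code is not copied. This sketch is Kafka-free so it runs standalone: `ChangelogRecord`, `BatchRestore` and `TrackingRestoreCallback` are hypothetical stand-ins for the `ConsumerRecord<byte[], byte[]>` batch and the `RecordBatchingStateRestoreCallback` lambda above. Getting the callback is the hard part, because the inner store calls `context.register(...)` itself in `init`. One untested idea is to hand the inner store a delegating `StateStoreContext` whose `register` wraps the callback before forwarding it; internal stores may rely on internal context types, so this may not work and could break between versions, as the reply above warns. The restored keys are also in the window store's binary key format (hence `extractStoreKeyBytes` / `extractStoreTimestamp` above), so the tracking logic would need to decode them the same way.

```java
import java.util.Collection;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for one raw changelog record (key and value bytes). */
final class ChangelogRecord {
    final byte[] key;
    final byte[] value;

    ChangelogRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical batch restore callback, shaped like the lambda passed to register. */
interface BatchRestore {
    void restoreBatch(Collection<ChangelogRecord> records);
}

/** Decorator: run the inner store's own restore logic, then the extra tracking logic. */
final class TrackingRestoreCallback implements BatchRestore {
    private final BatchRestore delegate;
    private final BiConsumer<byte[], byte[]> onRestored;

    TrackingRestoreCallback(BatchRestore delegate, BiConsumer<byte[], byte[]> onRestored) {
        this.delegate = delegate;
        this.onRestored = onRestored;
    }

    @Override
    public void restoreBatch(Collection<ChangelogRecord> records) {
        // Restore into the inner store first, so the side map never
        // refers to data the store does not yet contain.
        delegate.restoreBatch(records);
        for (ChangelogRecord r : records) {
            onRestored.accept(r.key, r.value);
        }
    }
}
```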
>>
>> Thanks in advance!
>>
>> Upesh