`windowSize` has no impact on the segments used: it is only used to
compute the window end timestamp when you fetch a window, because the
end time is not stored explicitly (as all windows are assumed to have
the same window size; i.e., it's a storage optimization to save the
bytes for the window end timestamp and re-compute it on the fly on
read).
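For example, this is roughly where both parameters show up if you build
the store yourself (just a sketch; the store name, serdes, and
durations are made up):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// hypothetical store: 10-minute windows, retained for 30 minutes
StoreBuilder<WindowStore<String, Long>> builder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            "my-window-store",       // made-up name
            Duration.ofMinutes(30),  // retention period
            Duration.ofMinutes(10),  // windowSize: only used to derive
                                     //   window-end = start + size on read
            false),                  // retainDuplicates
        Serdes.String(),
        Serdes.Long());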
`retainDuplicates` should be set to `false` for your use case. If you
set it to `true`, you effectively make the store "append only": each
key is internally extended with a counter to create a unique primary
key for RocksDB, which allows storing multiple records with the same
key (i.e., to store duplicates :)). This feature is used by Kafka
Streams to implement stream-stream joins, i.e., instead of storing
windowed data, the store is used to store raw records for a certain
period of time. Note that for KStreams there is no notion of a primary
key, record keys are not unique, and the stored timestamp is just the
record timestamp.
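To make the difference concrete, here is a rough sketch (assuming
`store` is a WindowStore<String, String> obtained via
`context.getStateStore(...)` inside a Processor, and that
java.time.Instant and org.apache.kafka.streams.state.WindowStoreIterator
are imported; key, values, and timestamps are made up):

store.put("k", "v1", 0L);  // window starting at timestamp 0
store.put("k", "v2", 0L);  // same key, same window start

// retainDuplicates == false: the second put() overwrites the first,
//   so fetching that window returns only "v2"
// retainDuplicates == true: the key is internally extended with a
//   sequence number, so both "v1" and "v2" are kept and returned

try (WindowStoreIterator<String> it =
         store.fetch("k", Instant.ofEpochMilli(0), Instant.ofEpochMilli(0))) {
    it.forEachRemaining(kv -> System.out.println(kv.value));
}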
Does this help?
-Matthias
On 3/30/20 2:58 AM, Sachin Mittal wrote:
> Hi,
> I understood how window stores are implemented using rocksdb.
> When creating an instance of RocksDBWindowStore we pass two additional
> arguments:
> retainDuplicates
> windowSize
>
> I have not clearly understood the purpose of these two.
> Like say in my application I just create one windowed store of a given
> size say 10 minutes and retention of 30 minutes.
>
> Does this mean internally it will create one rocksdb segment for every
> record within 10 minutes boundary and retain it for 30 minutes?
> If a new record arrives beyond that 10 minutes a new segment gets created?
>
> How does retainDuplicates come into play here?
>
> Thanks
> Sachin
>
>
>
> On Mon, Mar 2, 2020 at 12:49 AM Matthias J. Sax <mjsax@apache.org> wrote:
>
> If you want to put a record into multiple windows, you can do a `put()`
> for each window.
>
> The DSL uses the store in the exact same manner for hopping windows
> (compare the code I shared in the last reply). Even if windows are
> overlapping, the grouping-key+window-start-timestamp is a unique
> primary key for each window.
>
> -Matthias
>
> On 2/27/20 9:26 AM, Sachin Mittal wrote:
>>>> Hi, Yes I get that when I am using the apis provided by kstream I
>>>> can basically use both: - Tumbling time window (non-overlapping,
>>>> gap-less windows) - Hopping time window (Time-based Fixed-size,
>>>> overlapping windows)
>>>>
>>>> I wanted to know if I am using state store directly when created
>>>> using a RocksDbWindowBytesStoreSupplier. In that case the
>>>> RocksDBWindowStore created will always be of type Tumbling. ie any
>>>> record put into that store will be part of one window only.
>>>>
>>>> Thanks Sachin
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 27, 2020 at 1:09 PM Matthias J. Sax <mjsax@apache.org>
>>>> wrote:
>>>>
>>>>> What you call "sliding window" is called "hopping window" in
>>>>> Kafka Streams.
>>>>>
>>>>> And yes, you can use a windowed-store for this case: In fact, a
>>>>> non-overlapping tumbling window is just a special case of a
>>>>> hopping window with advance == window-size.
>>>>>
>>>>> In Kafka Streams we have a single implementation for hopping
>>>>> windows (that we use for tumbling windows, too):
>>>>>
>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 2/26/20 9:47 PM, Sachin Mittal wrote:
>>>>>> Hi, So far how I have understood is that when we create a rocksdb
>>>>>> window store, we specify a window size and retention period.
>>>>>>
>>>>>> So windows are created from epoch time based on size, say a size
>>>>>> of 100, then windows are: [0, 100), [100, 200), [200, 300) ...
>>>>>>
>>>>>> Windows are retained based on retention period and after which
>>>>>> it is dropped.
>>>>>>
>>>>>> Also a window is divided into segments which is implemented
>>>>>> using a treemap.
>>>>>>
>>>>>> Please confirm if my understanding is correct.
>>>>>>
>>>>>> Also it looks from all this that windows are always hopping.
>>>>>>
>>>>>> Is there a case of sliding windows that can be created? If yes
>>>>>> how? Example of sliding window would be: [0, 100), [75, 175),
>>>>>> [150, 250) ...
>>>>>>
>>>>>> Thanks Sachin
>>>>>>
>>>>>
>>>>>
>>>>
>>
>