Re: kafka stream - sliding window - getting unexpected output

Emitting intermediate results is by design.

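If you don't want intermediate results, you can add `suppress()` after the
aggregation and configure it to emit only on window close, i.e.
`suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`.
Note that `Suppressed.untilTimeLimit()`, as used in your snippet, only
rate-limits updates and does not guarantee a single final result per window.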
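Here is a toy model in plain Java that makes "emit on window close" concrete. It is a sketch and not the Kafka Streams implementation. The class name `EmitOnWindowClose`, the `Window` record, and the exact close rule are assumptions for illustration. Updates to a window's aggregate stay in a buffer. Each window's count is emitted once, only after stream time (the largest timestamp seen so far) has moved past the window end plus the grace period. Records for windows that have already closed are dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of "emit only on window close" (NOT the Kafka Streams implementation).
// Intermediate aggregates stay buffered; a window's final count is emitted once
// stream time (max timestamp seen) moves past windowEnd + grace.
class EmitOnWindowClose {

    // Window bounds in epoch millis; assigning records to windows is out of scope here.
    record Window(long start, long end) {}

    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    private final Map<Window, Integer> buffer = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    EmitOnWindowClose(long graceMs) {
        this.graceMs = graceMs;
    }

    // Apply one record to its window, advance stream time, then flush closed windows.
    void process(Window window, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        boolean late = streamTime > window.end() + graceMs;
        if (!late) {
            buffer.merge(window, 1, Integer::sum); // intermediate result: buffered, not emitted
        } // late records for already-closed windows are dropped in this toy model
        flushClosedWindows();
    }

    private void flushClosedWindows() {
        buffer.entrySet().removeIf(e -> {
            boolean closed = streamTime > e.getKey().end() + graceMs;
            if (closed) {
                emitted.add("[" + e.getKey().start() + "," + e.getKey().end() + "] count=" + e.getValue());
            }
            return closed;
        });
    }

    public static void main(String[] args) {
        EmitOnWindowClose op = new EmitOnWindowClose(0);
        op.process(new Window(0, 5000), 1000);     // buffered, nothing emitted
        op.process(new Window(0, 5000), 3000);     // still buffered
        op.process(new Window(5000, 10000), 6000); // stream time passes 5000: [0,5000] closes
        System.out.println(op.emitted);            // [[0,5000] count=2]
    }
}
```

In this example, two records for [0,5000] are processed, followed by one record at 6000. The model emits a single `[0,5000] count=2` and no intermediate `count=1`. Kafka Streams' suppression depends on stream time in the same way: if no newer records arrive, a window is never seen as closed.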

-Matthias

On 5/17/22 3:20 AM, Shankar Mane wrote:
> Hi All,
>
> Our use case requires a sliding window: whenever a user performs an action
> at time t1, we would like to see their activity in [t1 - 24 hours, t1] and
> use it to show the user some recommendations.
>
>
>
> -- The code is ready and runs without any errors.
> -- Aggregations happen as expected.
> -- But the output is unexpected: as the window slides, I get mixed output
> that includes intermediate aggregated records along with the final
> aggregated ones.
>
> Could someone please help? What can I do to get ONLY the final aggregated
> output?
>
>
> Code snippet:
> ________________________________________________________________
>
>
>
> builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
>     .filter((k, v) -> v != null)
>     .map((k, v) -> KeyValue.pair(v.getUserId(), v))
>     //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
>     .groupByKey()
>     .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
>     .aggregate(OutputPojo::new, (k, tr, out) -> {
>         out.setUserId(tr.getUserId());
>         out.setCount(out.getCount() + 1);
>         out.setSum(out.getSum() + tr.getInt4());
>         out.setUuid(tr.getUuid());
>         out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
>         waitForMs(200); // added delay just for analysing output
>         return out;
>     }, Materialized.with(stringSerde, outputSerde))
>     .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .map((Windowed<String> key, OutputPojo out) -> new KeyValue<>(key.key(), out))
>     .print(Printed.toSysOut());
>     // .to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde));
>
>
>
> ________________________________________________________________
>
>
> Input data:
>
> for i in {1..10}; do sleep 1s; python3 del.py 1001 10; sleep 1s; done
> {'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
> {'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
> {'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
> {'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
> {'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
> {'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
> {'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
> {'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
> {'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
> {'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}
>
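The lookback described in the use case can be sketched in plain Java as a two-pointer pass over one user's time-ordered events. This is a toy model that assumes sorted input; it is not Kafka's `SlidingWindows` implementation, and `LookbackDemo`, `Event`, and `Result` are hypothetical names. For each event at time t, it aggregates the events in [t - lookbackMs, t], which this sketch treats as inclusive at both ends. The example uses the ten sample timestamps above and the 5000 ms time difference from the snippet.

```java
import java.util.ArrayList;
import java.util.List;

// Toy per-event lookback aggregation: for each event at time t, count and sum the
// events whose timestamps fall in [t - lookbackMs, t]. NOT Kafka's SlidingWindows code.
class LookbackDemo {

    record Event(long timestamp, int int4) {}
    record Result(long windowStart, long windowEnd, int count, int sum) {}

    // Assumes events are sorted by timestamp (true for the sample input).
    static List<Result> lookback(List<Event> events, long lookbackMs) {
        List<Result> out = new ArrayList<>();
        int left = 0; // index of the oldest event still inside the current window
        int sum = 0;
        for (int right = 0; right < events.size(); right++) {
            Event e = events.get(right);
            sum += e.int4();
            long start = e.timestamp() - lookbackMs;
            // Evict events older than the window start (the start itself is inclusive).
            while (events.get(left).timestamp() < start) {
                sum -= events.get(left).int4();
                left++;
            }
            out.add(new Result(start, e.timestamp(), right - left + 1, sum));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1652781716234L, 1652781718436L, 1652781720634L, 1652781722832L,
                     1652781725029L, 1652781727227L, 1652781729425L, 1652781731623L,
                     1652781733820L, 1652781736018L};
        List<Event> events = new ArrayList<>();
        for (long t : ts) {
            events.add(new Event(t, 10)); // every sample record has int4 = 10
        }
        for (Result r : lookback(events, 5000)) {
            System.out.println(r.windowEnd() + " count=" + r.count() + " sum=" + r.sum());
        }
    }
}
```

Consecutive sample records are about 2.2 s apart, so at most three of them fit in any 5 s lookback. The counts therefore run 1, 2, 3, 3, ..., 3, which is consistent with 3 being the largest count in the output. With `lookbackMs = Duration.ofHours(24).toMillis()`, all ten records (spanning about 20 s) would fall into the window of every later event, giving counts 1 through 10.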
>
> ________________________________________________________________
>
>
> Output (*unexpected*): the output below was captured at each slide of the
> window, every 1 s (but the input data is published at 2 s intervals):
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34) ----> seems to be an older UUID
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:07.227, uuid=135dc5cd-50cb-467b-9e63-300fdeedaf75)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:07.227, uuid=135dc5cd-50cb-467b-9e63-300fdeedaf75)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:09.425, uuid=66d8e3c7-8f63-43ca-acf1-e39619bf33a0)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:09.425, uuid=66d8e3c7-8f63-43ca-acf1-e39619bf33a0)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:11.623, uuid=f037712b-42a5-4449-bcc2-cf6eafddf5ad)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:11.623, uuid=f037712b-42a5-4449-bcc2-cf6eafddf5ad)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:13.820, uuid=7baa4254-b9da-43dc-bbb7-4caede578aeb)
>
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:13.820, uuid=7baa4254-b9da-43dc-bbb7-4caede578aeb)
> [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:16.018, uuid=16541989-f3ba-49f6-bd31-bf8a75ba8eac)
>
>
> Regards,
> Shankar
>
