Hi Tomasz,
Thanks for trying that out. It's not the way I'd expect it to work. I don't remember if there were any follow-up bugs that have been solved in subsequent releases. Just as a long shot, perhaps you can try it on the latest release (3.3.0)?
Otherwise, I think the best path forward would be to file a bug report on the Apache Kafka Jira with enough information to reproduce the issue (or if you're able to provide a repro, that would be awesome).
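One thing worth double-checking first: the config Kafka Streams actually reads is spelled `max.task.idle.ms` (the `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` constant), so a key spelled `task.max.idle.ms` would not take effect. A minimal properties sketch (the 1000 ms value is just an arbitrary example):

```properties
# Let a task wait up to 1s for records on its empty input partitions
# before processing the partitions that do have buffered data.
max.task.idle.ms=1000
```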
Thanks, and sorry for the trouble.
-John
On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
> I upgraded to kafka streams 3.0.0 with positive task.max.idle.ms and it did
> not help.
> When lag is large, the application still consumes data batches without
> interleaving.
>
>
>
> On Tue, Sep 27, 2022 at 05:51 John Roesler <vvcephei@apache.org> wrote:
>
>> Hi Tomasz,
>>
>> Thanks for asking. This sounds like the situation that we fixed in Apache
>> Kafka 3.0, with KIP-695 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
>> ).
>>
>> Can you try upgrading and let us know if this fixes the problem?
>>
>> Thanks,
>> -John
>>
>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
>> > Hi group,
>> >
>> > I wrote a simple kafka streams application with topology such as below:
>> >
>> > builder.addStateStore(
>> >>     Stores.keyValueStoreBuilder(
>> >>         Stores.persistentKeyValueStore("STORE"),
>> >>         Serdes.String(), Serdes.String())
>> >>     .withLoggingEnabled(storeConfig));
>> >
>> >
>> >
>> > builder.stream("TOPIC_1", Consumed.with(...))
>> >>     .merge(builder.stream("TOPIC_2", Consumed.with(...)))
>> >>     .merge(builder.stream("TOPIC_3", Consumed.with(...)))
>> >>     .map(...) // stateless
>> >>     .transform(..., "STORE") // stateful
>> >>     .to("TOPIC_4");
>> >
>> >
>> > All input topics have 6 partitions, and for the purpose of testing, we
>> are
>> > producing data to partition number 5.
>> > We are using Kafka Streams version 2.8.1, broker version 2.1.1 (Scala 2.12 build).
>> >
>> > The application works as expected when it has caught up to the lag, e.g.
>> > when the reset tool is used with the --to-latest parameter.
>> > However, when the application is processing the messages starting from
>> the
>> > earliest offset, the inputs are provided in batches such as:
>> >
>> > - ~1000 messages from TOPIC_1
>> > - ~1000 messages from TOPIC_2
>> > - ~1000 messages from TOPIC_3
>> >
>> > All of the messages have timestamps provided in headers, so I would
>> expect
>> > the application to interleave the messages from these three topics so
>> that
>> > their timestamps are in the ascending order.
>> > However, this is not the case that I am observing. The messages are
>> > processed in batches.
>> >
>> > How do I configure my application so that it processes messages in order
>> > when it is catching up to the lag?
>>
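For what it's worth, the behavior being asked for here (timestamp-ordered interleaving across the merged topics) amounts to always processing the buffered record with the smallest timestamp across the input partitions, which is the ordering rule KIP-695's synchronization enforces. A toy sketch of just that rule (plain Java, not Streams internals; all names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class Interleave {
    // Repeatedly pick the input whose head record has the smallest
    // timestamp, so the merged output comes out in ascending order.
    static List<Long> merged(List<? extends Queue<Long>> inputs) {
        List<Long> out = new ArrayList<>();
        while (true) {
            Queue<Long> best = null;
            for (Queue<Long> q : inputs) {
                if (!q.isEmpty() && (best == null || q.peek() < best.peek())) {
                    best = q;
                }
            }
            if (best == null) break; // all inputs drained
            out.add(best.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // Three "topics" whose record timestamps interleave as 1..9.
        Queue<Long> t1 = new ArrayDeque<>(List.of(1L, 4L, 7L));
        Queue<Long> t2 = new ArrayDeque<>(List.of(2L, 5L, 8L));
        Queue<Long> t3 = new ArrayDeque<>(List.of(3L, 6L, 9L));
        System.out.println(merged(List.of(t1, t2, t3)));
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

This is what turns topic-by-topic draining of ~1000-record batches into timestamp order; without it, the task simply processes whichever partition's fetch arrived.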