Re: Restart of kafka-streams with more than one partition on topic is reprocessing the data from beginning of the topic
Hi Jonathan,
Thanks for the suggestion. It is working fine with the Kafka 2.2.1-rc1 version.
Do we have any tentative release date for this version? Please let me know if you have any info on this.
Thanks in advance.
Thanks
Kalyani
On 2019/05/24 08:00:33, Jonathan Santilli <jonathansantilli@gmail.com> wrote:
> Hello Kalyani,
>
> Try testing the release candidate kafka-2.2.1-rc1; what you describe seems to
> be a problem that has been solved in version 2.2.1
> (https://issues.apache.org/jira/browse/KAFKA-7895), which is under vote right
> now as 2.2.1-RC1
> (https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html).
> I have tested the app that was suffering from that problem and it is now
> solved. Of course, you need to test your own app.
>
> Hope that helps.
>
> Cheers!
> --
> Jonathan
>
>
>
> On Thu, May 23, 2019 at 5:37 PM kalyani yarlagadda <
> kalyani.yarlagadda1@gmail.com> wrote:
>
> > Hi,
> >
> > I need assistance with the scenario below. Please help me with this.
> >
> > I am using a hopping time window in Kafka Streams. I am facing an issue on
> > restart of my Kafka Streams application: the application reprocesses the
> > data from the beginning offset.
> > However, this happens only when the topic has more than one partition.
> > If the topic has only one partition, the windowed aggregation works as
> > expected after a restart.
> >
> > *Kafka Version:* *2.1.0*
> >
> > *Example:*
> >
> > Time window is 4 hours
> > advanceBy is 5 minutes
> > The application is started at time A and produces results every 5 minutes
> > over the stream data of the last 4 hours.
> > It is then stopped at a timestamp, say X, and restarted at timestamp Y.
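> > (For context: a 4-hour window advancing every 5 minutes means each record
> > falls into 240 / 5 = 48 overlapping windows, so a single record updates 48
> > windowed aggregates per key.)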
> >
> > *The behavior for a single partition:* After the restart, the streams are
> > processed from time X to time Y, in 5-minute steps.
> > *The behavior for more than one partition:* After the restart, the streams
> > are processed from time A to time Y, in 5-minute steps.
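> > A minimal way to check whether the committed offsets survive the restart
> > (an assumed diagnostic sketch, not part of the POC; it uses the application
> > id as the consumer group name and the broker address from the config below):
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.admin.AdminClient;
> > import org.apache.kafka.clients.admin.AdminClientConfig;
> >
> > public class CheckCommittedOffsets {
> >     public static void main(String[] args) throws Exception {
> >         Properties adminProps = new Properties();
> >         adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> >         try (AdminClient admin = AdminClient.create(adminProps)) {
> >             // committed offset per partition for the Streams app's consumer
> >             // group; compare just before the stop and just after the restart
> >             admin.listConsumerGroupOffsets("streams_changedetection_poc_partitions")
> >                  .partitionsToOffsetAndMetadata()
> >                  .get()
> >                  .forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
> >         }
> >     }
> > }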
> >
> > *I am adding the POC code below:*
> >
> > // define the time window as a hopping time window
> > TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
> >         .advanceBy(Duration.ofMinutes(5))
> >         .grace(Duration.ofMinutes(1));
> >
> > // MetricsTimeSeries is the aggregator class
> > KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
> >     builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
> >            .groupByKey()
> >            .windowedBy(timeWindow)
> >            .aggregate(
> >                () -> new MetricsTimeSeries(),            /* initializer */
> >                (aggKey, newValue, aggValue) -> {         /* adder */
> >                    aggValue.addDataPoint(newValue);
> >                    return aggValue;
> >                },
> >                Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
> >            .suppress(Suppressed.untilWindowCloses(
> >                BufferConfig.maxBytes(10 * 1024 * 1024).shutDownWhenFull()));
> >
> > windowedMetricsTimeSeriesStream
> >         .toStream()
> >         .map((key, value) -> /* mapping logic goes here */)
> >         .to("metrics_op");
> >
> > *Properties set to Kafka Streams:*
> >
> > StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> > StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> > StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - "latest"
> >
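> > A minimal sketch of how these properties are wired into the application
> > (assumptions not in the original: the imports, the JSONSerde package, and
> > the builder/streams variable names):
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.streams.KafkaStreams;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams_changedetection_poc_partitions");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
> > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
> > props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class);
> > // "latest" only applies when the group has no committed offsets yet
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> >
> > // build and start the topology defined above
> > KafkaStreams streams = new KafkaStreams(builder.build(), props);
> > streams.start();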
> > Thanks in Advance.
> >
> > Kalyani Y,
> > 9177982636
> >
>
>
> --
> Santilli Jonathan
>