Re: Restart of kafka-streams with more than one partition on topic is reprocessing the data from beginning of the topic
Hi Kalyani,
I'm glad to hear that the issue you reported will be fixed in 2.2.1. The
release is currently being voted on, and if no issues are reported on the
current release candidate, we'd roll it out within the next week or so.
--Vahid
On Thu, May 30, 2019, 05:30 kalyani.yarlagadda1@gmail.com <
kalyani.yarlagadda1@gmail.com> wrote:
> Hi Jonathan,
>
> Thanks for the suggestion. It is working fine with the Kafka 2.2.1-rc1
> version.
> Do we have a tentative release date for this version? Please let me know
> if you have any info on this.
> Thanks in Advance.
>
> Thanks
> Kalyani
>
> On 2019/05/24 08:00:33, Jonathan Santilli <jonathansantilli@gmail.com>
> wrote:
> > Hello Kalyani,
> >
> > Try testing the RC kafka-2.2.1-rc1: what you describe seems to be a
> > problem that has been solved in version 2.2.1
> > (https://issues.apache.org/jira/browse/KAFKA-7895), which is being voted
> > on right now as 2.2.1-RC1
> > (https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html).
> > I have tested the app that was suffering from that problem, and it is now
> > solved. Of course, you need to test your own app.
> >
> > Hope that helps.
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> >
> > On Thu, May 23, 2019 at 5:37 PM kalyani yarlagadda <
> > kalyani.yarlagadda1@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I need assistance in the below scenario. Please help me with this.
> > >
> > > I am using a hopping time window in Kafka Streams. I am facing an issue
> > > on restart of my Kafka Streams application: the application processes
> > > the data from the beginning offset.
> > > However, this happens only when the topic has more than one partition.
> > > If the topic has only one partition, then on restart of the application
> > > the hopping window works fine.
> > >
> > > *Kafka Version:* *2.1.0*
> > >
> > > *Eg:*
> > >
> > > Time window is 4 hours,
> > > advanceBy is 5 minutes.
> > > The application is started at time A and runs every 5 minutes over the
> > > last 4 hours of stream data.
> > > It is then stopped at a timestamp, say X, and restarted at timestamp Y.
> > >
> > > *Behavior for a single partition:* After the restart, the streams are
> > > processed from time X to time Y, every 5 minutes.
> > > *Behavior for more than one partition:* After the restart, the streams
> > > are processed from time A to time Y, every 5 minutes.
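For reference, with the window settings above (4-hour windows advancing every 5 minutes) each record belongs to 4 h / 5 min = 48 overlapping windows. The following is a minimal stdlib-only sketch of that assignment, assuming window starts are aligned to multiples of the advance interval on the epoch-millisecond axis; `HoppingWindows` and `windowStartsFor` are hypothetical names, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of hopping-window assignment (an assumption, not Kafka's own code):
// a window [start, start + size) has a start aligned to a multiple of advance,
// and a record with timestamp t belongs to every window whose range contains t.
public class HoppingWindows {

    /** Returns the start times (ms) of every hopping window containing timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start s with s + sizeMs > timestampMs, clamped at epoch 0.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 4 * 60 * 60 * 1000L;  // 4 hours
        long advance = 5 * 60 * 1000L;    // 5 minutes
        long t = 10 * 60 * 60 * 1000L;    // a record at 10:00 on an arbitrary time axis
        List<Long> starts = windowStartsFor(t, size, advance);
        System.out.println(starts.size()); // 48 = 4h / 5min
    }
}
```

The first window found starts at 06:05 (it ends at 10:05, so it still covers 10:00) and the last starts at 10:00; the window starting at 06:00 ends exactly at 10:00 and, being end-exclusive, is skipped.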
> > >
> > > *I am adding the POC code below:*
> > >
> > >     import java.time.Duration;
> > >     import org.apache.kafka.common.serialization.Serdes;
> > >     import org.apache.kafka.streams.kstream.*;
> > >     import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
> > >
> > >     // define the time window as a hopping time window
> > >     TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
> > >             .advanceBy(Duration.ofMinutes(5))
> > >             .grace(Duration.ofMinutes(1));
> > >
> > >     KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
> > >             builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
> > >                     .groupByKey()
> > >                     .windowedBy(timeWindow)
> > >                     .aggregate(
> > >                             // initializer; MetricsTimeSeries is the aggregator class
> > >                             () -> new MetricsTimeSeries(),
> > >                             // adder
> > >                             (aggKey, newValue, aggValue) -> {
> > >                                 aggValue.addDataPoint(newValue);
> > >                                 return aggValue;
> > >                             },
> > >                             // state store name
> > >                             Materialized.as("windowed_aggregated_metric_store_poc_partitions"))
> > >                     .suppress(Suppressed.untilWindowCloses(
> > >                             BufferConfig.maxBytes(10 * 1024 * 1024).shutDownWhenFull()));
> > >
> > >     windowedMetricsTimeSeriesStream
> > >             .toStream()
> > >             .map((key, value) -> ...) // mapping logic goes here
> > >             .to("metrics_op");
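The `.suppress(Suppressed.untilWindowCloses(...))` call above holds each window's result until the window closes, and that is driven by stream time (roughly, the largest record timestamp seen so far) rather than by wall-clock time. The sketch below is a simplified model under that assumption: a window [start, start + size) is treated as closed once stream time reaches start + size + grace. `WindowClose` and its methods are hypothetical helpers, not Kafka's implementation.

```java
// Simplified model (an assumption, not Kafka's implementation) of when
// Suppressed.untilWindowCloses(...) would release a window's final result.
public class WindowClose {

    /** Stream time at which a window [start, start + size) is considered closed. */
    static long closeTimeMs(long windowStartMs, long sizeMs, long graceMs) {
        return windowStartMs + sizeMs + graceMs;
    }

    static boolean isClosed(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return streamTimeMs >= closeTimeMs(windowStartMs, sizeMs, graceMs);
    }

    /** Stream time only moves forward: an out-of-order record does not lower it. */
    static long advanceStreamTime(long currentStreamTimeMs, long recordTimestampMs) {
        return Math.max(currentStreamTimeMs, recordTimestampMs);
    }

    public static void main(String[] args) {
        long size = 4 * 60 * 60 * 1000L;  // 4-hour window, as in the POC
        long grace = 60 * 1000L;          // 1-minute grace, as in the POC
        long windowStart = 0L;            // window [00:00, 04:00) on an arbitrary time axis
        long streamTime = Long.MIN_VALUE; // no records seen yet

        long[] recordTimestamps = {3_600_000L, 14_400_000L, 7_200_000L, 14_460_000L};
        for (long ts : recordTimestamps) {
            streamTime = advanceStreamTime(streamTime, ts);
            System.out.println("record at " + ts + " -> stream time " + streamTime
                    + ", window closed: " + isClosed(windowStart, size, grace, streamTime));
        }
    }
}
```

With the POC's 4-hour window and 1-minute grace, the window starting at 00:00 closes at 04:01 of stream time. The out-of-order record at 7,200,000 ms leaves stream time where it was, and if no new records arrive at all, stream time does not advance and the window stays open.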
> > >
> > > *Properties set to Kafka Streams:*
> > >
> > >     StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> > >     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> > >     StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> > >     StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> > >     StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> > >     StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> > >     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - "latest"
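The `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` ("auto.offset.reset") setting above only decides where to start reading a partition when the consumer group (in Kafka Streams, the group is named after `application.id`) has no committed offset for it; once offsets are committed, a restart resumes from them. Kafka Streams itself defaults this setting to "earliest", and the POC overrides it to "latest". The stdlib-only sketch below models that rule in simplified form; the class, method, and example offsets are hypothetical, and it is an illustration of the setting, not a diagnosis of the multi-partition behavior reported here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (an assumption, not Kafka's client code) of where a consumer
// in the group named by application.id starts reading each partition after a
// restart. Out-of-range committed offsets and the "none" policy are not modeled.
public class StartOffsets {

    static Map<Integer, Long> startOffsets(Map<Integer, Long> committed,
                                           Map<Integer, Long> logStart,
                                           Map<Integer, Long> logEnd,
                                           String autoOffsetReset) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            int partition = e.getKey();
            Long committedOffset = committed.get(partition);
            if (committedOffset != null) {
                // A committed offset wins; auto.offset.reset is not consulted.
                result.put(partition, committedOffset);
            } else if ("earliest".equals(autoOffsetReset)) {
                result.put(partition, logStart.get(partition));
            } else if ("latest".equals(autoOffsetReset)) {
                result.put(partition, e.getValue());
            } else {
                throw new IllegalArgumentException("not modeled: " + autoOffsetReset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> logStart = new HashMap<>();
        Map<Integer, Long> logEnd = new HashMap<>();
        logStart.put(0, 0L);
        logStart.put(1, 0L);
        logEnd.put(0, 200L);
        logEnd.put(1, 180L);

        // Partition 0 has a committed offset; partition 1 has none.
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 120L);

        System.out.println(startOffsets(committed, logStart, logEnd, "latest"));   // {0=120, 1=180}
        System.out.println(startOffsets(committed, logStart, logEnd, "earliest")); // {0=120, 1=0}
    }
}
```

Only partition 1, which has no committed offset, is affected by the reset policy; partition 0 resumes at offset 120 either way.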
> > >
> > >
> > >
> > >
> > >
> > > Thanks in Advance.
> > >
> > > Kalyani Y,
> > > 9177982636
> > >
> >
> >
> > --
> > Santilli Jonathan
> >
>