
Re: Restart of kafka-streams with more than one partition on topic is reprocessing the data from beginning of the topic

Hello Kalyani, I am really happy that it worked for you. I am also waiting
for the release.

Thanks a lot Vahid for managing the release.


Cheers!
--
Jonathan



On Thu, May 30, 2019 at 1:30 PM kalyani.yarlagadda1@gmail.com <
kalyani.yarlagadda1@gmail.com> wrote:

> Hi Jonathan,
>
> Thanks for the suggestion. It is working fine with the Kafka 2.2.1-rc1
> version.
> Is there a tentative release date for this version? Please let me know
> if you have any info on this.
> Thanks in advance.
>
> Thanks
> Kalyani
>
> On 2019/05/24 08:00:33, Jonathan Santilli <jonathansantilli@gmail.com>
> wrote:
> > Hello Kalyani,
> >
> > Try testing the RC kafka-2.2.1-rc1. What you describe seems to be a
> > problem that has been solved in version 2.2.1
> > (https://issues.apache.org/jira/browse/KAFKA-7895), which is under
> > voting right now as 2.2.1-RC1
> > (https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html).
> > I have tested the app that was suffering from that problem and it is
> > now solved. Of course, you need to test your own app.
> >
> > Hope that helps.
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> >
> > On Thu, May 23, 2019 at 5:37 PM kalyani yarlagadda <
> > kalyani.yarlagadda1@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I need assistance in the below scenario. Please help me with this.
> > >
> > > I am using a hopping time window in Kafka Streams. I am facing an
> > > issue on restart of my Kafka Streams application: the application
> > > processes the data from the beginning offset.
> > > However, this happens only when the topic has more than one
> > > partition. If the topic has only 1 partition, then on restart of the
> > > application the hopping window works fine.
> > >
> > > *Kafka Version:* *2.1.0*
> > >
> > > *Eg:*
> > >
> > > Time window is 4 hours
> > > advanceBy 5 minutes
> > > The application is started at time A and runs every 5 minutes with
> > > 4 hours of stream data. It is stopped at a timestamp, say X, and
> > > restarted at timestamp Y.
> > >
> > > *The behavior for a single partition:* After the restart, the streams
> > > are processed from time X to time Y for every 5 minutes.
> > > *The behavior for more than one partition:* After the restart, the
> > > streams are processed from time A to time Y for every 5 minutes.
> > >
> > > *I am adding the POC code below*
> > >
> > >     // define the time window as a hopping time window
> > >     TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
> > >         .advanceBy(Duration.ofMinutes(5))
> > >         .grace(Duration.ofMinutes(1));
> > >
> > >     // MetricsTimeSeries is the aggregator class
> > >     KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
> > >         builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
> > >             .groupByKey()
> > >             .windowedBy(timeWindow)
> > >             .aggregate(
> > >                 () -> new MetricsTimeSeries(), /* initializer */
> > >                 (aggKey, newValue, aggValue) -> {
> > >                     aggValue.addDataPoint(newValue);
> > >                     return aggValue;
> > >                 }, /* adder */
> > >                 Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
> > >             .suppress(Suppressed.untilWindowCloses(
> > >                 BufferConfig.maxBytes(10*1024*1024).shutDownWhenFull()));
> > >
> > >     windowedMetricsTimeSeriesStream
> > >         .toStream()
> > >         .map((key, value) -> /* mapping logic goes here */)
> > >         .to("metrics_op");
> > >
> > > *Properties set to Kafka Streams:*
> > >
> > >     StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> > >     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> > >     StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> > >     StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> > >     StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> > >     StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> > >     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - "latest"
> > >
> > >
> > >
> > >
> > >
> > > Thanks in Advance.
> > >
> > > Kalyani Y,
> > > 9177982636
> > >
> >
> >
> > --
> > Santilli Jonathan
> >
>


--
Santilli Jonathan
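
For anyone reading this thread later: the window settings in the original question (4-hour window, advanceBy 5 minutes) mean that every record lands in many overlapping windows, which is why a full reprocess from time A is so costly. The sketch below is plain Java with no Kafka dependency. It assumes hopping windows are aligned to the epoch, which is how I understand Kafka Streams to assign them, and the class and method names (HoppingWindowSketch, windowStartsFor) are made up for illustration only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not part of Kafka Streams.
class HoppingWindowSketch {

    // Returns the start times (ms since epoch) of every hopping window
    // [start, start + size) that contains the given record timestamp,
    // assuming window starts are multiples of the advance interval.
    static List<Long> windowStartsFor(long timestampMs, Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        // Earliest aligned start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record 10 hours after the epoch, with the settings from the thread.
        long ts = Duration.ofHours(10).toMillis();
        List<Long> starts = windowStartsFor(ts, Duration.ofHours(4), Duration.ofMinutes(5));
        System.out.println(starts.size() + " windows contain this record"); // prints "48 windows contain this record"
    }
}
```

With these settings each record updates 48 window aggregates (4 hours / 5 minutes), so replaying a topic from the beginning repeats that work for every record already processed.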
