Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Jonathan, Matthias

I've created a Jira for this issue:
https://issues.apache.org/jira/browse/KAFKA-8615

Jonathan, I plan to work on this when I get back from vacation on 7/8. If
you would like to work on this yourself before that, feel free to do so and
assign the ticket to yourself.

Thanks,
Bill

On Thu, Jun 27, 2019 at 1:38 PM Matthias J. Sax <matthias@confluent.io>
wrote:

> Sounds like a regression to me.
>
> We did change some code to track partition time differently. Can you
> open a Jira?
>
>
> -Matthias
>
> On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> > Sure Bill, it is the same code for which I reported the suppress issue
> > some months ago:
> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> >
> > In fact, I reported at that time that after restarting the app, the
> > suppress was sending the already-processed records downstream again.
> > Now, with version 2.2.1+, after restarting the app, the
> > aggregation/suppress (I do not know exactly where) is missing some
> > records that should be aggregated, even though they are in the input
> > topic.
> >
> > Kafka Version 2.3
> >
> > public class OwnTimeExtractor implements TimestampExtractor {
> >
> >     @Override
> >     public long extract(final ConsumerRecord<Object, Object> record,
> >                         final long previousTimestamp) {
> >         // previousTimestamp is always == -1
> >     }
> > }
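> >
> > As an aside, a defensive extractor that falls back when previousTimestamp
> > is -1 might look like the sketch below (just an illustration, not my
> > actual code):
> >
> > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > import org.apache.kafka.streams.processor.TimestampExtractor;
> >
> > public class FallbackTimeExtractor implements TimestampExtractor {
> >
> >     @Override
> >     public long extract(final ConsumerRecord<Object, Object> record,
> >                         final long previousTimestamp) {
> >         final long recordTimestamp = record.timestamp();
> >         if (recordTimestamp >= 0) {
> >             // Normal case: use the timestamp the producer or broker
> >             // set on the record.
> >             return recordTimestamp;
> >         }
> >         // Invalid record timestamp: fall back to the previous partition
> >         // time, or to wall-clock time if that is -1 as well.
> >         return previousTimestamp >= 0 ? previousTimestamp : System.currentTimeMillis();
> >     }
> > }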
> >
> > final StreamsBuilder builder = new StreamsBuilder();
> > final KStream<..., ...> events = builder
> >     .stream(inputTopicNames, Consumed.with(..., ...)
> >         .withTimestampExtractor(new OwnTimeExtractor()));
> >
> > events
> >     .filter((k, v) -> ...)
> >     .flatMapValues(v -> ...)
> >     .flatMapValues(v -> ...)
> >     .selectKey((k, v) -> v)
> >     .groupByKey(Grouped.with(..., ...))
> >     .windowedBy(
> >         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
> >             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
> >             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
> >     .reduce((agg, newValue) -> {
> >         ...
> >         return agg;
> >     })
> >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >     .toStream()
> >     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
> >
> >
> >
> > On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <bill@confluent.io> wrote:
> >
> >> Thanks for the reply, Jonathan.
> >>
> >> Are you in a position to share your code so I can try to reproduce this
> >> on my end?
> >>
> >> -Bill
> >>
> >>
> >> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
> >> jonathansantilli@gmail.com> wrote:
> >>
> >>> Hello Bill,
> >>>
> >>> I am implementing the TimestampExtractor interface, then using it to
> >>> consume, like:
> >>>
> >>> final KStream<..., ...> events = builder.stream(inputTopicList,
> >>>     Consumed.with(keySerde, valueSerde)
> >>>         .withTimestampExtractor(new OwnTimeExtractor(...)));
> >>>
> >>> I am not setting the default.timestamp.extractor config value.
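> >>>
> >>> (Just to be explicit: setting it globally would look something like the
> >>> sketch below, which I am not doing. Note that the config path needs a
> >>> no-arg constructor, which is why I pass my extractor programmatically.)
> >>>
> >>> import java.util.Properties;
> >>> import org.apache.kafka.streams.StreamsConfig;
> >>>
> >>> final Properties props = new Properties();
> >>> // Hypothetical application id, just for the sketch.
> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
> >>> props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >>>           OwnTimeExtractor.class);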
> >>>
> >>> Cheers!
> >>> --
> >>> Jonathan
> >>>
> >>>
> >>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <bill@confluent.io> wrote:
> >>>
> >>>> Hi Jonathan,
> >>>>
> >>>> Thanks for reporting this. Which timestamp extractor are you using in
> >>> the
> >>>> configs?
> >>>>
> >>>> Thanks,
> >>>> Bill
> >>>>
> >>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
> >>>> jonathansantilli@gmail.com> wrote:
> >>>>
> >>>>> Hello, I hope you are all doing well.
> >>>>>
> >>>>> I am testing the new version 2.3, Kafka Streams specifically. I have
> >>>>> noticed that the implementation of the extract method from the
> >>>>> interface org.apache.kafka.streams.processor.TimestampExtractor
> >>>>>
> >>>>> public long extract(ConsumerRecord<Object, Object> record,
> >>>>>                     long previousTimestamp)
> >>>>>
> >>>>> now always receives -1 as the previousTimestamp value.
> >>>>>
> >>>>> The previous version, 2.2.1, passed the correct value for the
> >>>>> record's partition.
> >>>>>
> >>>>> I am aware the interface is marked as @InterfaceStability.Evolving and
> >>>>> that we should not rely on its stability/compatibility. I am just
> >>>>> wondering whether this new behavior is intentional or a bug.
> >>>>>
> >>>>>
> >>>>> Cheers!
> >>>>> --
> >>>>> Santilli Jonathan
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Santilli Jonathan
> >>>
> >>
> >
> >
>
>
