Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value
Sounds like a regression to me.
We did change some code to track partition time differently. Can you
open a Jira?
-Matthias
On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> Sure Bill, it is the same code I reported the suppress issue for some
> months ago:
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
>
> In fact, I reported at that time that after restarting the app, the
> suppress was sending the already-processed records downstream again.
> Now, with version 2.2.1+, after restarting the app, the
> aggregation/suppress (I do not know exactly where) is missing some records
> that should be aggregated, even though they are in the input topic.
>
> Kafka Version 2.3
>
> public class OwnTimeExtractor implements TimestampExtractor {
>
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record,
>             final long previousTimestamp) {
>         // previousTimestamp is always == -1
>     }
> }
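For reference, a minimal complete extractor of this shape (a hypothetical sketch only — the original method body is elided, so the fallback logic below is an assumption, not Jonathan's actual code) could look like:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical sketch: use the record's own timestamp when available, and
// fall back to the last known partition time (previousTimestamp) otherwise.
public class OwnTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final long recordTimestamp = record.timestamp();
        if (recordTimestamp >= 0) {
            return recordTimestamp;
        }
        // No usable timestamp on the record: fall back to the partition time,
        // or 0 if that is also unknown (-1, as observed in 2.3).
        return Math.max(previousTimestamp, 0L);
    }
}
```

Note that an extractor like this silently depends on previousTimestamp being maintained correctly, which is exactly what the reported -1 breaks.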
>
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<..., ...> events = builder
>     .stream(inputTopicNames, Consumed.with(..., ...)
>         .withTimestampExtractor(new OwnTimeExtractor()));
>
> events
>     .filter((k, v) -> ...)
>     .flatMapValues(v -> ...)
>     .flatMapValues(v -> ...)
>     .selectKey((k, v) -> v)
>     .groupByKey(Grouped.with(..., ...))
>     .windowedBy(
>         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>     .reduce((agg, newValue) -> {
>         ...
>         return agg;
>     })
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
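For anyone trying to reproduce this, a self-contained topology of the same shape might look as follows. This is a sketch, not the reporter's code: every topic name, serde, window size, and the trivial reducer are hypothetical placeholders standing in for the elided `...` parts.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class TopologySketch {

    public static void main(final String[] args) {
        final Duration windowSize = Duration.ofSeconds(60);  // hypothetical
        final Duration grace = Duration.ofSeconds(10);       // hypothetical

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> events = builder.stream(
                "input-topic",                               // hypothetical name
                Consumed.with(Serdes.String(), Serdes.String())
                        .withTimestampExtractor(new OwnTimeExtractor()));

        events
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // Tumbling windows: advance equals size, so windows do not overlap.
            .windowedBy(TimeWindows.of(windowSize)
                    .advanceBy(windowSize)
                    .grace(grace))
            .reduce((agg, newValue) -> agg)                  // placeholder reduction
            // Emit each window's result only once, after its grace period ends.
            .suppress(Suppressed.untilWindowCloses(
                    Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}
```

Because `suppress` with `untilWindowCloses` relies on stream time advancing past window end plus grace, a broken partition-time value feeding the extractor would plausibly explain both the re-emitted and the missing records described above.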
>
>
>
> On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <bill@confluent.io> wrote:
>
>> Thanks for the reply Jonathan.
>>
>> Are you in a position to share your code so I can try to reproduce on my
>> end?
>>
>> -Bill
>>
>>
>> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
>> jonathansantilli@gmail.com> wrote:
>>
>>> Hello Bill,
>>>
>>> I am implementing the TimestampExtractor interface, then using it to
>>> consume, like:
>>>
>>> final KStream<..., ...> events = builder.stream(inputTopicList,
>>>     Consumed.with(keySerde, valueSerde)
>>>         .withTimestampExtractor(new OwnTimeExtractor(...)));
>>>
>>> I am not setting the default.timestamp.extractor config value.
>>>
>>> Cheers!
>>> --
>>> Jonathan
>>>
>>>
>>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <bill@confluent.io> wrote:
>>>
>>>> Hi Jonathan,
>>>>
>>>> Thanks for reporting this. Which timestamp extractor are you using in
>>> the
>>>> configs?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
>>>> jonathansantilli@gmail.com> wrote:
>>>>
>>>>> Hello, hope you all are doing well,
>>>>>
>>>>> I am testing the new version 2.3 of Kafka Streams specifically. I have
>>>>> noticed that now, in implementations of the extract method from the
>>>>> interface org.apache.kafka.streams.processor.TimestampExtractor
>>>>>
>>>>> public long extract(ConsumerRecord<Object, Object> record, long
>>>>> previousTimestamp)
>>>>>
>>>>> the previousTimestamp value is always -1.
>>>>>
>>>>> The previous version, 2.2.1, was passing the correct value for the
>>>>> record's partition.
>>>>>
>>>>> I am aware the interface is marked as @InterfaceStability.Evolving and
>>>>> we should not rely on its stability/compatibility. I am just wondering
>>>>> whether this new behavior is intentional or a bug.
>>>>>
>>>>> Cheers!
>>>>> --
>>>>> Santilli Jonathan
>>>>>
>>>>
>>>
>>>
>>> --
>>> Santilli Jonathan
>>>
>>
>
>