> Can someone confirm that each
>>>> partition has its own stream time and that the stream time for a partition
>>>> only advances when a record is written to the partition after the window
>>>> closes?
That's correct.
On 5/21/24 10:11 AM, Chad Preisler wrote:
> After reviewing the logs, I think I understand what happens with the
> repartition topics. Looks like they will be assigned to one or more
> instances. In my example I ran three instances of the application (A, B,
> C). Looks like the two repartition topics got assigned to A and B. The six
> partitions from the input topics got split evenly across all three running
> instances A, B, and C. Since the repartitioned streams are what I'm joining
> on, I guess the join will run on two instances, and any input topic
> processing will run across all three. Is that correct?
>
> Still would like clarification regarding some records appearing to not get
> processed: I think the issue is related to certain partitions not getting
> records to advance stream time (because of low volume). Can someone confirm
> that each partition has its own stream time and that the stream time for a
> partition only advances when a record is written to the partition after the
> window closes?
>
> On Tue, May 21, 2024 at 10:27 AM Chad Preisler <chad.preisler@gmail.com>
> wrote:
>
>> See one small edit below...
>>
>> On Tue, May 21, 2024 at 10:25 AM Chad Preisler <chad.preisler@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I think the issue is related to certain partitions not getting records to
>>> advance stream time (because of low volume). Can someone confirm that each
>>> partition has its own stream time and that the stream time for a partition
>>> only advances when a record is written to the partition after the window
>>> closes?
>>>
>>> If I use the repartition method on each input topic to reduce the number
>>> of partitions for those streams, how many instances of the application will
>>> process records? For example, if the input topics each have 6 partitions,
>>> and I use the repartition method to set the number of partitions for the
>>> streams to 2, how many instances of the application will process records?
>>>
>>> Thanks,
>>> Chad
>>>
>>>
>>> On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax <mjsax@apache.org> wrote:
>>>
>>>>>>> How do you know this?
>>>>>> First thing we do is write a log message in the value joiner. We
>>>> don't see
>>>>>> the log message for the missed records.
>>>>
>>>> Well, for left/right join results, the ValueJoiner would only be called
>>>> when the window is closed... And for invalid input (or late record, ie,
>>>> which arrive out-of-order and their window was already closes), records
>>>> would be dropped right away. So you cannot really infer that a record
>>>> did make it into the join or not, or what happens if it did make it into
>>>> the `Processor`.
>>>>
>>>> -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>>>
>>>> `dropped-records-total` is the name of the metric.
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 5/1/24 11:35 AM, Chad Preisler wrote:
>>>>> Hello,
>>>>>
>>>>> We did some testing in our test environment today. We are seeing some
>>>>> records processes where only one side of the join has a record. So
>>>> that's
>>>>> good. However, we are still seeing some records get skipped. They
>>>> never hit
>>>>> the value joiner (we write a log message first thing in the value
>>>> joiner).
>>>>> During the test we were putting some load on the system, so stream
>>>> time was
>>>>> advancing. We did notice that the join windows were taking much longer
>>>> than
>>>>> 30 minutes to close and process records. Thirty minutes is the window
>>>> plus
>>>>> grace.
>>>>>
>>>>>> How do you know this?
>>>>> First thing we do is write a log message in the value joiner. We don't
>>>> see
>>>>> the log message for the missed records.
>>>>>
>>>>> I will try pushing the same records locally. However, we don't see any
>>>>> errors in our logs and the stream does process one sided joins after
>>>> the
>>>>> skipped record. Do you have any docs on the "dropper records" metric?
>>>> I did
>>>>> a Google search and didn't find many good results for that.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chad
>>>>>
>>>>> On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax <mjsax@apache.org>
>>>> wrote:
>>>>>
>>>>>>>> Thanks for the information. I ran the code using Kafka locally.
>>>> After
>>>>>>>> submitting some records inside and outside of the time window and
>>>> grace,
>>>>>>>> the join performed as expected when running locally.
>>>>>>
>>>>>> That gives some hope :)
>>>>>>
>>>>>>
>>>>>>
>>>>>>> However, they never get into the join.
>>>>>>
>>>>>> How do you know this?
>>>>>>
>>>>>>
>>>>>> Did you check the metric for dropper records? Maybe records are
>>>>>> considers malformed and dropped? Are you using the same records in
>>>>>> production and in your local test?
>>>>>>
>>>>>>
>>>>>>>> Are there any settings for the stream client that would affect the
>>>> join?
>>>>>>
>>>>>> Not that I can think of... There is one more internal config, but as
>>>>>> long as data is flowing, it should not impact the result you see.
>>>>>>
>>>>>>
>>>>>>>> Are there any settings on the broker side that would affect the
>>>> join?
>>>>>>
>>>>>> No. The join is computed client side. Broker configs should not have
>>>> any
>>>>>> impact.
>>>>>>
>>>>>>> f I increase the log level for the streams API would that
>>>>>>>> shed some light on what is happening?
>>>>>>
>>>>>> I don't think it would help much. The code in question is
>>>>>> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but
>>>> it
>>>>>> does not do any logging except WARN for the already mentioned
>>>> "dropping
>>>>>> malformed" records that is also recorded via JMX.
>>>>>>
>>>>>>> WARN: "Skipping record due to null key or value. "
>>>>>>
>>>>>>
>>>>>> If you can identify a specific record from the input which would
>>>> produce
>>>>>> an output, but does not, maybe you can try to feed it into your local
>>>>>> test env and try to re-produce the issue?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/30/24 11:38 AM, Chad Preisler wrote:
>>>>>>> Matthias,
>>>>>>>
>>>>>>> Thanks for the information. I ran the code using Kafka locally. After
>>>>>>> submitting some records inside and outside of the time window and
>>>> grace,
>>>>>>> the join performed as expected when running locally.
>>>>>>>
>>>>>>> I'm not sure why the join is not working as expected when running
>>>> against
>>>>>>> our actual brokers. We are peeking at the records for the streams
>>>> and we
>>>>>>> are seeing the records get pulled. However, they never get into the
>>>> join.
>>>>>>> It's been over 24 hours since the expected records were created, and
>>>>>> there
>>>>>>> has been plenty of traffic to advance the stream time. Only records
>>>> that
>>>>>>> have both a left and right side match are getting processed by the
>>>> join.
>>>>>>>
>>>>>>> Are there any settings for the stream client that would affect the
>>>> join?
>>>>>>>
>>>>>>> Are there any settings on the broker side that would affect the join?
>>>>>>>
>>>>>>> The outer join is just one part of the topology. Compared to running
>>>> it
>>>>>>> locally there is a lot more data going through the app when running
>>>> on
>>>>>> our
>>>>>>> actual servers. If I increase the log level for the streams API would
>>>>>> that
>>>>>>> shed some light on what is happening? Does anyone know if there are
>>>>>>> specific packages that I should increase the log level for? Any
>>>> specific
>>>>>>> log message I can hone in on to tell me what is going on?
>>>>>>>
>>>>>>> Basically, I'm looking for some pointers on where I can start
>>>> looking.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Chad
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax <mjsax@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>>> I expect the join to
>>>>>>>>>> execute after the 25 with one side of the join containing a
>>>> record and
>>>>>>>> the
>>>>>>>>>> other being null
>>>>>>>>
>>>>>>>> Given that you also have a grace period of 5 minutes, the result
>>>> will
>>>>>>>> only be emitted after the grace-period passed and the window is
>>>> closed
>>>>>>>> (not when window end time is reached).
>>>>>>>>
>>>>>>>>> One has a
>>>>>>>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there,
>>>> but
>>>>>>>> I'm
>>>>>>>>>> not sure how to decode that message to see what is in it. What is
>>>> the
>>>>>>>>>> purpose of those messages?
>>>>>>>>
>>>>>>>> It's an internal store, that stores all records which are subject
>>>> to be
>>>>>>>> emitted as left/right join result, ie, if there is no inner join
>>>> result.
>>>>>>>> The format used is internal:
>>>>>>>>
>>>>>>>>
>>>>>>
>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
>>>>>>>>
>>>>>>>> Also note: time is based on event-time, ie, if the input stream
>>>> stops to
>>>>>>>> send new records, "stream-time" will stop to advance and the result
>>>>>>>> might not be emitted because the window does not get closed.
>>>>>>>>
>>>>>>>> (Last, there is some internal wall-clock time delay of one second to
>>>>>>>> emit results for performance reasons...)
>>>>>>>>
>>>>>>>> HTH.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 4/30/24 6:51 AM, Chad Preisler wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I have a KStream to KStream outer join with a time difference of 25
>>>>>>>> minutes
>>>>>>>>> and 5 minutes of grace. When I get a record for one side of the
>>>> join,
>>>>>>>> but
>>>>>>>>> don't get a record on the other side of the join, I expect the
>>>> join to
>>>>>>>>> execute after the 25 with one side of the join containing a record
>>>> and
>>>>>>>> the
>>>>>>>>> other being null. Is that correct? If it is correct, it's not
>>>> working
>>>>>>>> for
>>>>>>>>> me.
>>>>>>>>>
>>>>>>>>> I was poking around on the broker and saw some internal topics. I
>>>> see
>>>>>> the
>>>>>>>>> key I expected to execute the join on some of those topics. One
>>>> has a
>>>>>>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there,
>>>> but
>>>>>> I'm
>>>>>>>>> not sure how to decode that message to see what is in it. What is
>>>> the
>>>>>>>>> purpose of those messages? If I decode the message will it help me
>>>> see
>>>>>>>> when
>>>>>>>>> the join should have been executed?
>>>>>>>>>
>>>>>>>>> I also see the key on a topic with the naming convention
>>>>>>>>> "KSTREAM_OUTERTHIS".
>>>>>>>>>
>>>>>>>>> Are there any other topics that I should be looking at to
>>>> troubleshoot
>>>>>>>> this
>>>>>>>>> issue?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Chad
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
>>>> partition has its own stream time and that the stream time for a partition
>>>> only advances when a record is written to the partition after the window
>>>> closes?
That's correct.
On 5/21/24 10:11 AM, Chad Preisler wrote:
> After reviewing the logs, I think I understand what happens with the
> repartition topics. Looks like they will be assigned to one or more
> instances. In my example I ran three instances of the application (A, B,
> C). Looks like the two repartition topics got assigned to A and B. The six
> partitions from the input topics got split evenly across all three running
> instances A, B, and C. Since the repartitioned streams are what I'm joining
> on, I guess the join will run on two instances, and any input topic
> processing will run across all three. Is that correct?
>
> Still would like clarification regarding some records appearing to not get
> processed: I think the issue is related to certain partitions not getting
> records to advance stream time (because of low volume). Can someone confirm
> that each partition has its own stream time and that the stream time for a
> partition only advances when a record is written to the partition after the
> window closes?
>
> On Tue, May 21, 2024 at 10:27 AM Chad Preisler <chad.preisler@gmail.com>
> wrote:
>
>> See one small edit below...
>>
>> On Tue, May 21, 2024 at 10:25 AM Chad Preisler <chad.preisler@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I think the issue is related to certain partitions not getting records to
>>> advance stream time (because of low volume). Can someone confirm that each
>>> partition has its own stream time and that the stream time for a partition
>>> only advances when a record is written to the partition after the window
>>> closes?
>>>
>>> If I use the repartition method on each input topic to reduce the number
>>> of partitions for those streams, how many instances of the application will
>>> process records? For example, if the input topics each have 6 partitions,
>>> and I use the repartition method to set the number of partitions for the
>>> streams to 2, how many instances of the application will process records?
>>>
>>> Thanks,
>>> Chad
>>>
>>>
>>> On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax <mjsax@apache.org> wrote:
>>>
>>>>>>> How do you know this?
>>>>>> First thing we do is write a log message in the value joiner. We
>>>> don't see
>>>>>> the log message for the missed records.
>>>>
>>>> Well, for left/right join results, the ValueJoiner would only be called
>>>> when the window is closed... And for invalid input (or late record, ie,
>>>> which arrive out-of-order and their window was already closes), records
>>>> would be dropped right away. So you cannot really infer that a record
>>>> did make it into the join or not, or what happens if it did make it into
>>>> the `Processor`.
>>>>
>>>> -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>>>
>>>> `dropped-records-total` is the name of the metric.
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 5/1/24 11:35 AM, Chad Preisler wrote:
>>>>> Hello,
>>>>>
>>>>> We did some testing in our test environment today. We are seeing some
>>>>> records processes where only one side of the join has a record. So
>>>> that's
>>>>> good. However, we are still seeing some records get skipped. They
>>>> never hit
>>>>> the value joiner (we write a log message first thing in the value
>>>> joiner).
>>>>> During the test we were putting some load on the system, so stream
>>>> time was
>>>>> advancing. We did notice that the join windows were taking much longer
>>>> than
>>>>> 30 minutes to close and process records. Thirty minutes is the window
>>>> plus
>>>>> grace.
>>>>>
>>>>>> How do you know this?
>>>>> First thing we do is write a log message in the value joiner. We don't
>>>> see
>>>>> the log message for the missed records.
>>>>>
>>>>> I will try pushing the same records locally. However, we don't see any
>>>>> errors in our logs and the stream does process one sided joins after
>>>> the
>>>>> skipped record. Do you have any docs on the "dropper records" metric?
>>>> I did
>>>>> a Google search and didn't find many good results for that.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chad
>>>>>
>>>>> On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax <mjsax@apache.org>
>>>> wrote:
>>>>>
>>>>>>>> Thanks for the information. I ran the code using Kafka locally.
>>>> After
>>>>>>>> submitting some records inside and outside of the time window and
>>>> grace,
>>>>>>>> the join performed as expected when running locally.
>>>>>>
>>>>>> That gives some hope :)
>>>>>>
>>>>>>
>>>>>>
>>>>>>> However, they never get into the join.
>>>>>>
>>>>>> How do you know this?
>>>>>>
>>>>>>
>>>>>> Did you check the metric for dropper records? Maybe records are
>>>>>> considers malformed and dropped? Are you using the same records in
>>>>>> production and in your local test?
>>>>>>
>>>>>>
>>>>>>>> Are there any settings for the stream client that would affect the
>>>> join?
>>>>>>
>>>>>> Not that I can think of... There is one more internal config, but as
>>>>>> long as data is flowing, it should not impact the result you see.
>>>>>>
>>>>>>
>>>>>>>> Are there any settings on the broker side that would affect the
>>>> join?
>>>>>>
>>>>>> No. The join is computed client side. Broker configs should not have
>>>> any
>>>>>> impact.
>>>>>>
>>>>>>> f I increase the log level for the streams API would that
>>>>>>>> shed some light on what is happening?
>>>>>>
>>>>>> I don't think it would help much. The code in question is
>>>>>> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but
>>>> it
>>>>>> does not do any logging except WARN for the already mentioned
>>>> "dropping
>>>>>> malformed" records that is also recorded via JMX.
>>>>>>
>>>>>>> WARN: "Skipping record due to null key or value. "
>>>>>>
>>>>>>
>>>>>> If you can identify a specific record from the input which would
>>>> produce
>>>>>> an output, but does not, maybe you can try to feed it into your local
>>>>>> test env and try to re-produce the issue?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/30/24 11:38 AM, Chad Preisler wrote:
>>>>>>> Matthias,
>>>>>>>
>>>>>>> Thanks for the information. I ran the code using Kafka locally. After
>>>>>>> submitting some records inside and outside of the time window and
>>>> grace,
>>>>>>> the join performed as expected when running locally.
>>>>>>>
>>>>>>> I'm not sure why the join is not working as expected when running
>>>> against
>>>>>>> our actual brokers. We are peeking at the records for the streams
>>>> and we
>>>>>>> are seeing the records get pulled. However, they never get into the
>>>> join.
>>>>>>> It's been over 24 hours since the expected records were created, and
>>>>>> there
>>>>>>> has been plenty of traffic to advance the stream time. Only records
>>>> that
>>>>>>> have both a left and right side match are getting processed by the
>>>> join.
>>>>>>>
>>>>>>> Are there any settings for the stream client that would affect the
>>>> join?
>>>>>>>
>>>>>>> Are there any settings on the broker side that would affect the join?
>>>>>>>
>>>>>>> The outer join is just one part of the topology. Compared to running
>>>> it
>>>>>>> locally there is a lot more data going through the app when running
>>>> on
>>>>>> our
>>>>>>> actual servers. If I increase the log level for the streams API would
>>>>>> that
>>>>>>> shed some light on what is happening? Does anyone know if there are
>>>>>>> specific packages that I should increase the log level for? Any
>>>> specific
>>>>>>> log message I can hone in on to tell me what is going on?
>>>>>>>
>>>>>>> Basically, I'm looking for some pointers on where I can start
>>>> looking.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Chad
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax <mjsax@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>>> I expect the join to
>>>>>>>>>> execute after the 25 with one side of the join containing a
>>>> record and
>>>>>>>> the
>>>>>>>>>> other being null
>>>>>>>>
>>>>>>>> Given that you also have a grace period of 5 minutes, the result
>>>> will
>>>>>>>> only be emitted after the grace-period passed and the window is
>>>> closed
>>>>>>>> (not when window end time is reached).
>>>>>>>>
>>>>>>>>> One has a
>>>>>>>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there,
>>>> but
>>>>>>>> I'm
>>>>>>>>>> not sure how to decode that message to see what is in it. What is
>>>> the
>>>>>>>>>> purpose of those messages?
>>>>>>>>
>>>>>>>> It's an internal store, that stores all records which are subject
>>>> to be
>>>>>>>> emitted as left/right join result, ie, if there is no inner join
>>>> result.
>>>>>>>> The format used is internal:
>>>>>>>>
>>>>>>>>
>>>>>>
>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
>>>>>>>>
>>>>>>>> Also note: time is based on event-time, ie, if the input stream
>>>> stops to
>>>>>>>> send new records, "stream-time" will stop to advance and the result
>>>>>>>> might not be emitted because the window does not get closed.
>>>>>>>>
>>>>>>>> (Last, there is some internal wall-clock time delay of one second to
>>>>>>>> emit results for performance reasons...)
>>>>>>>>
>>>>>>>> HTH.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 4/30/24 6:51 AM, Chad Preisler wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I have a KStream to KStream outer join with a time difference of 25
>>>>>>>> minutes
>>>>>>>>> and 5 minutes of grace. When I get a record for one side of the
>>>> join,
>>>>>>>> but
>>>>>>>>> don't get a record on the other side of the join, I expect the
>>>> join to
>>>>>>>>> execute after the 25 with one side of the join containing a record
>>>> and
>>>>>>>> the
>>>>>>>>> other being null. Is that correct? If it is correct, it's not
>>>> working
>>>>>>>> for
>>>>>>>>> me.
>>>>>>>>>
>>>>>>>>> I was poking around on the broker and saw some internal topics. I
>>>> see
>>>>>> the
>>>>>>>>> key I expected to execute the join on some of those topics. One
>>>> has a
>>>>>>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there,
>>>> but
>>>>>> I'm
>>>>>>>>> not sure how to decode that message to see what is in it. What is
>>>> the
>>>>>>>>> purpose of those messages? If I decode the message will it help me
>>>> see
>>>>>>>> when
>>>>>>>>> the join should have been executed?
>>>>>>>>>
>>>>>>>>> I also see the key on a topic with the naming convention
>>>>>>>>> "KSTREAM_OUTERTHIS".
>>>>>>>>>
>>>>>>>>> Are there any other topics that I should be looking at to
>>>> troubleshoot
>>>>>>>> this
>>>>>>>>> issue?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Chad
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
Comments
Post a Comment