Re: Kafka streams DSL going into infinite loop

>> I really don't know what TOPOLOGY_OPTIMIZATION is for.

If you enable optimization, Kafka Streams tries to generate a more
efficient Topology when translating the DSL program. We are working on
some more documentation of this feature atm:
https://cwiki.apache.org/confluence/display/KAFKA/DSL+Optimizer
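
One thing to double-check when enabling it: the configs must also be
passed to `StreamsBuilder#build(Properties)`, otherwise the optimizer
is never invoked and you get the unoptimized topology. A minimal sketch
(variable names are placeholders):

    Properties props = new Properties();
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

    StreamsBuilder builder = new StreamsBuilder();
    // ... define the DSL program on `builder` ...

    // Passing the props only to the KafkaStreams constructor does not
    // trigger the rewrite; they must go into build() as well.
    Topology topology = builder.build(props);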

Can you share your `TopologyDescription`? Also, do you see the same
problem if you disable topology-optimization? I would like to understand
if it might be related to repartition topics (and the optimization that
might merge repartition topics).
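
To get the description, you can print the built topology, e.g.:

    final Topology topology = builder.build(props);
    System.out.println(topology.describe());

Comparing this output with and without `StreamsConfig.OPTIMIZE` set
should show whether any repartition topics get merged.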

Also, you did not answer my question from before: does the application
commit offsets on the input (and repartition) topics? That would show
whether it makes progress or not.
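
You can check this with `bin/kafka-consumer-groups.sh --bootstrap-server
<broker> --describe --group <application.id>`, or programmatically via
the `AdminClient`. A rough sketch; the broker address and group id below
are placeholders (for Kafka Streams, the group id equals the
`application.id`):

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(config)) {
        // .get() may throw; call from a method that declares `throws Exception`
        admin.listConsumerGroupOffsets("my-streams-app")
             .partitionsToOffsetAndMetadata()
             .get()
             .forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
    }

If these offsets advance over time, the application is making progress.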

I have no idea yet what the problem could be.

-Matthias



On 1/30/20 11:16 AM, Sachin Mittal wrote:
> The only Streams-specific props I am using are:
>
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
> props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>
> Yes, there are three sub-topologies and the key does change between joins.
> Is this something to do with any of the above configs?
> I really don't know what TOPOLOGY_OPTIMIZATION is for.
>
> The way I start my stream is:
>
> final KafkaStreams streams = new KafkaStreams(topology, props);
> final CountDownLatch latch = new CountDownLatch(1);
> // attach shutdown handler to catch control-c
> Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
>     ...
> });
> try {
>     streams.cleanUp();
>     streams.start();
>     latch.await();
> } catch (Throwable e) {
>     System.exit(1);
> }
> System.exit(0);
>
> Is this method of starting the stream OK?
> There is indeed no loop, so why does the stream processing get stuck
> logging the final output forever?
>
> Is there any direction you can suggest looking into? Any additional
> logging at the Streams or Kafka level?
>
> Thanks
> Sachin
>
>
>
> On Fri, Jan 31, 2020 at 12:31 AM Matthias J. Sax <matthias@confluent.io>
> wrote:
>
>> Your program does not seem to contain a loop. Hence, it's unclear to me
>> atm what could be the issue.
>>
>> Does the application commit offsets on the input topic? That would be
>> an indicator that it actually does make progress.
>>
>> Do you change the key between joins, i.e., are there multiple
>> sub-topologies that are connected via repartition topics, or is it one
>> single sub-topology?
>>
>>
>> -Matthias
>>
>> On 1/30/20 10:29 AM, Sachin Mittal wrote:
>>> Hi,
>>> In one case, I have found that when I define my topology using the
>>> streams DSL, it tends to go into an infinite loop.
>>> This usually happens if I start my stream, shut it down, and restart it
>>> again after a while (by which time the source topic has moved ahead).
>>>
>>> Stream processing seems to be stuck in a loop and does not seem to
>>> progress ahead.
>>>
>>> My topology is something like this:
>>>
>>> source.branch(
>>> (k, v) -> ...,
>>> (k, v) -> ...,
>>> (k, v) -> ...,
>>> (k, v) -> ...,
>>> (k, v) -> ...,
>>> (k, v) -> ...,
>>> (k, v) -> ...,
>>> (k, v) -> ...
>>> )
>>> stream12 = stream1.join(stream2, ...).peek((k, v12) -> log(v12))
>>> stream34 = stream3.join(stream4, ...).peek((k, v34) -> log(v34))
>>> stream56 = stream5.join(stream6, ...).peek((k, v56) -> log(v56))
>>> stream78 = stream7.join(stream8, ...).peek((k, v78) -> log(v78))
>>> stream1234 = stream12.join(stream34, ...).peek((k, v1234) -> log(v1234))
>>> stream123478 = stream1234.join(stream78, ...).peek((k, v123478) -> log(v123478))
>>> stream567 = stream56.join(stream7, ...).peek((k, v567) -> log(v567))
>>> final = stream123478.join(stream567, ...).foreach((k, vFinal) -> log(vFinal))
>>>
>>> What I observe is that it keeps printing log(vFinal) with the same
>>> value again and again and does not seem to progress.
>>> Any idea why this may be happening? What can I check to understand
>>> what could be going wrong?
>>>
>>> If I stop Kafka, delete all the data, and then restart everything, it
>>> all works fine from the next set of data onward.
>>> But then I lose the source streams' data from before I restarted it
>>> (when it went into this infinite loop).
>>>
>>> Thanks
>>> Sachin
>>>
>>
>>
>
