Re: Kafka streams DSL going into infinite loop

The only Streams-specific props I am using are:

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);           // disable record caches
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);               // commit roughly once per second
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); // enable DSL topology optimization

Yes, there are three sub-topologies and the key does change between joins.
Could this be related to any of the above configs?
I really don't know what TOPOLOGY_OPTIMIZATION is for.
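
To see what it actually changes, I can print the described topology (a
minimal sketch, assuming my topology comes from a StreamsBuilder named
"builder"; passing the props to build() is what applies the optimization):

// Sketch: build with the props so TOPOLOGY_OPTIMIZATION is applied, then
// print the resulting sub-topologies and repartition topics.
final Topology topology = builder.build(props);
System.out.println(topology.describe());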

The way I start my stream is:

final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    ...
});

try {
    streams.cleanUp();
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);
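
The hook body is elided above; a sketch of the full hook, assuming it
matches the Kafka Streams quickstart pattern this code is based on:

// Assumed hook body (elided above), as in the Streams quickstart:
// close the application, then release the latch so main can exit.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();   // stop processing and release resources
        latch.countDown(); // let latch.await() in main return
    }
});

Note that streams.cleanUp() on every start wipes the local state
directory, so the state stores are rebuilt from their changelog topics on
each run.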

Is this method of starting the stream OK?
There is indeed no loop in my code, so why does stream processing get
stuck, logging the final output forever?

Is there any direction you can suggest I look into? Is there any
additional logging I can enable at the Streams or Kafka level?
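
One thing I can try for more visibility (an illustrative sketch; this
listener is not in my original program) is logging the Streams state
transitions before starting the app, to see whether it is stuck
rebalancing or actually RUNNING:

// Illustrative addition: register a state listener before streams.start()
// to log every state transition of the Streams instance.
streams.setStateListener((newState, oldState) ->
    System.out.println("Streams state changed: " + oldState + " -> " + newState));

I can also raise the log level for the org.apache.kafka.streams loggers
to DEBUG, and check the committed offsets on the input topics with
kafka-consumer-groups.sh --describe to see whether the group makes
progress.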

Thanks
Sachin



On Fri, Jan 31, 2020 at 12:31 AM Matthias J. Sax <matthias@confluent.io>
wrote:

> Your program does not seem to contain a loop. Hence, it's unclear to me
> atm what could be the issue.
>
> Does the application commit offsets on the input topic? That would be an
> indicator that it actually does make progress.
>
> Do you change the key between joins, i.e., are there multiple
> sub-topologies that are connected via repartition topics, or is it one
> single sub-topology?
>
>
> -Matthias
>
> On 1/30/20 10:29 AM, Sachin Mittal wrote:
> > Hi,
> > I have found a case where, when I define my topology using the Streams
> > DSL, it tends to go into an infinite loop.
> > This usually happens if I start my stream, shut it down, and restart it
> > again after a while (by that time the source topic has moved ahead).
> >
> > Stream processing seems to be stuck in a loop and does not progress.
> >
> > My topology is something like this:
> >
> > source.branch(
> >     (k, v) -> ...,
> >     (k, v) -> ...,
> >     (k, v) -> ...,
> >     (k, v) -> ...,
> >     (k, v) -> ...,
> >     (k, v) -> ...,
> >     (k, v) -> ...,
> >     (k, v) -> ...
> > );
> > stream12 = stream1.join(stream2, ...).peek((k, v12) -> log(v12));
> > stream34 = stream3.join(stream4, ...).peek((k, v34) -> log(v34));
> > stream56 = stream5.join(stream6, ...).peek((k, v56) -> log(v56));
> > stream78 = stream7.join(stream8, ...).peek((k, v78) -> log(v78));
> > stream1234 = stream12.join(stream34, ...).peek((k, v1234) -> log(v1234));
> > stream123478 = stream1234.join(stream78, ...).peek((k, v123478) -> log(v123478));
> > stream567 = stream56.join(stream7, ...).peek((k, v567) -> log(v567));
> > final = stream123478.join(stream567, ...).foreach((k, vFinal) -> log(vFinal));
> >
> > What I observe is that it keeps printing log(vFinal) with the same value
> > again and again and does not seem to progress.
> > Any idea why this may be happening? What can I check to understand what
> > could be going wrong?
> >
> > If I stop Kafka, delete all the data, and restart everything, then it
> > all works fine from the next set of data.
> > But then I lose the source streams' data from before I restarted it
> > (when it went into this infinite loop).
> >
> > Thanks
> > Sachin
> >
>
>
