Re: Kafka streams DSL going into infinite loop

Your program does not seem to contain a loop. Hence, it's unclear to me
at the moment what the issue could be.

Does the application commit offsets on the input topic? That would be an
indicator that it actually makes progress.
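One way to check this, sketched here under assumptions: a Kafka Streams application commits its input offsets under a consumer group whose id is its `application.id`, so you can read those committed offsets with the Java `Admin` client twice, some time apart, and see whether they move forward. The broker address `localhost:9092`, the id `my-streams-app`, and the `madeProgress` helper are hypothetical placeholders for this sketch, not part of your setup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitProgressCheck {

    // Reads the offsets committed by a consumer group. A Kafka Streams
    // application uses its application.id as its consumer group id.
    static Map<TopicPartition, Long> committedOffsets(Admin admin, String groupId) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> raw =
            admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, Long> result = new HashMap<>();
        raw.forEach((tp, om) -> {
            if (om != null) { // skip partitions without a committed offset
                result.put(tp, om.offset());
            }
        });
        return result;
    }

    // Hypothetical helper: true if at least one partition's committed offset
    // is new or moved forward between the two snapshots.
    static <K> boolean madeProgress(Map<K, Long> before, Map<K, Long> after) {
        for (Map.Entry<K, Long> e : after.entrySet()) {
            Long previous = before.get(e.getKey());
            if (previous == null || e.getValue() > previous) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Hypothetical broker address and application.id; replace with your own.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        String applicationId = "my-streams-app";

        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, Long> before = committedOffsets(admin, applicationId);
            TimeUnit.SECONDS.sleep(60); // wait longer than commit.interval.ms (30s by default)
            Map<TopicPartition, Long> after = committedOffsets(admin, applicationId);
            after.forEach((tp, offset) ->
                System.out.printf("%s: %s -> %d%n", tp, before.get(tp), offset));
            System.out.println(madeProgress(before, after)
                ? "Committed offsets advanced: the application is making progress."
                : "Committed offsets did not advance.");
        }
    }
}
```

The same information is available from the command line with `bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-streams-app`, which shows the committed offset and lag per partition.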

Do you change the key between joins, i.e., are there multiple
sub-topologies that are connected via repartition topics, or is it one
single sub-topology?
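To answer this for a given program, `Topology#describe()` prints each sub-topology and the topics that connect them, and it does not need a running broker. The sketch below uses hypothetical topics `left` and `right` and re-keys one side with `selectKey()` before a windowed join, which should produce two sub-topologies joined by a repartition topic. It assumes Kafka Streams 3.0 or newer for `JoinWindows.ofTimeDifferenceWithNoGrace`; on older versions `JoinWindows.of(Duration)` plays the same role.

```java
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class SubTopologyCheck {

    // Builds a tiny topology (hypothetical topics "left" and "right") in which
    // one side is re-keyed before a windowed stream-stream join.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left");
        KStream<String, String> right = builder.stream("right");

        // Changing the key forces Kafka Streams to insert a repartition topic
        // before the join, which splits the program into two sub-topologies.
        KStream<String, String> rekeyed = left.selectKey((k, v) -> v);

        rekeyed.join(right,
                (lv, rv) -> lv + "|" + rv,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .foreach((k, v) -> System.out.println(k + " -> " + v));

        return builder.build();
    }

    public static void main(String[] args) {
        // describe() only inspects the topology; it lists each sub-topology
        // together with its source, sink, and repartition topics.
        System.out.println(buildTopology().describe());
    }
}
```

Printing `builder.build().describe()` for the real topology and looking for topics whose names end in `-repartition` would show whether the joins above run in one sub-topology or several.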


-Matthias

On 1/30/20 10:29 AM, Sachin Mittal wrote:
> Hi,
> I have found that when I define my topology using the Streams DSL, it
> tends to go into an infinite loop.
> This usually happens if I start my stream, shut it down, and restart it
> again after a while (by which time the source topic has moved ahead).
>
> Stream processing seems to be stuck in a loop and does not seem to make
> progress.
>
> My topology is something like this:
>
> source.branch(
> (k, v) -> ...,
> (k, v) -> ...,
> (k, v) -> ...,
> (k, v) -> ...,
> (k, v) -> ...,
> (k, v) -> ...,
> (k, v) -> ...,
> (k, v) -> ...
> )
> stream12 = stream1.join(stream2, ...).peek((k, v12) -> log(v12))
> stream34 = stream3.join(stream4, ...).peek((k, v34) -> log(v34))
> stream56 = stream5.join(stream6, ...).peek((k, v56) -> log(v56))
> stream78 = stream7.join(stream8, ...).peek((k, v78) -> log(v78))
> stream1234 = stream12.join(stream34, ...).peek((k, v1234) -> log(v1234))
> stream123478 = stream1234.join(stream78, ...).peek((k, v123478) -> log(v123478))
> stream567 = stream56.join(stream7, ...).peek((k, v567) -> log(v567))
> final = stream123478.join(stream567, ...).foreach((k, vFinal) -> log(vFinal))
>
> What I observe is that it keeps printing log(vFinal) with the same value
> again and again and does not seem to make progress.
> Any idea why this may be happening? What can I check to understand what
> could be going wrong?
>
> If I stop Kafka, delete all the data, and then restart everything, it
> all works fine from the next set of data.
> But then I lose the source stream data that arrived before I restarted
> it (when it went into this infinite loop).
>
> Thanks
> Sachin
>