
Re: Unexpected tombstone records after Kafka Streams update to 3.7.1

Sounds like https://issues.apache.org/jira/browse/KAFKA-16394

-Matthias

On 8/29/24 02:35, Vogel, Kevin, DP EPS, BN, extern, external wrote:
> Hello there,
>
> I searched the Apache Jira for a bug report on this topic but couldn't find one. Maybe someone else has noticed something similar or knows more about it.
>
> After we updated the kafka-streams dependency of our Spring Boot Kafka Streams application from 3.6.2 to 3.7.1, we noticed some failing tests. The streams processor is expected to join two input topics and produce a single output record. The test then injects a third record with a null value (a tombstone) and expects exactly one more output record with a null value. Since the update, however, we get two records with a null value instead of one.
>
> I stripped the processor down to the absolute minimum so that you have some example code. Don't worry about the strange names; we are working with a legacy system and are unfortunately bound to them:
>
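> import java.util.function.BiFunction;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.TableJoined;
> import org.springframework.context.annotation.Bean;
>
> @Bean
> public BiFunction<
>         KStream<Gidpf01iKey, Gidpf01iValue>,
>         KTable<String, Gidpf01gAggregate>,
>         KStream<UuidString, Teil>>
>     processTeil() {
>   return (gidpf01i, gidpf01gAggregate) -> {
>     // Materialize the GIDPF01I stream as a table; null values become deletes (tombstones).
>     var gidpf01iTable = gidpf01i.toTable();
>
>     // Foreign-key LEFT join: each GIDPF01I row looks up its aggregate via the extracted key.
>     var joined =
>         gidpf01iTable.leftJoin(
>             gidpf01gAggregate,
>             new Gidpf01iGidpf01gAggregateForeignKeyExtractor(),
>             new Gidpf01iGidpf01gAggregateValueJoiner(),
>             TableJoined.as("gidpf01i-gidpf01g-aggregate-to-teil"));
>
>     // Re-key the join result with UuidString keys from GenericSbamUuidGenerator.
>     return joined.toStream().selectKey(new GenericSbamUuidGenerator<>());
>   };
> }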
>
> As you can see, nothing fancy: just a KTable-KTable foreign-key left join. Even stranger, not all streams processors behave differently after the update, although they are all very similar and I cannot see any significant difference between them.
>
> The test looks like this:
>
> import static org.assertj.core.api.Assertions.assertThat;
>
> import org.apache.kafka.streams.test.TestRecord;
> import org.junit.jupiter.api.Test;
>
> @Test
> void testProcessGidpf01iTombstone() {
>
>   // Key and value for part master data (Teilestamm) and part text (Teiletext)
>   final var gidpf01i = createGidpf01iKeyValue("A", REFRESH);
>   final var gidpf01gAggregate =
>       createGidpf01gAggregate("T123456789", "Test Beschreibung", "Test Zusatzinformation");
>
>   gipf01gAggregateInputTopic.pipeInput(gidpf01gAggregate);
>   gidpf01iInputTopic.pipeInput(gidpf01i);
>
>   // Read the joined record
>   var results = teilOutputTopic.readKeyValuesToList();
>   assertThat(results).hasSize(1);
>   assertThat(results.get(0).key).isInstanceOf(UuidString.class);
>   assertThat(results.get(0).value).isNotNull();
>
>   // Send a tombstone for GIDPF01I
>   gidpf01iInputTopic.pipeInput(new TestRecord<>(gidpf01i.key(), null));
>
>   results = teilOutputTopic.readKeyValuesToList();
>   assertThat(results).hasSize(1); // <--- fails here: results contain two identical null-valued records instead of one
>   assertThat(results.get(0).key).isInstanceOf(UuidString.class);
>   assertThat(results.get(0).value).isNull();
>
>   assertThat(gidpf01iDlqOutputTopic.isEmpty()).isTrue();
> }
>
> Does anyone know of a bug or behavioral change like this in Kafka Streams 3.7.1? I would be very grateful for any response.
>
> Kind regards
> Kevin Vogel
> Software Developer (external)
> Employee of Qvest Digital AG
>
> Am Dickobskreuz 10, D-53121 Bonn
> Tel.: +49 228 54881-0
> Commercial register: HRB 18196, Bonn Local Court; VAT ID: DE274355441
> Executive Board: Dr. Stefan Barth, Kai Ebenrett, Boris Esser, Alexander Steeg
> Chairman of the Supervisory Board: Stefan Nöthen
>
>
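For readers comparing the 3.6.2 and 3.7.1 behavior, the output contract the test asserts can be written down as a toy, stdlib-only Java model. This is a minimal sketch under stated assumptions, not how Kafka Streams implements foreign-key joins: the class `FkLeftJoinModel`, its methods and the sample keys are hypothetical, the `selectKey` re-keying step is left out, and emitting a tombstone only for a left key that previously existed is a simplification made by this model. It shows that one left-side tombstone should yield exactly one null-valued output record, which is what the second `hasSize(1)` assertion in the test checks.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical toy model (NOT Kafka Streams code) of the records a
// KTable-KTable foreign-key LEFT join is expected to emit.
public class FkLeftJoinModel<K, V, KO, VO, VR> {
    private final Map<K, V> left = new HashMap<>();
    private final Map<KO, VO> right = new HashMap<>();
    private final Function<V, KO> foreignKeyExtractor;
    private final BiFunction<V, VO, VR> joiner;
    // Emitted (key, value) pairs; a null value models a tombstone.
    private final List<Map.Entry<K, VR>> output = new ArrayList<>();

    public FkLeftJoinModel(Function<V, KO> foreignKeyExtractor, BiFunction<V, VO, VR> joiner) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.joiner = joiner;
    }

    // Right-side update: re-join every left row whose foreign key points at it.
    // A right-side delete passes null to the joiner, as a LEFT join would.
    public void putRight(KO key, VO value) {
        if (value == null) {
            right.remove(key);
        } else {
            right.put(key, value);
        }
        for (Map.Entry<K, V> e : left.entrySet()) {
            if (Objects.equals(foreignKeyExtractor.apply(e.getValue()), key)) {
                emit(e.getKey(), joiner.apply(e.getValue(), value));
            }
        }
    }

    // Left-side update: join with the current right value (null if absent).
    // A tombstone for a known left key emits exactly one tombstone.
    public void putLeft(K key, V value) {
        if (value == null) {
            if (left.remove(key) != null) {
                emit(key, null);
            }
            return;
        }
        left.put(key, value);
        emit(key, joiner.apply(value, right.get(foreignKeyExtractor.apply(value))));
    }

    // Returns and clears emitted records, similar in spirit to readKeyValuesToList().
    public List<Map.Entry<K, VR>> drainOutput() {
        List<Map.Entry<K, VR>> drained = new ArrayList<>(output);
        output.clear();
        return drained;
    }

    private void emit(K key, VR value) {
        // SimpleImmutableEntry permits null values, unlike Map.entry(...).
        output.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    public static void main(String[] args) {
        FkLeftJoinModel<String, String, String, String, String> join =
            new FkLeftJoinModel<>(v -> v.split(":")[0], (l, r) -> l + "|" + r);
        join.putRight("fk-1", "right-value");
        join.putLeft("k1", "fk-1:left-value");
        System.out.println(join.drainOutput()); // one joined record
        join.putLeft("k1", null);
        System.out.println(join.drainOutput()); // one tombstone
    }
}
```

Under this model, the duplicate tombstone observed after the upgrade would show up as a second `k1=null` entry in the last drained list.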
