
Re: Re-key by multiple properties without composite key

Thanks for the details. This does make sense.

So it seems you can read all topics as tables (ie, `builder.table("topic")`
-- no need to do `builder.stream().toTable()`).

And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
  folderTable
    .join(personTable, (folderId, folder) -> folder.customerId, ...)
    .groupBy((...) -> (personId, ...))
    .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that
tells the join to use `customerId` from the `folderTable` to look up the
person from `personTable`.

Think of `folderTable` as the fact table and `personTable` as the dimension table.


KS will take care of everything else under the hood automatically.
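For illustration, here is a plain-Java sketch (no Kafka involved) of what the FK join plus group/aggregate computes. The `Person`/`Folder` records and the sample data are made up to mirror Karsten's classes; only the single `customerId` FK is modeled:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FkJoinSketch {

    // Hypothetical records mirroring Karsten's classes (names assumed).
    record Person(long id, String firstname, String lastname) {}
    record Folder(long id, String folderNumber, long customerId) {}

    // Model of join(fk-extractor) + groupBy(personId) + aggregate:
    // each folder (fact) is matched to a person (dimension) via its
    // customerId FK, then folder numbers are collected per person id.
    static Map<Long, List<String>> folderNumbersByPerson(
            List<Folder> folderTable, Map<Long, Person> personTable) {
        return folderTable.stream()
            .filter(f -> personTable.containsKey(f.customerId())) // inner join
            .collect(Collectors.groupingBy(
                Folder::customerId,
                Collectors.mapping(Folder::folderNumber, Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<Long, Person> persons = Map.of(
            1L, new Person(1L, "Ada", "Lovelace"),
            2L, new Person(2L, "Alan", "Turing"));
        List<Folder> folders = List.of(
            new Folder(10L, "F-10", 1L),
            new Folder(11L, "F-11", 1L),
            new Folder(12L, "F-12", 2L));
        System.out.println(folderNumbersByPerson(folders, persons));
    }
}
```

In the real topology, Kafka Streams maintains this result incrementally and with update/retract semantics, which the one-shot stream pipeline above does not capture.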


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> Matthias, thanks for getting back on this. I'll try to illustrate my
> intent with an example as I'm not yet fully familiar with Kafka
> (Streams) and its idioms...
>
> Assume classes Person and Folder:
>
> class Person {
>     Long id;
>     String firstname;
>     String lastname;
>     // some content
> }
>
> class Folder {
>     Long id;
>     String folderNumber;
>     // some other content
>     Long customerId; // FK, points to Person.id
>     Long billingAddressId; // FK, also points to Person.id
> }
>
> Thus both foreign keys of Folder point to Person entities, yet with
> different semantics. They're not composite keys but act independently.
>
> Now assume I want to build an aggregate Person object containing
> Folder.folderNumber of all folders associated with a Person entity,
> regardless of whether it acts as a customer or billing address. My
> (naive) idea was to build re-keyed KTables by Folder.customerId and
> Folder.billingAddressId and then joining / aggregating them with the
> Person KTable in order to build something like this:
>
> class AggregatedPerson {
>     Long id;
>     List<String> folderNumbers; // or even List<Folder>
>     // ...
> }
>
> (The latter supposed to be written to an output topic in order to
> serve as input for Solr or ElasticSearch.)
>
> Does this even make sense?
>
>
>> If you read the topic as a KTable, you cannot repartition, because it
>> violates the contract: a KTable must be partitioned by its primary key,
>> ie, the ID field, and thus the DSL does not offer you a repartition option.
>
> So re-key means repartition? ATM the partition count of all input
> topics is 1 as per Kafka UI, as I've specified no extra configuration
> for them.
>
> Best wishes,
> Karsten
>
> Am Di., 30. Jan. 2024 um 20:03 Uhr schrieb Matthias J. Sax <mjsax@apache.org>:
>>
>>>> Both fk1 and fk2 point to the PK of another entity (not shown for
>>>> brevity, of no relevance to the question).
>>
>> Are these two independent FKs, or one two-column FK?
>>
>>
>>> Ingesting the topic into a Kafka Streams application, how can I re-key
>>> the resulting KTable<Long, A> by both fk1 and fk2?
>>
>> If you read the topic as a KTable, you cannot repartition, because it
>> violates the contract: a KTable must be partitioned by its primary key,
>> ie, the ID field, and thus the DSL does not offer you a repartition option.
>>
>> You could read the topic as a KStream though, and provide a custom
>> `StreamPartitioner` for a `repartition()` operation. However, this is
>> also "dangerous", because a KStream is also assumed to be
>> partitioned by its key, and you might break downstream DSL operators
>> with such a violation of the "contract".
>>
>> Looking into your solution:
>>
>>> .toTable()
>>> .groupBy(
>>> (key, value) -> KeyValue.pair(value.fk1(), value),
>>> Grouped.with(...))
>>
>> This will set fk1 as the key, which does not seem to align with your
>> previous comment that the key should stay the ID? (Same for fk2.)
>>
>> Your last step seems to join fk1 with fk2 -- is this on purpose? I guess
>> it's unclear what you are actually trying to do to begin with? It sounds
>> like it's overall a self-join of the input topic on fk1 and fk2?
>>
>>
>> -Matthias
>>
>> On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
>>> Hi all,
>>>
>>> just stumbled upon another Kafka Streams issue that keeps me busy these days.
>>>
>>> Assume a (simplified) class A like this:
>>>
>>> class A {
>>>     private Long id;
>>>     private String someContent;
>>>     private Long fk1;
>>>     private Long fk2;
>>>     // Getters and setters accordingly
>>> }
>>>
>>> Both fk1 and fk2 point to the PK of another entity (not shown for
>>> brevity, of no relevance to the question).
>>>
>>> Now assume a Kafka topic built from instances of class A, keyed by its
>>> id (see above).
>>>
>>> Ingesting the topic into a Kafka Streams application, how can I re-key
>>> the resulting KTable<Long, A> by both fk1 and fk2? Note that the
>>> resulting key should not be changed or turned into some kind of
>>> composite key as it is used in later join operations.
>>>
>>> My (naive) solution involves creating two KTables from the input
>>> stream, re-keying them by fk1 and fk2 accordingly and then outer
>>> joining both resulting (re-keyed) KTables.
>>>
>>> KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));
>>>
>>> KTable<Long, A> rekeyedByFk1 = in
>>>     .toTable()
>>>     .groupBy(
>>>         (key, value) -> KeyValue.pair(value.fk1(), value),
>>>         Grouped.with(...))
>>>     .aggregate(
>>>         Aggregate::new,
>>>         (key, value, aggregate) -> aggregate.add(value),
>>>         (key, value, aggregate) -> aggregate.remove(value),
>>>         Materialized.with(...));
>>>
>>> KTable<Long, A> rekeyedByFk2 = in
>>>     .toTable()
>>>     .groupBy(
>>>         (key, value) -> KeyValue.pair(value.fk2(), value),
>>>         Grouped.with(...))
>>>     .aggregate(
>>>         ... same as above
>>>     );
>>>
>>> KTable<Long, A> joined = rekeyedByFk1
>>>     .outerJoin(
>>>         rekeyedByFk2,
>>>         <value joiner>)
>>>     .groupBy(KeyValue::pair, Grouped.with(...))
>>>     .aggregate(...);
>>>
>>> <value joiner> would integrate the (already pre-joined) Aggregates so
>>> as to avoid duplicates.
>>>
>>> Does this seem like a viable solution, or are there better / simpler /
>>> more efficient implementations?
>>>
>>> Best wishes,
>>> Karsten
