Posts

Showing posts from January, 2024

Re: Re-key by multiple properties without composite key

Thanks for the details. This does make sense. So it seems you can read all topics as tables (ie, `builder.table("topic")` -- no need to do `builder.stream().toTable()`). And you can use the built-in FK-table-table join, and aggregate the result:

```
KTable result = folderTable
    .join(personTable, (folderId, folder) -> folder.customerId, ...)
    .groupBy((...) -> (personId, ...))
    .aggregate(...);
result.toStream().to("resultTopic");
```

Note the fk-extractor `(folderId, folder) -> folder.customerId` that tells the join to use `customerId` from the `folderTable` to look up the person from `personTable`. Think of `folderTable` as the fact table and `personTable` as the dimension table. KS will take care of everything else under the hood automatically. -Matthias On 1/30/24 11:25 AM, Karsten Stöckmann wrote: > Matthias, thanks for getting back on this. I'll try to illustrate my > intent with an example as I...
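For readers following along, here is a slightly fuller sketch of the pattern Matthias describes, assuming simple POJOs and default serdes (topic and class names are illustrative, the single-argument FK-extractor variant is used, and serde configuration via Grouped/Materialized is elided):

```
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class PersonFolderJoin {
    // Minimal stand-ins for the POJOs from this thread.
    static class Person { Long id; String firstname; String lastname; }
    static class Folder { Long id; String folderNumber; Long customerId; Long billingAddressId; }
    static class FolderNumbers { final List<String> numbers = new ArrayList<>(); }

    static void build(StreamsBuilder builder) {
        KTable<Long, Folder> folderTable = builder.table("folder");
        KTable<Long, Person> personTable = builder.table("person");

        // FK join (KIP-213): the extractor pulls customerId out of each Folder
        // and uses it to look up the matching Person in the dimension table.
        KTable<Long, KeyValue<Long, String>> joined = folderTable.join(
            personTable,
            folder -> folder.customerId,
            (folder, person) -> KeyValue.pair(person.id, folder.folderNumber));

        // Re-group the join result by person id. KGroupedTable.aggregate takes
        // both an adder and a subtractor so updates and deletes are handled.
        KTable<Long, FolderNumbers> result = joined
            .groupBy((folderId, pair) -> pair)
            .aggregate(
                FolderNumbers::new,
                (personId, folderNumber, agg) -> { agg.numbers.add(folderNumber); return agg; },
                (personId, folderNumber, agg) -> { agg.numbers.remove(folderNumber); return agg; });

        result.toStream().to("resultTopic");
    }
}
```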

question: mirror maker 2

Hello all, I am setting up mm2 to replicate messages, consumer groups, and consumer offsets from a->b. I believe I am replicating those 3 items from a->b. My mm2 prop file is as follows:

```
# specify any number of cluster aliases
clusters = a,b
b.group.id = mm2-request

# replication settings
tasks.max = 24
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

a.max.poll.records = 20000
#a.receive.buffer.bytes = 33554432
#a.send.buffer.bytes = 33554432
#a.max.partition.fetch.bytes = 33554432
#a.message.max.bytes = 37755000
a.compression.type = gzip
#a.max.request.size = 26214400
#a.buffer.memory = 524288000
a.batch.size = 524288

b.max.poll.records = 20000
#b.receive.buffer.bytes = 33554432
#b.send.buffer.bytes = 33554432
#b.max.partition.fetch.bytes = 33554432
#b.message.max.bytes = 37755000
b.compression.type = gzip
#b.max.request.size = 26214400
#b.buffer.memory = 524288000
b.batch.size = 524288

a.bootstrap...
```
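For comparison, the flow-level switches that enable topic, group, and offset replication in MM2 look roughly like this (a minimal sketch, not taken from the poster's file; the bootstrap host names are placeholders):

```
clusters = a, b
a.bootstrap.servers = a-host:9092   # placeholder
b.bootstrap.servers = b-host:9092   # placeholder

# enable the a->b replication flow and pick what to mirror
a->b.enabled = true
a->b.topics = .*
a->b.groups = .*

# translate and sync committed consumer-group offsets to the target (KIP-545)
a->b.emit.checkpoints.enabled = true
a->b.sync.group.offsets.enabled = true
```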

Re: Re-key by multiple properties without composite key

Matthias, thanks for getting back on this. I'll try to illustrate my intent with an example as I'm not yet fully familiar with Kafka (Streams) and its idioms... Assume classes Person and Folder:

```
class Person {
    Long id;
    String firstname;
    String lastname;
    // some content
}

class Folder {
    Long id;
    String folderNumber;
    // some other content
    Long customerId;       // FK, points to Person.id
    Long billingAddressId; // FK, also points to Person.id
}
```

Thus both foreign keys of Folder point to Person entities, yet with different semantics. They're not composite keys but act independently. Now assume I want to build an aggregate Person object containing Folder.folderNumber of all folders associated with a Person entity, regardless of whether it acts as a customer or billing address. My (naive) idea was to build re-keyed KTables by Folder.customerId and Folder.billingAddressId and then join / aggregate them with the Person KTable in order ...
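The aggregate shape described here might look something like this (a hypothetical illustration, not from the original post):

```
import java.util.ArrayList;
import java.util.List;

// Hypothetical target type: one Person plus the folderNumber of every Folder
// that references it via customerId or billingAddressId.
class PersonWithFolders {
    Long personId;
    String firstname;
    String lastname;
    final List<String> folderNumbers = new ArrayList<>();
}
```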

Re: What does kafka streams groupBy does internally?

Did reply on SO. -Matthias On 1/24/24 2:18 AM, warrior2031@gmail.com wrote:
> Let's say there's a topic in which chunks of different files are all
> mixed up, represented by a tuple `(FileId, Chunk)`.
>
> Chunks of the same file can also be a little out of order.
>
> The task is to aggregate all files and store them into some store.
>
> The number of files is unbounded.
>
> In pseudo stream DSL that might look like:
>
> ```
> topic('chunks')
>   .groupByKey((fileId, chunk) -> fileId)
>   .sortBy((fileId, chunk) -> chunk.offset)
>   .aggregate((fileId, chunk) -> store.append(fileId, chunk));
> ```
>
> I want to understand whether Kafka Streams can solve this efficiently.
> Since the number of files is unbounded, how would Kafka manage intermediate
> topics for the groupBy operation? How many partitions will it use, etc.? Can't
> find these details in the docs. Also let's say chunk has a flag t...
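Not from the thread, but for orientation: the closest real-DSL shape might look roughly like the sketch below. `Chunk` and `FileBuffer` are hypothetical stand-ins, and state-store serde setup is elided. Note the DSL has no `sortBy`; re-ordering has to happen inside the aggregator.

```
import java.util.TreeMap;
import org.apache.kafka.streams.StreamsBuilder;

public class ChunkAssembly {
    // Hypothetical stand-ins for the question's types.
    static class Chunk { long offset; byte[] payload; }
    static class FileBuffer {
        final TreeMap<Long, Chunk> byOffset = new TreeMap<>(); // re-orders out-of-order chunks
        FileBuffer add(Chunk c) { byOffset.put(c.offset, c); return this; }
    }

    static void build(StreamsBuilder builder) {
        builder.<String, Chunk>stream("chunks")
            // If records are already keyed by fileId, groupByKey() needs no
            // repartition topic. A key-changing groupBy((k, v) -> newKey) would
            // create one internal repartition topic, by default with the same
            // partition count as the source topic.
            .groupByKey()
            // No sortBy in the DSL: the aggregator orders chunks by offset.
            .aggregate(FileBuffer::new, (fileId, chunk, buf) -> buf.add(chunk));
    }
}
```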

Re: Re-key by multiple properties without composite key

>> Both fk1 and fk2 point to the PK of another entity (not shown for
>> brevity, of no relevance to the question).

Is this two independent FKs, or one two-column FK?

> Ingesting the topic into a Kafka Streams application, how can I re-key
> the resulting KTable<Long, A> by both fk1 and fk2?

If you read the topic as a KTable, you cannot repartition, because it would violate the contract. A KTable must be partitioned by its primary key, ie, the ID field, and thus the DSL does not offer you a repartition option. You could read the topic as a KStream though, and provide a custom `StreamPartitioner` for a `repartition()` operation. However, this is also "dangerous" because for a KStream it's also assumed that it's partitioned by its key, and you might break downstream DSL operators with such a violation of the "contract". Looking into your solution:

> .toTable()
> .groupBy(
> (key, val...
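A sketch of the KStream escape hatch mentioned above, reusing the Person/Folder example from this thread (names are illustrative, serdes elided): records are redistributed so all folders of one customer land on the same partition, while the record key stays the folder id. As the reply warns, downstream DSL operators still assume key-based partitioning, so treat this with care.

```
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class FkRepartition {
    static class Folder { Long id; String folderNumber; Long customerId; Long billingAddressId; }

    static void build(StreamsBuilder builder) {
        KStream<Long, Folder> folders = builder.stream("folder");

        // Custom StreamPartitioner: route by a value field (customerId)
        // instead of the record key.
        KStream<Long, Folder> byCustomer = folders.repartition(
            Repartitioned.<Long, Folder>as("folders-by-customer")
                .withStreamPartitioner((topic, key, folder, numPartitions) ->
                    (int) Math.floorMod(folder.customerId, (long) numPartitions)));
    }
}
```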

v3.5.1 KRAFT mode: Cluster every day exact same time unavailable for ~30sec -> no controller available

Hello, we have Kafka v3.5.1 in KRaft mode running, with two datacenters (via 10Gb/s darkfiber):

DC 1: 3 nodes Controller / Broker
DC 2: 2 nodes Controller / Broker
DC 2: 1 node Broker

Every day at exactly the same time, 21:01:00 (CEST), the cluster becomes unstable and no producer / consumer can access it. Every node has:
* its own node ID
* a rack ID

```
grep -E '(id|rack)' /etc/kafka/server.properties
broker.rack=0
node.id=1
```

(broker.rack=0 -> DC1, broker.rack=1 -> DC2)

We have the exact same setup on our test system, but it runs without any issues. The only differences are the missing darkfiber and different hostnames / certs. The rest is the same, because we use Puppet for CFG management. The logs look like this:

DC 1, Node 1:
```
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 21:01:05,135] INFO [RaftManager id=1] Completed transition to Unattached(epoch=1494, voters=[1, 2, 3, 4, 5], electionTimeoutMs=...
```

Re: [VOTE] 3.7.0 RC2

Hi Stan and Gaurav, Just to clarify some points mentioned here before. KAFKA-14616: I raised this a year ago, so it's not related to the JBOD work. It is rather a blocker bug for KRaft in general. The PR from Colin should fix this. I am not sure if it is a blocker for 3.7 per se, as it has been a major bug since 3.3 and got missed from all other releases. Regarding the JBOD work: KAFKA-16082: This is not a blocker for 3.7; instead it's a nice fix. The PR https://github.com/apache/kafka/pull/15136 is quite a small one and was approved by Proven and me, but it is waiting for a committer's approval. KAFKA-16162: This is a blocker for 3.7. Again, it's a small PR https://github.com/apache/kafka/pull/15270, approved by Proven and me, and waiting for a committer's approval. KAFKA-16157: This is a blocker for 3.7. There is one small suggestion for the PR https://github.com/apache/kafka/pull/15263 but I don't think any of the current feedback is blocking the pr fr...