Posts

Showing posts from January, 2025

Re: JoinGroup API response timing.

Hi Paul,

My question is: which of the benefits you mention *cannot* be achieved with plain consumers and no groups? If I, as a developer, know the number of partitions in advance and limit myself to Kubernetes as the platform:

1, 2. I can launch as many consumers as there are partitions. A failed consumer will be restarted by Kubernetes.
3. Agree.
4. I can assign multiple partitions to each of my consumers.

Best regards.

On Thu, Jan 30, 2025 at 2:46 PM Brebner, Paul <Paul.Brebner@netapp.com.invalid> wrote:
> Hi again – interesting discussion thanks. Coming from a fairly old school background of MOM and JMS in the 00's I guess I found Kafka consumer groups "interesting" because:
>
> 1 – they enable message fan-out to multiple consumers – i.e. each group gets a copy of each message – this isn't "normal" for traditional pub-sub/MOM
> 2 – they enable the sharing of messages for scalability and concurrency – y...
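
The static scheme described in this reply (a known partition count, one restartable consumer instance per slice of partitions) can be sketched as a fixed mapping from instance number to partitions. This is only an illustration: the function name, the idea of using a Kubernetes StatefulSet ordinal as the instance number, and the 6-partition, 3-replica figures are assumptions, not something stated in the thread.

```python
def partitions_for_instance(ordinal: int, replicas: int, num_partitions: int) -> list[int]:
    """Return the partitions one instance owns under a fixed modulo split.

    `ordinal` is a hypothetical stable instance number, for example a
    StatefulSet pod ordinal (an assumption made for this sketch).
    """
    if not 0 <= ordinal < replicas:
        raise ValueError("ordinal must be in [0, replicas)")
    return [p for p in range(num_partitions) if p % replicas == ordinal]


# Hypothetical example: 6 partitions spread over 3 consumer instances.
assignment = {i: partitions_for_instance(i, 3, 6) for i in range(3)}
print(assignment)
```

Because the mapping depends only on the ordinal, a restarted instance picks up the same partitions again. The trade-off is that nothing takes over those partitions while the instance is down.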

Re: JoinGroup API response timing.

Hi again – interesting discussion, thanks. Coming from a fairly old-school background of MOM and JMS in the 00's, I guess I found Kafka consumer groups "interesting" because:

1 – they enable message fan-out to multiple consumers – i.e. each group gets a copy of each message – this isn't "normal" for traditional pub-sub/MOM
2 – they enable the sharing of messages for scalability and concurrency – you can easily scale consumer processing by increasing the number of consumers (and partitions)
3 – message processing order – order is only guaranteed per partition
4 – check out the parallel consumer for higher throughput with fewer consumers/partitions – there are more options for parallel processing and ordering

Regards, Paul

From: Artem Livshits <alivshits@confluent.io.INVALID>
Date: Friday, 24 January 2025 at 5:29 am
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: JoinGroup API response timing.

[You don't often ...
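
Point 1 above (each group gets its own copy of every message) follows from each group keeping its own committed offset on the same log. A toy in-memory model, with made-up group names and no real Kafka client involved, might look like this:

```python
# Toy model: one shared log, one committed offset per consumer group.
# The group names "billing" and "audit" are hypothetical.
log = ["m0", "m1", "m2"]
committed = {"billing": 0, "audit": 0}


def poll(group: str, max_records: int = 10) -> list[str]:
    """Return the next records for `group` and advance only that group's offset."""
    start = committed[group]
    records = log[start:start + max_records]
    committed[group] = start + len(records)
    return records


first_billing = poll("billing")   # billing reads everything
first_audit = poll("audit")       # audit still sees every message
second_billing = poll("billing")  # nothing new for billing
print(first_billing, first_audit, second_billing)
```

Reading by one group never consumes a message away from another group, which is the fan-out behaviour described in the post.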

Re: Random access to kafka messages

Hello,

One possible drawback you may want to consider is that the proposed scenario will not work on a DR cluster maintained by MirrorMaker. Other than that, it sounds interesting.

Best regards,
Radu

On Fri, Jan 24, 2025 at 12:49 PM Jan Wypych <Jan.Wypych@billennium.com.invalid> wrote:
> Hello,
>
> We are currently designing a system that will ingest some XML messages and then it will store them into some kind of long-term storage (years). The access pattern to data shows that new messages (1-2 weeks old) will be frequent, older data will be accessed rarely.
> We currently chose Kafka as an ingest part, some kind of S3 for cold long-term, but we are still thinking how we should approach hot storage (1-2 weeks). We established that our S3 for hot data is too slow.
> We have a few options for this hot part of a storage, but one of them is Kafka (it will greatly simplify the whole system and Kafka reliability if ...

Re: Random access to kafka messages

Hello Jan,

Thanks for your question. It is my understanding that the producer batching strongly informs the on-disk layout, because appending to the log is done at a per-batch level, not a per-record level. This is done for a number of reasons, one of which is being able to append data without uncompressing and recompressing it on the broker side. There are situations when Kafka must fall back to recompressing batches before append, but I don't think they're relevant to this discussion.

The consumer batching is also somewhat informed by the on-disk layout. Data fetched by the consumers should always be one or more batches, never a fraction of a batch. This is done to avoid uncompressing and recompressing the data, similar to the Produce side, but also to enable sendfile and block cache optimizations. So while consumer batching may be configurable per-client, this is done by including multiple batches together. Consumer fetches should have a lower bound, wh...

RE: Random access to kafka messages

Hello Greg,

Thanks for mentioning batch sizes. Could you please elaborate on the impact of producer batch size? My understanding is that the producer just writes messages (in batches of configurable size) to Kafka, but these batches do not impact the data stored on disk. The consumer reading messages should be totally decoupled from how the producer writes them, but maybe I am missing something.

For batch reading, we are aware of that and will set fetch.min.bytes and fetch.max.bytes to force Kafka not to prefetch more than one message. Our understanding (maybe wrong) is that Kafka has many IO threads (I am not sure if there are/can be separate reading and writing pools) and each one can process IO assignments with different parameters (you can have one thread returning data after 10 bytes read and a second one waiting for 10MB). By this understanding (and, as said, we may be wrong here) batch sizes can be an issue, but they are configurable on the individual producer and consumer le...

Re: Random access to kafka messages

Sounds "interesting" – in theory it could work. Just remember that segment size will impact latency: records are stored in segments on local/remote storage (with tiering enabled); bigger segments improve throughput, but smaller segments may improve read latency.

Paul Brebner

From: Greg Harris <greg.harris@aiven.io.INVALID>
Date: Saturday, 25 January 2025 at 7:02 am
To: Users <users@kafka.apache.org>
Subject: Re: Random access to kafka messages

Hello Jan,

I also have not heard of a use case like this for Kafka. One statistic that I think you might need to manage is batch size, and its effect on compression and read amplification. Larger batches on the producer side can make your producers more performant and compression more effective. But large batches will also increase the amount of data delivered to consumers that is then discarded to read a single message. This additi...

Re: Random access to kafka messages

Hello Jan,

I also have not heard of a use case like this for Kafka. One statistic that I think you might need to manage is batch size, and its effect on compression and read amplification. Larger batches on the producer side can make your producers more performant and compression more effective. But large batches will also increase the amount of data delivered to consumers that is then discarded to read a single message. This additional data transfer wastes disk bandwidth on the brokers and network bandwidth on the broker and consuming application. So while a lot of existing tuning advice and optimizations in Kafka work with larger batches, you will need to spend some time profiling and making batch size tradeoffs.

Hope this helps,
Greg

On Fri, Jan 24, 2025, 3:05 AM Ömer Şiar Baysal <osiarbaysal@gmail.com> wrote:
> Hi,
>
> The data you gathered shows promising results, one thing to consider is testing how the Page Cache that Ka...
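
The read amplification mentioned above can be estimated with simple arithmetic. The sketch below assumes that a fetch for one record transfers the whole batch containing it, and that sizes are compared at the same compression level. The batch and record sizes are invented for illustration and are not measurements from the thread.

```python
def read_amplification(batch_bytes: int, record_bytes: int) -> float:
    """Bytes transferred per useful byte when a whole batch is fetched for one record."""
    if record_bytes <= 0 or batch_bytes < record_bytes:
        raise ValueError("need 0 < record_bytes <= batch_bytes")
    return batch_bytes / record_bytes


# Hypothetical sizes: a 4 KiB record stored in batches of 16 KiB, 256 KiB and 1 MiB.
factors = {kib: read_amplification(kib * 1024, 4 * 1024) for kib in (16, 256, 1024)}
for kib, factor in factors.items():
    print(f"{kib:>5} KiB batch, 4 KiB record -> {factor:.0f}x")
```

Under these assumptions a 1 MiB batch costs 256 times the useful payload per random read, which is why batch size is worth profiling for this access pattern.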

Re: Random access to kafka messages

Hi Jan,

One immediate concern (which you have probably thought through) is that you will have to be careful with configuring Kafka's cleanup policy. From my understanding, in "delete" cleanup mode, Kafka calculates which messages to delete based on a combination of the message's age and the ratio of used-to-total available disk space (corrections welcome!). Given that, it might be fussy to adjust the cleanup policy so that you get your 2-week retention.

Since it sounds like you're basically using Kafka as a key-value caching layer, maybe using something like Redis (ElastiCache on AWS) with a TTL would better suit your use case? You would get a definite guarantee on message persistence time. You would also be able to choose the key generation policy, instead of (what I'm guessing you're doing now) keeping an external index of fingerprint-to-partition:offset mappings.

If, on the other hand, you're looking for a cache whose persistence varie...

Re: Random access to kafka messages

Hi,

The data you gathered shows promising results. One thing to consider is testing how the page cache that Kafka relies on affects response times. The page cache greatly improves response time for fetch requests whose data is already cached in memory, which may give the impression that all fetch requests would perform the same. In fact, performance will be different for non-cached data.

Good luck, and let me know if you need more information about the page cache.

Omer Siar Baysal

On Fri, Jan 24, 2025, 11:48 Jan Wypych <Jan.Wypych@billennium.com.invalid> wrote:
> Hello,
>
> We are currently designing a system that will ingest some XML messages and then it will store them into some kind of long-term storage (years). The access pattern to data shows that new messages (1-2 weeks old) will be frequent, older data will be accessed rarely.
> We currently chose Kafka as an ingest part, some kind of S3 for cold lo...

Random access to kafka messages

Hello,

We are currently designing a system that will ingest some XML messages and then store them in some kind of long-term storage (years). The access pattern shows that new messages (1-2 weeks old) will be accessed frequently, while older data will be accessed rarely.

We currently chose Kafka as the ingest part and some kind of S3 for cold long-term storage, but we are still thinking about how we should approach hot storage (1-2 weeks). We established that our S3 is too slow for hot data.

We have a few options for this hot part of the storage, but one of them is Kafka (it will greatly simplify the whole system, and Kafka reliability is extremely high). Each Kafka message can be accessed using the offset/partition pair (we need some metadata from messages anyway, so getting this pair is free for us). Kafka stores its data in segments, each of which has its own index, so we do not do a full scan of a topic. Consumer configs can be tweaked, so we do not prefetch more than one message, do not commit of...
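
The reason an offset lookup avoids a full topic scan can be sketched with two binary searches: one over segment base offsets, and one over a sparse per-segment index. This is a simplified toy model with invented offsets and file positions. It is not Kafka's actual on-disk format, and the function names are hypothetical.

```python
from bisect import bisect_right


def find_segment(base_offsets: list[int], target: int) -> int:
    """Return the base offset of the segment that would hold `target`."""
    i = bisect_right(base_offsets, target) - 1
    if i < 0:
        raise KeyError(f"offset {target} is before the first segment")
    return base_offsets[i]


def index_lookup(index: list[tuple[int, int]], target: int) -> int:
    """Return the file position of the last sparse-index entry at or before `target`."""
    offsets = [offset for offset, _ in index]
    i = bisect_right(offsets, target) - 1
    return index[i][1] if i >= 0 else 0


# Invented data: three segments, and a sparse (offset, file position) index
# for the segment that starts at offset 1000.
segments = [0, 1000, 2000]
sparse_index = [(1000, 0), (1250, 4096), (1500, 8192)]

segment = find_segment(segments, 1300)
start_pos = index_lookup(sparse_index, 1300)
print(segment, start_pos)
```

In this model the reader jumps to position 4096 in the segment starting at offset 1000 and scans forward from there, so the remaining cost depends on how sparse the index is and on batch size.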

Re: JoinGroup API response timing.

> I wanted to understand the motivation for consumer group

The motivation for the consumer group is to let Kafka manage the workload distribution among consumers. The application can then just run multiple uncoordinated instances and not worry about partitions that have no consumer assigned, or consumers that are overloaded or idle: the consumer group coordinator coordinates partition assignment across consumers in the same consumer group. If the application doesn't need this functionality, it can use the KafkaConsumer.assign method to manually assign partitions to consumers and run its own instance coordination logic to manage liveness and workload balance.

> As for offsets, let them be saved locally by consumer or Redis, etc.

This can work, unless we need exactly-once semantics (making sure that if we consume a record from a topic and produce a corresponding record to another topic, we either do both atomically or do neither). For exactly-once semanti...

Re: Kafka Streams Consumer Constantly Rebalance over 100k tps

As I said: you should first try to understand the root cause. A `DisconnectException` sounds like a network issue to me. So it might not have anything to do with KS configurations, but seems to be an environment question. Of course, it could mean that heartbeats are not sent, and that a thread drops out of the consumer group, triggering a rebalance. It would be good to verify this with the broker (i.e., group coordinator) logs.

`TaskMigratedException` is a follow-up error, if you wish: the thread already dropped out of the consumer group (without noticing it). In general, the root cause for this one is not calling poll() on time to rejoin the group during an ongoing rebalance.

>> - setting max.poll.interval.ms starting from 60000 to 300000

The default is 300,000 (i.e., 5 minutes). Reducing it to 1 minute would rather destabilize the system. We did see deployments with higher settings, so maybe increasing it to 15 minutes could help. But larger timeouts have o...
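
Whether a given max.poll.interval.ms leaves enough room between poll() calls can be checked with a rough worst-case estimate. The sketch assumes the time between two polls is dominated by processing one full batch of records. The 200 ms per-record cost is a made-up figure, and 500 is the consumer client's default for max.poll.records (Kafka Streams may use a different default).

```python
def worst_case_poll_gap_ms(max_poll_records: int, ms_per_record: int) -> int:
    """Upper bound on the time between two poll() calls if every record is slow."""
    return max_poll_records * ms_per_record


def fits_poll_interval(max_poll_records: int, ms_per_record: int,
                       max_poll_interval_ms: int) -> bool:
    """True if a full batch can be processed before the poll interval expires."""
    return worst_case_poll_gap_ms(max_poll_records, ms_per_record) < max_poll_interval_ms


# Hypothetical workload: 500 records per poll, 200 ms of processing per record.
gap = worst_case_poll_gap_ms(500, 200)
ok_at_1_min = fits_poll_interval(500, 200, 60_000)
ok_at_5_min = fits_poll_interval(500, 200, 300_000)
print(gap, ok_at_1_min, ok_at_5_min)
```

With these invented numbers a full batch takes 100 seconds. That exceeds a 60,000 ms interval but fits within the 300,000 ms default, which is consistent with the advice above not to lower the setting.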

Re: JoinGroup API response timing.

I wanted to understand the motivation for consumer groups. *If the number of partitions is known in advance*, then each consumer can subscribe to an individual topic-partition. If such a consumer fails, let Kubernetes reschedule it. (Elsewhere, a similar restart mechanism may exist.) Sharing the load of a failed consumer assumes that data across partitions are the "same". Otherwise, such sharing is a needless burden.

As for offsets, let them be saved locally by the consumer, or in Redis, etc.

The concept of a consumer group shifts the responsibility of partition assignment to the broker because only the broker knows the number of partitions.

Best regards.

On Thu, 23 Jan, 2025, 06:25 Brebner, Paul, <Paul.Brebner@netapp.com.invalid> wrote:
> Hi – short answer is consumers can read from a specific partition, but in general for a consumer group you want to balance the partitions across the available consumers for high throughput – if a consumer fails or is kicked off t...

Re: JoinGroup API response timing.

Hi – short answer is consumers can read from a specific partition, but in general for a consumer group you want to balance the partitions across the available consumers for high throughput. If a consumer fails or is kicked off the group because it times out etc., then the partitions are rebalanced across the remaining consumers.

Paul

From: Chain Head <mrchainhead@gmail.com>
Date: Wednesday, 22 January 2025 at 5:19 pm
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: JoinGroup API response timing.

Thanks for the explanation. Slight digression and perhaps a silly question w.r.t. consumers. Since multiple groups are possible, at a high level, the broker effectively sends data of a given topic-partition to multiple consumers while keeping track of offsets. So, why not let consumers specify the partition ID they want to consume? Is the concept of consumer grou...
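
The rebalancing described in this reply can be illustrated with a toy round-robin assignment that is recomputed when the set of live consumers changes. This is a simplified stand-in, not one of Kafka's actual assignor implementations, and the consumer names are hypothetical.

```python
def assign_round_robin(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Spread partitions over the live consumers, one at a time, in sorted order."""
    if not consumers:
        return {}
    members = sorted(consumers)
    assignment: dict[str, list[int]] = {m: [] for m in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment


partitions = list(range(6))
before = assign_round_robin(partitions, ["c1", "c2", "c3"])
after = assign_round_robin(partitions, ["c1", "c3"])  # c2 failed or timed out
print(before)
print(after)
```

After c2 leaves, its partitions are redistributed so that every partition still has exactly one owner within the group.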