Posts

Showing posts from February, 2021

Re: high CPU usage after Kafka upgrade

The last message format version change was in 0.11. Once you have the flame graph, please file a JIRA with it attached. CPU usage is expected to be lower in the new version, not higher. Ismael On Fri, Feb 26, 2021, 9:59 AM Jhanssen Fávaro < jhanssenfavaro@gmail.com > wrote: > Not sure if this applies only to this 0.10 version. > > [image: image.png] > Regards. > > > On Fri, Feb 26, 2021 at 2:54 PM Péter Sinóros-Szabó > <peter.sinoros-szabo@transferwise.com.invalid> wrote: > >> Hi, >> >> No, the CPU increase shouldn't be there. Upgrades usually bring lower CPU >> usage. >> >> And yes, I followed the upgrade protocol as it is described in the >> documentation; I got the CPU increase when I upgraded the 1st instance as >> the first step. >> >> Cheers, >> Peter >> >> On Fri, 26 Feb 2021 at 18:35, Jhanssen Fávaro < jhanss...

Introducing JulieOps, a tool to help you build your GitOps and self-service with Apache Kafka and related

Hi, I hope this message finds everyone healthy and well. I wanted to reach out to the community to introduce JulieOps ( https://github.com/kafka-ops/julie ). This is a project I have been working on recently to help bring automation and self-service to Apache Kafka. I would like to thank the many people who contributed feedback, ideas, documentation, and code to the project; it would not be what it is without its community. If you find it useful, I am grateful. If you have any ideas or improvements, or want to contribute, you are very much welcome! Releases are available as RPM, DEB, and fat jar from https://github.com/kafka-ops/julie/releases/ as well as from Docker Hub. All the details are in the documentation, as complete as possible: https://julieops.readthedocs.io/en/latest/ -- Pere Urbon-Bayes Software Architect https://twitter.com/purbon https://www.linkedin.com/in/purbon/

Re: Rebalancing stuck, never finishes

Peter, It does seem like KAFKA-9752 is the most likely suspect, although if your clients were upgraded to 2.6.1 then I don't believe they would be on an early enough version of the JoinGroup to run into this. I'm not 100% sure though; it may be a good idea to leave a comment on that ticket and ping Jason directly, since he implemented the fix. Murilo, I agree that your problem is not likely to be KAFKA-9752, since that was caused by KAFKA-9232 and that code is not present in 2.2.1. But maybe you're hitting the issue which KAFKA-9232 was originally intended to fix? In any case, 2.2.1 is quite old now, so there may be other known bugs which have since been fixed. I know it's not always possible/easy, but I would still recommend upgrading your brokers to a more recent version if you can. On Fri, Feb 26, 2021 at 7:19 AM Murilo Tavares < murilofla@gmail.com > wrote: > Just to provide a bit more detail, I noticed Peter's p...

Re: high CPU usage after Kafka upgrade

Not sure if this applies only to this 0.10 version. Regards. On Fri, Feb 26, 2021 at 2:54 PM Péter Sinóros-Szabó <peter.sinoros-szabo@transferwise.com.invalid> wrote: Hi, No, the CPU increase shouldn't be there. Upgrades usually bring lower CPU usage. And yes, I followed the upgrade protocol as it is described in the documentation; I got the CPU increase when I upgraded the 1st instance as the first step. Cheers, Peter On Fri, 26 Feb 2021 at 18:35, Jhanssen Fávaro < jhanssenfavaro@gmail.com > wrote: > Hi Peter, > I am in the same boat, with a lot of questions about Kafka's upgrade process. But > it looks like this CPU increase is expected, at least until you > finish every broker upgrade. > > In this case, when you say you didn't change the version, do you mean that for > all brokers, right? > > Basically, you should upgrade the binaries on every broker, but before the > restart, change its...

Re: high CPU usage after Kafka upgrade

Hi, No, the CPU increase shouldn't be there. Upgrades usually bring lower CPU usage. And yes, I followed the upgrade protocol as it is described in the documentation; I got the CPU increase when I upgraded the 1st instance as the first step. Cheers, Peter On Fri, 26 Feb 2021 at 18:35, Jhanssen Fávaro < jhanssenfavaro@gmail.com > wrote: > Hi Peter, > I am in the same boat, with a lot of questions about Kafka's upgrade process. But > it looks like this CPU increase is expected, at least until you > finish every broker upgrade. > > In this case, when you say you didn't change the version, do you mean that for > all brokers, right? > > Basically, you should upgrade the binaries on every broker, but before the > restart, change its configuration to reflect your old version. > > - inter.broker.protocol.version=2.4.1 > - log.message.format.version=2.4.1 > > And so, after you finish every hos...

Re: high CPU usage after Kafka upgrade

Hi, thanks, yes I planned to run a profiler on it, Opsian to be exact, to see what's going on, but the async profiler is a good option as well. I just wanted to ask if anyone has experienced this before. I will get back here if I find something useful. Peter On Fri, 26 Feb 2021 at 18:34, Alex Woolford < alex@woolford.io > wrote: > It might be worth attaching a profiler to see what's eating up all the > cycles, Peter. > > I used this recently, and it turned out that my Prometheus monitoring was > the culprit: https://github.com/jvm-profiling-tools/async-profiler > > From my terminal history: > > cd /tmp > wget > > https://github.com/jvm-profiling-tools/async-profiler/releases/download/v1.8.3/async-profiler-1.8.3-linux-x64.tar.gz > tar xvf async-profiler-1.8.3-linux-x64.tar.gz > cd async-profiler-1.8.3-linux-x64 > ./profiler.sh -d 30 -f /tmp/flamegraph.svg 8983 > > > ... where 8983 is the ...

Re: high CPU usage after Kafka upgrade

Hi Peter, I am in the same boat, with a lot of questions about Kafka's upgrade process. But it looks like this CPU increase is expected, at least until you finish every broker upgrade. In this case, when you say you didn't change the version, do you mean that for all brokers, right? Basically, you should upgrade the binaries on every broker, but before the restart, change its configuration to reflect your old version:

- inter.broker.protocol.version=2.4.1
- log.message.format.version=2.4.1

And then, after you finish the binary upgrade on every host/broker (and the clients, consumers and producers, upgrade their jars/library versions to 2.6.1), you should comment out those version lines (log.message.format.version and inter.broker.protocol.version) on each of the brokers and restart them one by one. That's what I understood reading the documentation: https://kafka.apache.org/documentation/#upgrade Best Regards! On Fri, Feb 26, 2021 at 2:19 PM Péter Sinóros-Szabó <pet...

Re: high CPU usage after Kafka upgrade

It might be worth attaching a profiler to see what's eating up all the cycles, Peter. I used this recently, and it turned out that my Prometheus monitoring was the culprit: https://github.com/jvm-profiling-tools/async-profiler

From my terminal history:

    cd /tmp
    wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v1.8.3/async-profiler-1.8.3-linux-x64.tar.gz
    tar xvf async-profiler-1.8.3-linux-x64.tar.gz
    cd async-profiler-1.8.3-linux-x64
    ./profiler.sh -d 30 -f /tmp/flamegraph.svg 8983

... where 8983 is the pid of the Kafka process, and then it spat out a beautiful interactive flame chart.

On Fri, Feb 26, 2021 at 10:26 AM Péter Sinóros-Szabó <peter.sinoros-szabo@transferwise.com.invalid> wrote: > Hi, > > I just upgraded from Kafka 2.4.1 to 2.6.1 and I see huge CPU usage on the broker after the upgrade. Upgrade in this case means that I only bumped the broker version on 1 of the bro...

high CPU usage after Kafka upgrade

Hi, I just upgraded from Kafka 2.4.1 to 2.6.1 and I see huge CPU usage on the broker after the upgrade. Upgrade in this case means that I only bumped the broker version on 1 of the 6 brokers and didn't change the protocol or message format versions. Before the upgrade, it used about 35% CPU. After the upgrade it uses 200%, but if I add two more CPUs to the host, it is happy to use about 350%. I tried the 2.5.1 and 2.7.0 versions too. All of those versions show the same. Any idea what may be wrong? Thanks, Peter

Re: Rebalancing stuck, never finishes

Just to provide a bit more detail, I noticed Peter's pattern:

    "Rebalance failed. org.apache.kafka.common.errors.DisconnectException: null"
    "(Re-)joining group"

But I also get a different pattern, interchangeably:

    Group coordinator broker-1:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: null. isDisconnected: true. Rediscovery will be attempted.

followed by:

    Discovered group coordinator broker-1:9092 (id: 2147483646 rack: null)

On Fri, 26 Feb 2021 at 09:59, Murilo Tavares < murilofla@gmail.com > wrote: > Hi > I got the same behaviour yesterday while trying to upgrade my KafkaStreams > app from 2.4.1 to 2.7.0. Our brokers are on 2.2.1. > > Looking at KAFKA-9752 it mentions the cause being two other tickets: > https://issues.apache.org/jira/browse/KAFKA-7610 > https://issues.apache.org/jira/browse/KAFKA-9232 > > Although the first ticket seems fixed in 2.2.0, the latter was just f...

Re: Rebalancing stuck, never finishes

Hi, I got the same behaviour yesterday while trying to upgrade my KafkaStreams app from 2.4.1 to 2.7.0. Our brokers are on 2.2.1. Looking at KAFKA-9752, it mentions the cause being two other tickets: https://issues.apache.org/jira/browse/KAFKA-7610 https://issues.apache.org/jira/browse/KAFKA-9232 Although the first ticket seems fixed in 2.2.0, the latter was only fixed in 2.2.3, so my brokers shouldn't have the code for KAFKA-9232. But what I don't understand is that KAFKA-9752 says: "Note that this is only possible if 1) we have a consumer using an old JoinGroup version, 2) the consumer times out and disconnects from its initial JoinGroup request." In this case, I guess my consumer is not using an old JoinGroup, as my consumers (KafkaStreams) are on 2.7.0... Thanks Murilo On Fri, 26 Feb 2021 at 06:06, Péter Sinóros-Szabó <peter.sinoros-szabo@transferwise.com.invalid> wrote: > Hey Sophie, > > thanks for the li...

Re: Rebalancing stuck, never finishes

Hey Sophie, thanks for the link. I was checking that ticket, but I was not sure if it is relevant to our case. Eventually we "fixed" our problem by reducing session.timeout.ms (it was set to a high value for other reasons). But today, in another service, we faced the same problem when upgrading the Kafka client from 2.5.1 to 2.6.1. We are still using 2.4.1 on the brokers. Do you think the same issue (KAFKA-9752) might cause this problem too? It's hard to judge just based on the description of that ticket. Thanks, Peter
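
For readers hitting the same symptom, the workaround Peter describes is a plain consumer config change. A minimal Java sketch, where the bootstrap address, group id, and the 10000 ms value (the 2.x default) are illustrative rather than the thread's actual values:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class SessionTimeoutExample {
        public static Properties consumerProps() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // hypothetical group id
            // Lowering session.timeout.ms back toward the 2.x default (10000 ms)
            // bounds how long the group waits on an unresponsive member; the
            // "high value for other reasons" from the thread is not shown here.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
            return props;
        }
    }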

Re: Rebalancing stuck, never finishes

Hey Peter, It does sound like you may have hit https://issues.apache.org/jira/browse/KAFKA-9752 You will need to upgrade your brokers in order to get the fix, since it's a broker-side issue. On Tue, Feb 9, 2021 at 2:48 AM Péter Sinóros-Szabó <peter.sinoros-szabo@transferwise.com.invalid> wrote: > Hi, > > I have an application running with 6 instances on Kubernetes. All 6 > instances (pods) are the same, using the same consumer group id. > Recently we have seen that when the application is restarted (a rolling restart on > K8s), the triggered rebalance sometimes doesn't finish at all and the > Kafka client gets stuck rebalancing. Occasionally it finishes after 30-60 > minutes; sometimes it doesn't. > > If it is stuck, and we stop the application, wait until > kafka-consumer-groups.sh no longer shows the group, and then restart the > application, the initial rebalance finishes j...

Re: Create a new consumer for a consumer group from within the assign method of the rebalancing interface (AbstractPartitionAssignor)

If you create a new consumer inside the assign method (and assuming you actually start polling with it so that it sends the JoinGroup request), then yes, it would need a new rebalance to accommodate this consumer. The group coordinator will inform all the existing members to rejoin the group so that the rebalance can proceed with the latest up-to-date view of the current group. On Wed, Feb 24, 2021 at 7:58 AM Mazen Ezzeddine < mazen.ezzeddine@etu.univ-cotedazur.fr > wrote: > I am running a Kafka cluster on Kubernetes. I am implementing a custom > PartitionAssignor to personalize the way topic partitions are assigned to > the existing consumers in the consumer group. To this end, I am overriding the > method Map<String, Assignment> assign(Cluster metadata, Map<String, > Subscription> subscriptions). > > > If inside the assign method I dynamically create a new consumer through > the Kubernetes client APIs, how would the reb...
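
To make the moving parts concrete, here is a minimal Java sketch of a custom assignor built on AbstractPartitionAssignor, whose helper assign() receives a partitionsPerTopic map rather than the full Cluster metadata mentioned in the question. The class name, protocol name, and the trivial everything-to-one-member policy are all illustrative:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;

    import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
    import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
    import org.apache.kafka.common.TopicPartition;

    public class FirstMemberAssignor extends AbstractPartitionAssignor {

        @Override
        public String name() {
            return "first-member"; // hypothetical protocol name
        }

        @Override
        public Map<String, List<TopicPartition>> assign(
                Map<String, Integer> partitionsPerTopic,
                Map<String, Subscription> subscriptions) {
            Map<String, List<TopicPartition>> assignment = new HashMap<>();
            for (String memberId : subscriptions.keySet()) {
                assignment.put(memberId, new ArrayList<>());
            }
            if (subscriptions.isEmpty()) {
                return assignment;
            }
            // Purely illustrative policy: hand every subscribed partition to
            // the lexicographically first member. A real assignor would
            // balance load across members.
            String first = new TreeSet<>(subscriptions.keySet()).first();
            for (String topic : subscriptions.get(first).topics()) {
                Integer numPartitions = partitionsPerTopic.get(topic);
                if (numPartitions == null) {
                    continue; // no metadata for this topic yet
                }
                for (int p = 0; p < numPartitions; p++) {
                    assignment.get(first).add(new TopicPartition(topic, p));
                }
            }
            return assignment;
        }
    }

As the reply notes, a consumer created inside this method only affects assignment once it sends its own JoinGroup, which triggers a follow-up rebalance.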

Re: Event Sourcing with Kafka Streams and processing order of a re-entrant pipeline

Thank you John for the explanation! I confirm that by introducing a full integration test I have reproduced the problem. We have reviewed our pipelines using your suggestion (a ValueTransformer and a state store) and now it seems to work correctly! If someone is interested, here is the improved version of the same pipeline: https://github.com/davideicardi/es4kafka/blob/ca6f27a9db5e38ac029493ae4e1ddd47ade8266e/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala regards Davide On Sun, Jan 31, 2021 at 9:23 PM John Roesler < vvcephei@apache.org > wrote: > Hi David, > > Thank you for the question. > > If I can confirm, it looks like the "operations" topic is > the only input to the topology, and the topology reads the > "operations" topic joined with the "account" table and > generates a "movements" stream. It reads (and aggregates) > the "movements" stream to create the ...
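
For anyone who wants the shape of that fix without reading the Scala, here is a minimal Java sketch of the pattern John suggested: a value transformer that reads and updates a state store in one step. The topic name "operations", the store name "account-store", the String types, and the apply() placeholder are all stand-ins, not the project's actual code:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class AccountPipelineSketch {

        public static KStream<String, String> build(StreamsBuilder builder) {
            builder.addStateStore(Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("account-store"),
                    Serdes.String(), Serdes.String()));

            KStream<String, String> operations = builder.stream("operations");
            // Reading and updating the account state in the same step keeps the
            // result based on the latest local state, instead of joining against
            // a re-entrant topic that may lag behind.
            return operations.transformValues(
                    () -> new ValueTransformerWithKey<String, String, String>() {
                        private KeyValueStore<String, String> store;

                        @SuppressWarnings("unchecked")
                        @Override
                        public void init(ProcessorContext context) {
                            store = (KeyValueStore<String, String>) context.getStateStore("account-store");
                        }

                        @Override
                        public String transform(String key, String operation) {
                            String updated = apply(store.get(key), operation);
                            store.put(key, updated);
                            return updated;
                        }

                        @Override
                        public void close() {}
                    },
                    "account-store");
        }

        // Placeholder for the domain logic (applying an operation to an account).
        private static String apply(String current, String operation) {
            return operation;
        }
    }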

Re: joins & co-location

Hi Nicolae, In KStream-KStream, KStream-KTable, and KTable-KTable joins, both sides of the join need to have the same number of partitions. Kafka Streams will check for this, and if the number of partitions isn't the same, Kafka Streams will throw a TopologyException. Foreign-key joins don't require the same number of partitions between the two tables, since you provide a foreignKeyExtractor function that extracts the key for the join from the "calling" or left-side table of the join. HTH, Bill. FYI, I've answered the question in the forum as well. On Thu, Feb 25, 2021 at 8:15 AM Dumitru Nicolae Marasoiu < nicolae.marasoiu@gmail.com > wrote: > Hi guys, in case of KTable with KTable joins, is co-location a requirement? > Do the 2 topics need to have the same number of partitions? > https://forum.confluent.io/t/ktable-ktable-joins-is-colocation-needed/1034 > Thank you, > Nicolae >
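
A minimal Java sketch of the distinction Bill draws, with hypothetical topic names and Order/Customer/EnrichedOrder types. A same-key orders.join(customers, joiner) would require the two topics to be co-partitioned (equal partition counts); the foreign-key overload below does not:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class FkJoinSketch {
        // Hypothetical value types; real code would configure proper serdes.
        record Order(String customerId, long amount) {}
        record Customer(String name) {}
        record EnrichedOrder(Order order, Customer customer) {}

        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KTable<String, Order> orders = builder.table("orders");          // keyed by orderId
            KTable<String, Customer> customers = builder.table("customers"); // keyed by customerId

            // The foreignKeyExtractor pulls the join key out of the left-side
            // value, so "orders" and "customers" may differ in partition count.
            KTable<String, EnrichedOrder> enriched = orders.join(
                    customers,
                    order -> order.customerId(),
                    EnrichedOrder::new);
        }
    }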

joins & co-location

Hi guys, in the case of KTable-KTable joins, is co-location a requirement? Do the two topics need to have the same number of partitions? https://forum.confluent.io/t/ktable-ktable-joins-is-colocation-needed/1034 Thank you, Nicolae

Re: kafka log.retention.bytes

log.retention.bytes is a broker-level config that sets the maximum size of a topic partition on a broker, so it will apply to all topics… unless a topic has the retention.bytes property configured — this is a topic-level config and only applies to a single topic — in which case that takes precedence. Kafka does not have a built-in mechanism for preventing full disks. You must do some topic growth prediction and set your topic retentions to be proportional to the total storage available per broker. As with most times disks fill up, it's not enjoyable dealing with the fallout, so spending the time to get it right is well worth it. -- Peter (from phone) > On Feb 24, 2021, at 7:46 AM, Calvin Chen < pingc.sh@hotmail.com > wrote: > > Hi all > > I have a question about Kafka topic log retention by bytes: does log.retention.bytes apply to each topic, or does it apply to all topics in the broker? If it applies to each topic, then, when topic numbers keep gro...
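
To make the override concrete, here is a minimal Java sketch that sets a topic-level retention.bytes through the Admin client. The broker address, topic name, and the 1073741824-byte (1 GiB) cap are example values only:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class SetTopicRetention {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
                // Topic-level retention.bytes overrides the broker-wide
                // log.retention.bytes for this topic only.
                AlterConfigOp setRetention = new AlterConfigOp(
                        new ConfigEntry("retention.bytes", "1073741824"),
                        AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention)))
                     .all().get();
            }
        }
    }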

Create a new consumer for a consumer group from within the assign method of the rebalancing interface (AbstractPartitionAssignor)

I am running a Kafka cluster on Kubernetes. I am implementing a custom PartitionAssignor to personalize the way topic partitions are assigned to the existing consumers in the consumer group. To this end, I am overriding the method Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions). If inside the assign method I dynamically create a new consumer through the Kubernetes client APIs, how would the rebalancing protocol behave in such a case? Precisely, when the newly created consumer sends a JoinGroup request to the group coordinator (while the rebalancing process is still in progress), would the current in-progress rebalance complete, and then a new rebalance process be triggered to accommodate the newly created consumer? Thanks.

kafka log.retention.bytes

Hi all, I have a question about Kafka topic log retention by bytes: does log.retention.bytes apply to each topic, or does it apply to all topics in the broker? If it applies to each topic, then, when topic numbers keep growing in the broker, how can we make sure the total disk size for all topic logs will not exceed the total disk capacity of that broker? Thanks Calvin

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

I guess it's possible, but very unlikely, because it works perfectly with all the previous versions and the current one (2.5.1). Why did a change in the version introduce nulls there? On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang < wangguoz@gmail.com > wrote: > Is it possible that the flattened values contain `null` and hence `_.split` > throws? > > On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert < nitay.k@ironsrc.com > wrote: > > > Hey, missed your reply - but the code I've shared above the logs is the > > code around those lines (removed some identifiers to make it a little bit > > more generic): > > > > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k -> > > > ${v.printForDebug}")}) # return type KStream[Windowed[String], > > > SingleInputMessage] > > > > > > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang < wangguoz@gmail.com > ...
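
One cheap way to test Guozhang's theory is to filter nulls out ahead of the flatMap. A minimal Java sketch; the thread's actual pipeline is Scala, and the String types and comma split here are stand-ins:

    import java.util.Arrays;

    import org.apache.kafka.streams.kstream.KStream;

    public class NullGuardSketch {
        // If the NPE disappears with the filter in place, the upgrade changed
        // what reaches this operator rather than the operator itself.
        static KStream<String, String> guard(KStream<String, String> input) {
            return input
                    .filter((k, v) -> v != null)
                    .flatMapValues(v -> Arrays.asList(v.split(",")));
        }
    }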

Re: Window Store

Thanks a lot Guozhang. I will try it and let you know. Really appreciate all the help; this community has been amazing. Thanks On Tue, Feb 23, 2021 at 5:48 PM Guozhang Wang < wangguoz@gmail.com > wrote: > Sorry I was not very clear before: by "WindowStore" I meant implementing > your own customized store based on a kvStore where the key is a combo > <timestamp, key>. Note you put the timestamp first and then the key in your > serialization format, so that you can range-fetch with just the prefix on > the timestamp. In fact the `WindowStore` that we provide also follows > this design principle, but its combo key is <key, timestamp>, so a range > fetch is not as efficient since you'd need to fetch a much larger range and > then filter a lot of records. > > > Guozhang > > On Tue, Feb 23, 2021 at 4:04 PM Navneeth Krishnan < > reachnavneeth2@gmail.com > > wrote: > > > Thanks Guozh...
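
A minimal Java sketch of the <timestamp, key> combo key Guozhang describes, assuming non-negative millisecond timestamps and UTF-8 string keys; the helper name and the range usage note are illustrative:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.utils.Bytes;

    public class TimeFirstKey {

        // Big-endian longs compare byte-wise in numeric order (for non-negative
        // values), so putting the timestamp first makes all entries in a time
        // range contiguous in the underlying kvStore.
        public static Bytes encode(long timestampMs, String key) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + keyBytes.length);
            buf.putLong(timestampMs);
            buf.put(keyBytes);
            return Bytes.wrap(buf.array());
        }
    }

Against a KeyValueStore<Bytes, byte[]>, a call like store.range(encode(fromMs, ""), encode(toMs + 1, "")) then scans roughly the [fromMs, toMs] window with no post-filtering on time (boundary handling elided), which is the efficiency win over the built-in <key, timestamp> layout.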