Showing posts from September, 2020

Unexpected cluster rebalancing

As part of our load tests on Kafka, we are trying to create a certain number of sink connectors, each with a unique name and topic, so each sink connector is in its own consumer group. For every new connector added, a rebalance is triggered for all the connectors (the more connectors already created, the longer the current rebalance takes). We create the connectors by sending parallel create requests to the Connect REST API (we have tried this in several ways: all 1000 requests, the desired amount, at once, and also in smaller "batches" of 10 or 20 requests). We have a few questions: Why is the rebalance happening? Why does it take so long (it can take hours)? Is the rebalance configurable? The cluster is deployed using Strimzi on Kubernetes. The connectors are created with the following template: POST connectors/HttpSinkConnector-{index} { "name": "HttpSinkConnector-{index}", "config": { "connector....

Kafka consumer coming without partition assignment

Hi All, I am using Kafka version 2.4.0 with 8 Kafka brokers. I have a topic with 80 partitions and 80 Java-based consumers. Sometimes I see very strange behaviour on consumer restart: consumers come up with no partitions assigned to them. The log shows messages like "adding newly assigned partitions : " with an empty list. I also checked the members and state with the kafka-consumer-groups script: it shows 80 members and the state as stable, but with the verbose option the assignment is shown as "-". As there are no errors in the broker logs or the consumer logs, I cannot understand what is happening here and under what circumstances this is possible. Any help would be appreciated. Thanks and Regards Sanjay

Re: Sign up email for apache kafka notifications

Hello Josh, Please help yourself subscribing to the mailing list of Kafka: https://kafka.apache.org/contact On Wed, Sep 30, 2020 at 9:36 AM <barclays.middleware@barclays.com.invalid> wrote: > Hello, > > This is an email to sign up for the notifications for apache kafka. > Please advise if there are any further actions that I will need to take. > > Many thanks > Josh > Joshua Racey | Middleware Security & Control | Apprentice | Infrastructure > Services > Phone : 03301529774 > Email: Joshua.Racey@barclayscorp.com <mailto: Joshua.Racey@barclayscorp.com > > Ground Floor | Turing House | BTC Radbroke | Knutsford | WA16 9EU > Respect | Integrity | Service | Excellence | Stewardship > Helping people achieve their ambitions - in the right way > > P Please consider the environment before printing this email > > > > Restricted - External > > This e-mail and any attachments ...

Re: Is this a valid use case for reading local store ?

Hello Mohan, If I understand correctly, your async event trigger process runs out of the streams application, that reads the state stores of app2 through the interactive query interface, right? This is actually a pretty common use case pattern for IQ :) Guozhang On Wed, Sep 30, 2020 at 1:22 PM Parthasarathy, Mohan < mparthas@hpe.com > wrote: > Hi, > > A traditional kafka streams application (App1) reading data from a kafka > topic, doing aggregations resulting in some local state. The output of this > application is consumed by a different application(App2) for doing a > different task. Under some conditions, there is an external trigger (async > event) which needs to trigger requests for all the keys in the local store > to App2. To achieve this, we can read the local stores from all the > replicas and send the request to App2. > > This async event happens less frequently compared to the normal case that > leads t...

Is this a valid use case for reading local store ?

Hi, A traditional kafka streams application (App1) reading data from a kafka topic, doing aggregations resulting in some local state. The output of this application is consumed by a different application(App2) for doing a different task. Under some conditions, there is an external trigger (async event) which needs to trigger requests for all the keys in the local store to App2. To achieve this, we can read the local stores from all the replicas and send the request to App2. This async event happens less frequently compared to the normal case that leads to the state creation in the first place. Are there any caveats doing it this way ? If not, any other suggestions ? Thanks Mohan

Sign up email for apache kafka notifications

Hello, This is an email to sign up for the notifications for apache kafka. Please advise if there are any further actions that I will need to take. Many thanks Josh Joshua Racey | Middleware Security & Control | Apprentice | Infrastructure Services Phone : 03301529774 Email: Joshua.Racey@barclayscorp.com <mailto: Joshua.Racey@barclayscorp.com > Ground Floor | Turing House | BTC Radbroke | Knutsford | WA16 9EU Respect | Integrity | Service | Excellence | Stewardship Helping people achieve their ambitions - in the right way P Please consider the environment before printing this email Restricted - External This e-mail and any attachments are confidential and intended solely for the addressee and may also be privileged or exempt from disclosure under applicable law. If you are not the addressee, or have received this e-mail in error, please notify the sender immediately, delete it from your system and do not copy, disclose or otherwise act upon ...

New Go client & cli program

Hello all, For the past year and a half I've been working on two open source projects in my spare time, and I'd love anybody to check it out and potentially provide feedback. The first project is a pure Go client that (theoretically) supports all KIPs: https://github.com/twmb/kafka-go The second project is a command line program that uses my client. This program supports all behavior that speaks to Kafka directly (not Zookeeper): https://github.com/twmb/kcl I'll continue adding features to the client as new KIPs are adopted, and to kcl as new Kafka requests are added. For the client, I've integration tested a chain of producing and consuming, but as with all programs, I can't be 100% sure that it is bug free. If anybody else tests it and notices something off, please let me know. I hope that either of these can be useful for somebody, and thank you! - Travis

Re: Newly added topic or partitions are not assigned to running consumer groups using static membership

That seems to be a bug indeed, I will reply on the ticket. Guozhang On Thu, Sep 24, 2020 at 8:03 PM Fu, Tony < tfu@ea.com > wrote: > Is anyone seeing this problem as well ( > https://issues.apache.org/jira/browse/KAFKA-10513 )? I think it also > happens when new topics created within the subscription pattern. > > Tony > -- -- Guozhang

Re: AdminClient fails to authenticate to Azure Eventhub for MirrorMaker2

This seems to be related to the Strimzi side. I will continue investigating with Strimzi. Thanks. On Sun, Sep 27, 2020 at 3:00 PM Yu Watanabe < yu.w.tennis@gmail.com > wrote: > Hello . > > I use MirrorMaker2 on strimzi to mirror data from Azure Eventhub to kafka > cluster. > However, MirrorMaker2 fails to authenticate to Azure Eventhub with > following error. > This happens when consuming logs from eventhub not when mirroring to > eventhub.. > > I use strimzi 0.17.0 and kafka 2.4.0 for my environment. > > > --------------------------------------------------------------------------------------------- > 2020-09-27 02:08:16,019 INFO [AdminClient clientId=adminclient-21533] > Failed authentication with > migration-kafka-topics-connect.servicebus.windows.net/40.79.186.34 > (Invalid SASL mechanism response, server may be expecting a different > protocol) (org.apache.kafka.common.network.Selector) > [k...

Re: Kafka Size of ISR Set(3) insufficient for min.isr 2

In order to produce data, the topic needs at least 2 in-sync replicas (min ISR = 2), but it looks like the replicas are out of sync. The Kafka cluster health is not good. Topic: FooBar Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3 |[root@LoremIpsum kafka]# /usr/lib/kafka/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic FooBar Topic: FooBar PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,max.message.bytes=5242880, min.compaction.lag.ms =604800000,message.timestamp.type=LogAppendTime,unclean.leader.election.enable=false Topic: FooBar Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3 | On 9/26/20, 9:55 PM, "Franz van Betteraey" < fvbetteraey@web.de > wrote: [External] Hi all, < https://apc01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fstackoverflow.com%2Fposts%2F64080819%2Ftimeline&data=02%7C01%7CManoj.Agrawal2%40cognizant.com%7Cab4b6e4b451d...

AdminClient fails to authenticate to Azure Eventhub for MirrorMaker2

Hello . I use MirrorMaker2 on strimzi to mirror data from Azure Eventhub to kafka cluster. However, MirrorMaker2 fails to authenticate to Azure Eventhub with following error. This happens when consuming logs from eventhub not when mirroring to eventhub.. I use strimzi 0.17.0 and kafka 2.4.0 for my environment. --------------------------------------------------------------------------------------------- 2020-09-27 02:08:16,019 INFO [AdminClient clientId=adminclient-21533] Failed authentication with migration-kafka-topics-connect.servicebus.windows.net/40.79.186.34 (Invalid SASL mechanism response, server may be expecting a different protocol) (org.apache.kafka.common.network.Selector) [kafka-admin-client-thread | adminclient-21533] 2020-09-27 02:08:16,019 WARN [AdminClient clientId=adminclient-21533] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager) [kafka-admin-client-thread | adminclient-21533] ...

Re: Kafka Size of ISR Set(3) insufficient for min.isr 2

Hi Franz, The last bit of your command output shows that only one replica (the one on broker 3) is in sync. I've put asterisks around it, ISR being short for In Sync Replicas. Hence why you're seeing that exception. Topic: FooBar Partition: 0 Leader: 3 Replicas: 2,3,1 *Isr: 3* I'd suggest looking in the logs for brokers 1 and 2 for any ReplicaFetcher errors as a next step. Cheers, Liam Clarke-Hutchinson On Sun, 27 Sep. 2020, 5:54 pm Franz van Betteraey, < fvbetteraey@web.de > wrote: > Hi all, > < https://stackoverflow.com/posts/64080819/timeline > > > I have a strange Kafka Server error when mirroring data with the > MirrorMaker 1 in Apache Kafka 2.6. > > |org.apache.kafka.common.errors.NotEnoughReplicasException: The size of > the current ISR Set(3) is insufficient to satisfy the min.isr > requirement of 2 for partition FooBar-0 | > > The strange thing is, that the |min.isr| setting is 2 and th...
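
The reasoning in this reply can be checked mechanically. The following is a minimal sketch, not part of the original thread: `IsrCheck` is a hypothetical helper that parses the partition line of the `kafka-topics.sh --describe` output quoted above and compares the number of in-sync replicas with `min.insync.replicas`. It also illustrates why `Set(3)` in the exception text most likely denotes a set containing broker id 3 rather than a set of size 3.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: inspects one partition line of `kafka-topics.sh --describe` output. */
final class IsrCheck {
    // Captures the comma-separated broker ids that follow "Isr:".
    private static final Pattern ISR = Pattern.compile("Isr:\\s*([0-9,\\s]+)");

    /** Returns the broker ids listed after "Isr:" on a describe line. */
    static List<String> isrBrokers(String describeLine) {
        Matcher m = ISR.matcher(describeLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no Isr field in: " + describeLine);
        }
        return Arrays.asList(m.group(1).trim().split("\\s*,\\s*"));
    }

    /** True when the ISR is large enough for the given min.insync.replicas value. */
    static boolean satisfiesMinIsr(String describeLine, int minInsyncReplicas) {
        return isrBrokers(describeLine).size() >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        // Partition line taken from the thread.
        String line = "Topic: FooBar Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3";
        System.out.println("ISR brokers: " + isrBrokers(line));
        System.out.println("min.insync.replicas=2 satisfied: " + satisfiesMinIsr(line, 2));
    }
}
```

For the line from the thread, the ISR contains a single broker, so a requirement of 2 is not met, which matches the NotEnoughReplicasException reported.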

Kafka Size of ISR Set(3) insufficient for min.isr 2

Hi all, < https://stackoverflow.com/posts/64080819/timeline > I have a strange Kafka Server error when mirroring data with the MirrorMaker 1 in Apache Kafka 2.6. |org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(3) is insufficient to satisfy the min.isr requirement of 2 for partition FooBar-0 | The strange thing is, that the |min.isr| setting is 2 and the ISR Set has 3 nodes. Nevertheless I get the /NotEnoughReplicasException/ Exception. Also taking a deeper look to the topic does not show any curiosities |[root@LoremIpsum kafka]# /usr/lib/kafka/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic FooBar Topic: FooBar PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,max.message.bytes=5242880, min.compaction.lag.ms =604800000,message.timestamp.type=LogAppendTime,unclean.leader.election.enable=false Topic: FooBar Partition:...

Re: Kafka StreamThread default config override

All configs set on `StreamsConfig` should be "forwarded" to every client. On startup, each client logs its config. Can you double check if the forwarding is done correctly? What I don't understand is: > I am passing the right SSLconfig and Kstreams topologies are able >> to connect and consume data. But also >> Issue is we see in between is that the StreamThreads *launched internally* >> are missing the SSL configuration, like "ssl.keystore.location" etc. and >> create Consumer/Producer/AdminConfig with default config. Both statements seem to contradict each other. Topologies are executed by `StreamThreads` and thus, only if the internal Consumer client can connect to the broker, data would be piped through the topology. However, the log line you show says "restore consumer" -- note that there are two consumers! Could it be that you only forward the ssl-configs to the main consumer? How do you e...
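
To make the "two consumers" point concrete, here is a small sketch of how a setting can reach one consumer but not the other. It is a simplified model written for this digest, not the Kafka Streams implementation: it assumes a client sees a key if the key is set unprefixed or under one of that client's prefixes (`consumer.`, `main.consumer.`, `restore.consumer.`). The class name and the keystore path are hypothetical.

```java
import java.util.Properties;

/** Simplified model of per-client config prefixes in a Kafka Streams configuration. */
final class StreamsClientConfigModel {

    /** A client "sees" a key if it is set unprefixed or under one of the client's prefixes. */
    static boolean resolves(Properties props, String key, String... clientPrefixes) {
        if (props.containsKey(key)) {
            return true;
        }
        for (String prefix : clientPrefixes) {
            if (props.containsKey(prefix + key)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "ssl.keystore.location";
        Properties props = new Properties();
        // Hypothetical path; the setting is scoped to the main consumer only.
        props.put("main.consumer." + key, "/etc/kafka/client.keystore.jks");

        System.out.println("main consumer:    " + resolves(props, key, "consumer.", "main.consumer."));
        System.out.println("restore consumer: " + resolves(props, key, "consumer.", "restore.consumer."));

        // Setting the key without a prefix makes it visible to both consumers in this model.
        props.put(key, "/etc/kafka/client.keystore.jks");
        System.out.println("restore consumer after unprefixed put: "
                + resolves(props, key, "consumer.", "restore.consumer."));
    }
}
```

In this model, the restore consumer only picks up the keystore once the key is set without a prefix (or under `consumer.` or `restore.consumer.`), which is one way the symptom described in the thread could arise.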

Azkarra Streams v0.8 is now available!

Hi everyone, I'm pleased to announce the release of Azkarra Streams 0.8 ( https://www.azkarrastreams.io/ ) Azkarra is an open source micro-framework that takes your Kafka Streams apps to the next level! The project is dedicated to making development of streaming microservices based on Apache Kafka simple and fast. To get a high-level overview of some of the most exciting changes in this release : What's new in Azkarra Streams v0.8 ? < https://medium.com/streamthoughts/whats-new-in-azkarra-streams-0-8-3ed1a4ab803d?source=friends_link&sk=b8aa22a671ba815f8ead394a4acd3a56 > Please feel free to join the Azkarra community on Slack < https://communityinviter.com/apps/azkarra-streams/azkarra-streams-community > for questions or comments about the project. -- Florian HUSSONNOIS Co-founder StreamThoughts.io | Senior Software Engineer Confluent Community Catalyst

Re: Bidirectional XDCR by MirrorMaker2

Oleg, if you want bidirectional replication, you should have: dc1->dc2.enabled = true dc2->dc1.enabled = true ... in _both_ DCs. The replication topology should be consistent across all DCs, generally. Otherwise DCs will disagree on what should get replicated and you'll likely encounter confusing behavior. So the topology is global, but you can use the --clusters argument to localize a driver instance to a specific DC: # in dc1: $ connect-mirror-maker.sh --clusters dc1 ... # in dc2: $ connect-mirror-maker.sh --clusters dc2 ... Ryanne On Fri, Sep 25, 2020, 3:06 AM Oleg Osipov < oleg.alex.osipov@gmail.com > wrote: > Hello! > KIP-382 states the following > "For cross-datacenter replication (XDCR), each datacenter should have a > single Connect cluster which pulls records from the other data centers via > source connectors. Replication may fan-out within each datacenter via sink > connectors." > > It l...
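
As an illustration of the advice above, the sketch below holds one topology definition that would be deployed unchanged in both DCs and checks that replication is enabled in both directions. The property names (`clusters`, `<alias>.bootstrap.servers`, `dc1->dc2.enabled`) follow the thread and MM2's properties format; the broker addresses and the `Mm2Topology` class are hypothetical, and treating a missing `enabled` flag as disabled is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that inspects an MM2 properties file for bidirectional flows. */
final class Mm2Topology {
    // Same content in dc1 and dc2; only the --clusters argument differs per DC.
    static final String MM2_PROPERTIES = String.join("\n",
            "clusters = dc1, dc2",
            "dc1.bootstrap.servers = dc1-kafka:9092",   // hypothetical address
            "dc2.bootstrap.servers = dc2-kafka:9092",   // hypothetical address
            "dc1->dc2.enabled = true",
            "dc2->dc1.enabled = true");

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Assumption of this sketch: a flow without an explicit flag counts as disabled. */
    static boolean flowEnabled(Properties props, String source, String target) {
        return Boolean.parseBoolean(props.getProperty(source + "->" + target + ".enabled", "false"));
    }

    static boolean isBidirectional(Properties props, String a, String b) {
        return flowEnabled(props, a, b) && flowEnabled(props, b, a);
    }

    public static void main(String[] args) {
        Properties props = parse(MM2_PROPERTIES);
        System.out.println("bidirectional dc1 <-> dc2: " + isBidirectional(props, "dc1", "dc2"));
    }
}
```

Keeping a single definition and varying only `--clusters` per DC is what keeps the topology consistent across DCs, as the reply recommends.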

Re: error serializing to sink topic

Hello kafka community, fixed it with toTable(materialized).. On Fri, 25 Sep 2020 at 10:49, Dumitru-Nicolae Marasoui < Nicolae.Marasoiu@kaluza.com > wrote: > Hello kafka community, > I checked in debug that the serializer is KafkaAvroSerializer, like it > should be. > So in KStreamImpl.to(TopicNameExtractor<K, V> topicExtractor, > ProducedInternal<K, V> produced) I checked that the serializer is avro. > > However here is the exception i receive: > > "ClassCastException while producing data to a sink topic. > > A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) > > is not compatible to the actual key or value type (key type: com.ovoenergy.global.ContractKey / value type: com.ovoenergy.global.ContractValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if usin...

error serializing to sink topic

Hello kafka community, I checked in debug that the serializer is KafkaAvroSerializer, like it should be. So in KStreamImpl.to(TopicNameExtractor<K, V> topicExtractor, ProducedInternal<K, V> produced) I checked that the serializer is avro. However here is the exception i receive: "ClassCastException while producing data to a sink topic. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.ovoenergy.global.ContractKey / value type: com.ovoenergy.global.ContractValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`)." Thank you -- Dumitru-Nicolae Marasoui Software Engineer 140...

Bidirectional XDCR by MirrorMaker2

Hello! KIP-382 states the following "For cross-datacenter replication (XDCR), each datacenter should have a single Connect cluster which pulls records from the other data centers via source connectors. Replication may fan-out within each datacenter via sink connectors." It looks like, here we have unidirectional replication. Assume, we have two datacenters DC1 and DC2, so I may run MM2 with the configuration in DC1: dc1->dc2.enabled = true dc2->dc1.enabled = true Is it correct? Will I have bidirectional replication?

Kafka StreamThread default config override

Hello, Guys, I am trying to connect to a Kafka cluster v2.4, which has TLS enabled. I am passing the right SSL config, and the Kstreams topologies are able to connect and consume data. The issue we see in between is that the StreamThreads *launched internally* are missing the SSL configuration, like "ssl.keystore.location" etc., and create Consumer/Producer/AdminConfig with default config. My question is: is there a way to specify some global configuration or override the default config which these StreamThreads use? What happens is that, due to this default config (which misses the SSL key/truststore), the client does not provide a client X.509 cert and the server responds with an unexpected server_hello message right before completing the handshake. *Logs:* *00:15:50.863 [main] INFO o.a.k.s.p.internals.StreamThread - stream-thread [streams-processor-b02e0a4e-1197-43d4-a8c7-bf78c7f65824-StreamThread-1] Creating restore consumer client* * ssl.k...

Re: Two MirrorMakers 2 for two DCs

Ryanne, should I use Connect to run MM2? Right now I use the connect-mirror-maker.sh driver. I noticed that when I restart the MM2s, all topics that I created before start syncing, but it doesn't work for newly created topics (after the restart). On 2020/09/21 22:12:13, Ryanne Dolan < ryannedolan@gmail.com > wrote: > Oleg, yes you can run multiple MM2s for multiple DCs, and generally that's > what you want to do. Are you using Connect to run MM2, or the > connect-mirror-maker.sh driver? > > Ryanne > > On Mon, Sep 21, 2020, 3:38 PM Oleg Osipov < oleg.alex.osipov@gmail.com > > wrote: > > > I use the configuration for MM2 for both datacentres > > clusters: > > - {"name": "dc1", "bootstrapServers": ip1} > > - {"name": "dc2", "bootstrapServers": ip2} > > > > Do you mean I need use additional names besides 'dc1' and 'dc2...

Re: Kafka streams - how to handle application level exception in event processing

Thanks Bruno.. yeah, I think I could figure it out... For dependencies such as a database, for which all the events will be blocked, we are planning to put a retry mechanism in place, so processing will wait until the database connection is back up. If the problem is with the incoming event, like a bad format etc., then we can skip that event and log it or add it to a dead letter queue topic. On Wed, Sep 23, 2020 at 4:04 PM Bruno Cadonna < bruno@confluent.io > wrote: > Hi Pushkar, > > if you do not want to lose any event, you should cache the events > somewhere (e.g. a state store) in case there is an issue with an > external system you connect to (e.g. database issue). If the order of > the event is important, you must ensure that the events in your cache > are processed in the order they were written to the cache (i.e. > first-in first-out). > > Maybe you can find some good hints in the links Gilles posted. > > Best, > Bruno >...

Re: Kafka streams - how to handle application level exception in event processing

Hi Pushkar, if you do not want to lose any event, you should cache the events somewhere (e.g. a state store) in case there is an issue with an external system you connect to (e.g. database issue). If the order of the event is important, you must ensure that the events in your cache are processed in the order they were written to the cache (i.e. first-in first-out). Maybe you can find some good hints in the links Gilles posted. Best, Bruno On 22.09.20 10:51, Pushkar Deole wrote: > Thank you Gilles..will take a look.. > > Bruno, thanks for your elaborate explanation as well... however it > basically exposes my application to certain issues.. > > e.g. the application deals with agent states of a call center, and where > the order of processing is important. So when agent is logged in then he > keeps rotating between Ready, and Not ready states and at the end of the > day he becomes Logged out... If while processing the Ready ev...
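
The first-in first-out caching idea described above can be sketched as follows. This is an in-memory stand-in written for this digest, not a Kafka Streams state store: `FifoRetryBuffer` is a hypothetical class, and the `deliver` callback represents the call to the external system (for example a database write) that returns false while that system is unavailable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** In-memory stand-in for a cache that replays events strictly in arrival order. */
final class FifoRetryBuffer<E> {
    private final Deque<E> pending = new ArrayDeque<>();

    /** Appends an event at the tail, preserving arrival order. */
    void add(E event) {
        pending.addLast(event);
    }

    /**
     * Delivers buffered events oldest-first and stops at the first failure,
     * so a later event is never delivered before an earlier one.
     * Returns the number of events delivered in this pass.
     */
    int drain(Predicate<E> deliver) {
        int delivered = 0;
        while (!pending.isEmpty()) {
            if (!deliver.test(pending.peekFirst())) {
                break; // external system still failing: keep the event and retry later
            }
            pending.removeFirst();
            delivered++;
        }
        return delivered;
    }

    int size() {
        return pending.size();
    }
}
```

A caller would add every incoming event and invoke `drain` after each add and on a timer. Stopping at the first failure, instead of skipping ahead, is what keeps agent-state transitions such as Ready and Not ready from being applied out of order.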

TimeoutExceptions during commitSync and committed

Hi, We have a Consumer that occasionally catches a TimeoutException when trying to commit an offset after polling. Since it's a RetriableException, the Consumer tries to roll back and read from the last committed offset. However, when trying to fetch the last committed offset with committed(), it throws another TimeoutException. [Logs from committed()]: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the last committed offset for partition <partition-name> could be determined. [Logs from commitSync()]: Offset commit failed on partition <partition-name> at offset <number>: The coordinator is loading and hence can't process requests. Offset commit failed on partition <partition-name> at offset <number>: The coordinator is loading and hence can't process requests. Offset commit failed on partition <partition-name> at offset <number>: The coordinator is loading an...

Consumer TimeoutException

Hi All, I'm frequently getting the below error in one of the application consumers. From the error what I can infer is, the offset commit failed due to timeout after 30 seconds. One suggestion was to increase the timeout but I think it will just extend the time period. What should be the good way to handle this? *Note:* The consumer has auto commit disabled and after every poll commitAsync is performed. failed org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms. Thanks

Re: Trigger topic compaction before uploading to S3

These cleanup policies can't be triggered programmatically. Kafka uses an internal thread pool called "Log Cleaner Thread" that asynchronously does the job of deleting old segments ("delete") and deleting repeated records ("compact"). Whatever the S3 connector picks up is already compacted and/or deleted. -- Ricardo On Tue, 2020-09-22 at 11:50 +0200, Daniel Kraus wrote: > Hi, > I have a KStreams app that outputs a KTable to a topic with cleanup > policy "compact,delete". > I have the Confluent S3 Connector to store this table in S3 where I do > further analysis with hive. > Now my question is, if there's a way to trigger log compaction right > before the S3 Connector reads the data so I store less data in S3 > than when it simply copies all data from the stream? > Thanks, Daniel

Re: Not able to connect to bootstrap server when one broker down

Hi Manoj, Thanks but we caught the issue, it was coming most probably because the wrong jar was being picked up from hdfs and was being set in oozie classpath at runtime. In our code, kafka-client is on 2.3 but while running MR job 0.8.2.0 jar was being picked up. We caught it after seeing the producer client logs in our application, kafka-client version was different there. After removing that old jar, we have not observed this issue since. *Regards,* *Prateek Rajput* < prateek.rajput@flipkart.com > On Wed, Aug 26, 2020 at 9:55 AM < Manoj.Agrawal2@cognizant.com > wrote: > What error you are getting , can you share the exact error ? > What is version of kafka lib at client side ? > > On 8/25/20, 7:50 AM, "Prateek Rajput" <prateek.rajput@flipkart.com.INVALID> > wrote: > > [External] > > > Hi, please if anyone can help, will be a huge favor. > > *Regards,* > *Prateek Rajput*...