
Posts

Showing posts from May, 2020

Setting Kafka headers in ProducerRecord

Hi All, I need to set Kafka headers on a ProducerRecord, but the constructors that accept headers also require a partition number. I want the records to be distributed to the available partitions by Kafka's default mechanism and don't want to specify the partition myself. What is the option for providing headers without specifying the partition?
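For what it's worth, the header-accepting constructor takes the partition as an Integer and accepts null, in which case the configured partitioner chooses the partition as usual. A minimal sketch, assuming kafka-clients is on the classpath (topic, key and header names here are made up):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeadersWithoutPartition {
    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        headers.add(new RecordHeader("trace-id", "abc123".getBytes(StandardCharsets.UTF_8)));

        // Partition is passed as null: the producer's default partitioner
        // (key hashing, or sticky partitioning for null keys) decides at send time.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", null, "my-key", "my-value", headers);

        System.out.println(record.partition()); // stays null until send time
    }
}
```

Alternatively, construct the record with the plain (topic, key, value) constructor and call record.headers().add(...) afterwards; Headers is mutable until the record is sent.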

Re: a few URP causes high latencies to producers and consumers

**BUMP** Thanks, Nitin

On Thu, May 28, 2020 at 10:17 PM nitin agarwal < nitingarg456@gmail.com > wrote:
> Hi,
>
> We have noticed that URPs (under-replicated partitions) in the cluster cause an increase in producer and consumer latencies. The cause of the URPs was either one of the brokers going down or the Kafka rebalancer running.
>
> Is this the expected scenario?
>
> Thanks,
> Nitin

Re: Initial Data Load

Hi Suresh,

So to achieve what you're after, you can use Kafka Connect's JDBC source with a log-compacted topic. It can pick up new entities (a new primary id) or updated entities (a mod_time column). If you want just the deltas, you want to look into Debezium.

Kind regards,
Liam Clarke-Hutchinson

On Sun, 31 May 2020, 7:56 pm Suresh Chidambaram, < chida.suresh@gmail.com > wrote:
> Hi Liam,
>
> Thank you for the reply. One Time Load from database to Kafka Topic. Kindly let me know if more input is required.
>
> Thanks
> C Suresh
>
> On Sunday, May 31, 2020, Liam Clarke-Hutchinson < liam.clarke@adscale.co.nz > wrote:
> > Hi Suresh,
> >
> > I'm afraid you're not giving us much to work with. Initial/delta load from what?
> >
> > Thanks,
> >
> > Liam Clarke-Hutchinson
> >
> > On Sun, 31 May 2020, 4:50 pm Sures...
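A sketch of what such a JDBC source configuration might look like in timestamp+incrementing mode, which captures both new rows (by id) and updates (by mod_time). The connection details, table and column names here are placeholders, not from the thread:

```json
{
  "name": "jdbc-source-customers",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/appdb",
    "connection.user": "kafka",
    "connection.password": "secret",
    "table.whitelist": "customers",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "mod_time",
    "topic.prefix": "db-"
  }
}
```

The target topic can then be created with cleanup.policy=compact so the latest state per key is retained.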

Re: Initial Data Load

Hi Liam,

Thank you for the reply. One Time Load from database to Kafka Topic. Kindly let me know if more input is required.

Thanks
C Suresh

On Sunday, May 31, 2020, Liam Clarke-Hutchinson < liam.clarke@adscale.co.nz > wrote:
> Hi Suresh,
>
> I'm afraid you're not giving us much to work with. Initial/delta load from what?
>
> Thanks,
>
> Liam Clarke-Hutchinson
>
> On Sun, 31 May 2020, 4:50 pm Suresh Chidambaram, < chida.suresh@gmail.com > wrote:
> > Hi Team,
> >
> > Could someone help me with my request below?
> >
> > Thanks
> > C Suresh
> >
> > On Saturday, May 30, 2020, Suresh Chidambaram < chida.suresh@gmail.com > wrote:
> > > Hi Team,
> > >
> > > My requirement is that I have to perform a One Time Data Load (initial load) to the topic, later I have to perform the delta loa...

Re: Initial Data Load

Hi Suresh,

I'm afraid you're not giving us much to work with. Initial/delta load from what?

Thanks,

Liam Clarke-Hutchinson

On Sun, 31 May 2020, 4:50 pm Suresh Chidambaram, < chida.suresh@gmail.com > wrote:
> Hi Team,
>
> Could someone help me with my request below?
>
> Thanks
> C Suresh
>
> On Saturday, May 30, 2020, Suresh Chidambaram < chida.suresh@gmail.com > wrote:
> > Hi Team,
> >
> > My requirement is that I have to perform a One Time Data Load (initial load) to the topic; later I have to perform the delta load to the topic. So could someone guide me on how to achieve this requirement in Confluent Kafka/Apache Kafka?
> >
> > Thanks
> > C Suresh

Re: Initial Data Load

Hi Team,

Could someone help me with my request below?

Thanks
C Suresh

On Saturday, May 30, 2020, Suresh Chidambaram < chida.suresh@gmail.com > wrote:
> Hi Team,
>
> My requirement is that I have to perform a One Time Data Load (initial load) to the topic; later I have to perform the delta load to the topic. So could someone guide me on how to achieve this requirement in Confluent Kafka/Apache Kafka?
>
> Thanks
> C Suresh

Re: Clients may fetch incomplete set of topic partitions during cluster startup

The above JIRA says Affects Version 2.2.1, so I am wondering whether this affects the latest Kafka version or not.

On Sat, 30 May 2020, 03:11 John Roesler, < vvcephei@apache.org > wrote:
> Hello,
>
> Thanks for the question. It looks like the ticket is still open, so I think it's safe to say it's not fixed.
>
> If you're affected by the issue, it would be helpful to leave a comment on the ticket to that effect.
>
> Thanks,
> -John
>
> On Fri, May 29, 2020, at 00:05, Debraj Manna wrote:
> > Anyone any update on my below query?
> >
> > On Thu, 28 May 2020, 15:45 Debraj Manna, < subharaj.manna@gmail.com > wrote:
> > > Hi
> > >
> > > Is the below issue fixed in the latest Kafka 2.5?
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-8480
> > >
> > > I am seeing this issue still open. So just confirming befor...

Initial Data Load

Hi Team,

My requirement is that I have to perform a One Time Data Load (initial load) to the topic; later I have to perform the delta load to the topic. So could someone guide me on how to achieve this requirement in Confluent Kafka/Apache Kafka?

Thanks
C Suresh

Re: Clients may fetch incomplete set of topic partitions during cluster startup

Hello,

Thanks for the question. It looks like the ticket is still open, so I think it's safe to say it's not fixed.

If you're affected by the issue, it would be helpful to leave a comment on the ticket to that effect.

Thanks,
-John

On Fri, May 29, 2020, at 00:05, Debraj Manna wrote:
> Anyone any update on my below query?
>
> On Thu, 28 May 2020, 15:45 Debraj Manna, < subharaj.manna@gmail.com > wrote:
> > Hi
> >
> > Is the below issue fixed in latest Kafka 2.5?
> >
> > https://issues.apache.org/jira/browse/KAFKA-8480
> >
> > I am seeing this issue still open. So just confirming before upgrading Kafka to the latest.
> >
> > Thanks,

Apache Kafka MirrorMaker2 issue

Hi Team

We have configured KafkaMirrorMaker2 as shown below.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: brcm-mirror-maker2
spec:
  version: 2.4.0
  replicas: 1
  connectCluster: "mm-backup-cluster"
  clusters:
    - alias: "mm-src-cluster"
      authentication:
        certificateAndKey:
          certificate: user.crt
          key: user.key
          secretName: mm-consumer-user
        type: tls
      bootstrapServers: mm-src-cluster-kafka-bootstrap.kafka-mirror-src.svc:9093
      tls:
        trustedCertificates:
          - certificate: ca.crt
            secretName: mm-src-cluster-cluster-ca-cert
    - alias: "mm-backup-cluster"
      authentication:
        certificateAndKey:
          certificate: user.crt
          key: user.key
          secretName: mm-producer-user
        type: tls
      bootstrapServers: mm-backup-cluster-kafka-bootstrap.kafka-mirror.svc:9093

While launching it with kubectl create, it throws an error related to topic authorization... Please let us know if there is any miss in the configuration or what is that wr...
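Topic-authorization errors like this usually mean the KafkaUsers referenced in the MirrorMaker2 spec lack ACLs on the mirrored topics, consumer groups and MirrorMaker's internal topics. A hedged sketch of granting access with a Strimzi KafkaUser (the apiVersion, user name and the broad wildcards are illustrative, and the exact resource list depends on your setup):

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: mm-producer-user
  labels:
    strimzi.io/cluster: mm-backup-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Broad grants purely for illustration; scope these down in practice.
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All
```

The same would apply to mm-consumer-user on the source cluster, scoped to Read/Describe on the topics being mirrored.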

Re: Repeated UnknownProducerIdException

Hi Matthias,

Thanks for the hint! My application is not using any repartition topics though.

Best regards / Mit freundlichen Grüßen / Üdvözlettel

Mr. Georg Schmidt-Dumont
Bosch Connected Industry – BCI/ESW17
Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | www.bosch.com
Phone +49 711 811-49893 | Georg.Schmidt-Dumont@bosch.com

-----Original Message-----
From: Matthias J. Sax < mjsax@apache.org >
Sent: Thursday, 28 May 2020 21:37
To: users@kafka.apache.org
Subject: Re: Repeated UnknownProducerIdException

The issue with producer metadata often occurs for reparti...

Single task running on both worker nodes

Hi Team

Recently I saw strange behaviour in Kafka Connect. We have a source connector with a single task only, which reads from an S3 bucket and copies to a Kafka topic. We have two worker nodes in the cluster, so at any point in time the task can be assigned to only one worker node.

I saw in the logs that both worker nodes were processing/reading the data from the S3 bucket, which should be impossible since we have configured that a single task should be created to read the data.

Is it possible in any scenario, e.g. due to the worker process restarting multiple times or registering/deregistering the connector, that a task can be left assigned on both worker nodes?

Note: I have seen this only once; after that it was never reproduced.

Regards and Thanks
Deepak Raghav

Re: Clients may fetch incomplete set of topic partitions during cluster startup

Anyone any update on my below query?

On Thu, 28 May 2020, 15:45 Debraj Manna, < subharaj.manna@gmail.com > wrote:
> Hi
>
> Is the below issue fixed in latest Kafka 2.5?
>
> https://issues.apache.org/jira/browse/KAFKA-8480
>
> I am seeing this issue still open. So just confirming before upgrading Kafka to the latest.
>
> Thanks,

Kafka Queries

Hi Kafka team,

As of now we have successfully implemented Kafka for our environment. We are stuck on the questions below, so please provide assistance:

1) What are the distinct consumer group names currently consuming messages from the same topic?
2) Total number of messages consumed from a topic by a given consumer group for a given time interval (from and to date-time).
3) For a given consumer group and topic, how many new messages arrived in the topic after the previously committed offset position? (Example: the consumer application is down and an admin wants to know how many new messages arrived after that specific consuming app went down.)
4) Explicitly move the offset to a different position for a given topic and consumer group.
5) Replay messages from a failure topic to a replay topic.
6) How do we monitor a topic, i.e. alert when a new message arrives or when a threshold of 5 or 10 new messages is reached?

Hoping your response gives me all clarifications o...
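Not an official answer, but several of these can be covered by the stock CLI tools that ship with Kafka. A sketch (the broker address, group and topic names are placeholders):

```shell
# 1) List the consumer groups currently known to the cluster
kafka-consumer-groups.sh --bootstrap-server broker:9092 --list

# 3) Show per-partition committed offset, log-end offset and lag for a group;
#    the LAG column is the count of messages arrived but not yet consumed
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --describe --group my-group

# 4) Move the group's offsets for a topic to an explicit position
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-offset 1000 --execute
```

Items (2) and (6) are usually handled via JMX metrics and an external monitoring stack rather than the CLI, and (5) typically needs a small consumer/producer application of your own.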

Re: Repeated UnknownProducerIdException

The issue with producer metadata often occurs for repartition topics. Those are purged actively by Kafka Streams. It might help to increase the segme...

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Thanks for the update Pushkar! I'd have to say it is indeed a very misleading error message and we should fix it asap. Will follow up on the ticket.

Guozhang

On Thu, May 28, 2020 at 9:17 AM John Roesler < vvcephei@apache.org > wrote:
> Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel free to do the same.
>
> Thanks,
> -John
>
> On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> > Thanks for the help Guozhang!
> > However I realized that the exception and the actual problem are totally different. The problem was that the client was not set with an SSL truststore while the server is SSL-enabled.
> > I also found this open bug on Kafka: https://issues.apache.org/jira/browse/KAFKA-4493
> > After setting the SSL properties on the stream, I am able to get it up and running.
> >
> > @kafka developers, I think the problem is very mi...

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel free to do the same.

Thanks,
-John

On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> Thanks for the help Guozhang!
> However I realized that the exception and the actual problem are totally different. The problem was that the client was not set with an SSL truststore while the server is SSL-enabled.
> I also found this open bug on Kafka: https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on the stream, I am able to get it up and running.
>
> @kafka developers, I think the problem is very misleading and should be fixed as soon as possible, or a proper exception should be thrown.
>
> On Thu, May 28, 2020 at 9:46 AM Guozhang Wang < wangguoz@gmail.com > wrote:
> > Hello Pushkar,
> >
> > I think the memory pressure may not come from the topic data consumption, but from RocksDB used...

Re: How to manually start ingesting in a Kafka source connector?

You could look at https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/ and experiment with creating the connector elsewhere, to see if you can pre-empt the key value that Kafka Connect will use when writing the offsets, and so do your list as 2 - 1 - 3 instead.

--
Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff

On Thu, 28 May 2020 at 10:12, Yu Watanabe < yu.w.tennis@gmail.com > wrote:
> Robin
>
> Thank you for the reply.
>
> Is there any way to not automatically start after creating the connector?
>
> I am trying to find a way to change the connector offset, as described in the link below, before starting the connector:
> https://www.confluent.jp/blog/kafka-connect-deep-dive-jdbc-source-connector/#starting-table-capture
>
> The steps I want to do are:
> 1. Create the JDBC connector
> 2. Change the connector offset
> 3. Start the connector
>
> Thanks,
> Yu
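The approach in that post boils down to writing a replacement offset message to the Connect offsets topic, keyed by connector name, before the connector is (re)created. A heavily hedged sketch with kafkacat; the topic name depends on the worker's offset.storage.topic setting, and the key format and offset payload vary by connector type, so the values here are purely illustrative:

```shell
# Inspect current source-connector offsets (default topic name: connect-offsets)
kafkacat -b broker:9092 -t connect-offsets -C -K '|' \
  -f 'key: %k  value: %s\n'

# Produce a replacement offset for a hypothetical JDBC source connector,
# using '|' to separate the JSON key from the JSON value
echo '["jdbc-source",{"table":"customers"}]|{"incrementing":42}' | \
  kafkacat -b broker:9092 -t connect-offsets -P -Z -K '|'
```

The worker only reads this topic on startup or when the connector starts, which is why the ordering (write offsets before creating/starting the connector) matters.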

Repeated UnknownProducerIdException

Good morning,

Since a couple of days ago we suddenly have the issue in our Kafka Streams application that an UnknownProducerIdException occurs. Digging into this I came across KIP-360 < https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820 >. We are using a centrally managed Kafka, hence we are not able to just update it to the latest version, which contains the fix for the corresponding issues. The Kafka version we are using is 2.2.1.

I also found that a workaround for Kafka versions pre 2.4 is to increase the retention time and transactional.id.expiration.ms. However, the topic for which the issue occurs already has a retention time of 30 days, so from my point of view this cannot be the reason the metadata for the producer was deleted.

At this point I have two questions I hope someone can help me with. First, might the reason that the producer metadata was deleted be something other than all the data being removed from the topic beca...
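For reference, the two knobs mentioned above live in different places: one is a broker-wide setting in server.properties, the other is per-topic. A sketch (the values shown are the defaults/illustrative, not recommendations):

```properties
# Broker-wide: how long the broker retains producer id/epoch metadata
# after the producer's last write (default 7 days)
transactional.id.expiration.ms=604800000

# Per-topic retention is changed with kafka-configs.sh, e.g.:
#   kafka-configs.sh --zookeeper zk:2181 --alter --entity-type topics \
#     --entity-name my-topic --add-config retention.ms=2592000000
```

Note that on pre-2.4 brokers, producer metadata for a partition can also be dropped when the segment holding the producer's last batch is deleted or compacted away, which is why long topic retention alone does not always prevent the exception.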

Re: How to manually start ingesting in a Kafka source connector?

Robin

Thank you for the reply.

Is there any way to not automatically start after creating the connector?

I am trying to find a way to change the connector offset, as described in the link below, before starting the connector:

https://www.confluent.jp/blog/kafka-connect-deep-dive-jdbc-source-connector/#starting-table-capture

The steps I want to do are:
1. Create the JDBC connector
2. Change the connector offset
3. Start the connector

Thanks,
Yu

On Thu, May 28, 2020 at 6:01 PM Robin Moffatt < robin@confluent.io > wrote:
> When you create the connector, it will start.
>
> --
> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>
> On Thu, 28 May 2020 at 04:12, Yu Watanabe < yu.w.tennis@gmail.com > wrote:
> > Dear community.
> >
> > I would like to ask a question related to the source connector in Kafka Connect (2.4.0).
> >
> > Is there a way to manually start the source c...

Re: How to manually start ingesting in a Kafka source connector?

When you create the connector, it will start.

--
Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff

On Thu, 28 May 2020 at 04:12, Yu Watanabe < yu.w.tennis@gmail.com > wrote:
> Dear community.
>
> I would like to ask a question related to the source connector in Kafka Connect (2.4.0).
>
> Is there a way to manually start a source connector after registering it to Kafka Connect?
>
> Looking at the documentation, I found the PAUSE API:
> https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-pause
>
> However, I could not find a way to set the initial state for individual tasks in the connector properties:
> https://docs.confluent.io/current/connect/managing/configuring.html
>
> I appreciate it if I could get some help.
>
> Best Regards,
> Yu Watanabe
>
> --
> Yu Watanabe
> linkedin: www.linkedin.com/in/yuwatanabe1/ ...
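One workaround people use, given that creation implies starting: create the connector and immediately pause it via the REST API, do the preparation, then resume. A sketch (the host and connector name are placeholders):

```shell
# Pause right after creation; tasks stop processing but the connector stays registered
curl -s -X PUT http://connect:8083/connectors/my-source/pause

# ... adjust offsets or other preparation here ...

# Resume when ready
curl -s -X PUT http://connect:8083/connectors/my-source/resume
```

There is still a small window between create and pause in which the connector may ingest, so this is a mitigation rather than a true "create stopped" option.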

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Matthias,

I realized that the exception and the actual problem are totally different. The problem was that the client was not set with an SSL truststore while the server is SSL-enabled. I also found this open bug on Kafka: https://issues.apache.org/jira/browse/KAFKA-4493. After setting the SSL properties on the stream, I am able to get it up and running.

Due to the above problem it is very difficult to debug the issue, so the above bug should be fixed as soon as possible, or a proper exception should be thrown.

On Wed, May 27, 2020 at 10:59 PM Pushkar Deole < pdeole2015@gmail.com > wrote:
> Thanks... I will try increasing the memory in case you don't spot anything wrong with the code. Other services also have streams and a global KTable, but they use spring-kafka; I think that should not matter, and it should work with normal kafka-streams code unless I am missing some configuration/setting here.
>
> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax < m...

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Thanks for the help Guozhang!

However I realized that the exception and the actual problem are totally different. The problem was that the client was not set with an SSL truststore while the server is SSL-enabled. I also found this open bug on Kafka: https://issues.apache.org/jira/browse/KAFKA-4493. After setting the SSL properties on the stream, I am able to get it up and running.

@kafka developers, I think the problem is very misleading and should be fixed as soon as possible, or a proper exception should be thrown.

On Thu, May 28, 2020 at 9:46 AM Guozhang Wang < wangguoz@gmail.com > wrote:
> Hello Pushkar,
>
> I think the memory pressure may not come from the topic data consumption, but from RocksDB used for materializing the global table. Note RocksDB allocates large chunks of memory beforehand in mem-table / page cache / reader cache with the default configs. You can get some detailed information from this KIP:
> https://cwiki.apache.org/con...
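For anyone hitting the same thing: the Streams client needs the standard SSL settings when the brokers have an SSL listener. A sketch of what was presumably added, assuming kafka-streams/kafka-clients on the classpath (the paths, password and addresses are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsSslProps {
    public static Properties sslProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
        // Without these, a plaintext client talking to an SSL listener can fail
        // with confusing errors (e.g. the OOM reported in KAFKA-4493).
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        return props;
    }
}
```

The underlying failure mode is that the client misreads the TLS handshake bytes as a message size and tries to allocate a huge buffer, hence the heap-space error instead of an SSL exception.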

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Hello Pushkar,

I think the memory pressure may not come from the topic data consumption, but from RocksDB used for materializing the global table. Note RocksDB allocates large chunks of memory beforehand in mem-table / page cache / reader cache with the default configs. You can get some detailed information from this KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB

Guozhang

On Wed, May 27, 2020 at 8:44 PM Pushkar Deole < pdeole2015@gmail.com > wrote:
> Hello All,
>
> I am using the Streams DSL API just to create a GlobalKTable backed by a topic. The topology is simple: just create a global table from a topic and that's it (code snippet pasted below). When I run this service on a K8s cluster (container in a pod), the service gets an OutOfMemoryError during the kafkaStreams.start() method call (exception trace pasted below). Note that the topic is newly...
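If RocksDB memory does turn out to be the culprit, it can be reined in with a custom RocksDBConfigSetter. A rough sketch, assuming kafka-streams and its bundled rocksdbjni on the classpath (the sizes are illustrative, not tuned values):

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Shrink the per-store write buffers (memtables) from the defaults
        options.setWriteBufferSize(4 * 1024 * 1024L); // 4 MiB per memtable
        options.setMaxWriteBufferNumber(2);           // at most two memtables
    }
}
```

It is registered via `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);`, and applies to every state store in the application, including the one backing the GlobalKTable.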

NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Hello All,

I am using the Streams DSL API just to create a GlobalKTable backed by a topic. The topology is simple: just create a global table from a topic and that's it (code snippet pasted below). When I run this service on a K8s cluster (container in a pod), the service gets an OutOfMemoryError during the kafkaStreams.start() method call (exception trace pasted below). Note that the topic is newly created, so there is no data in it. The pod memory was set initially to 500MiB, which I doubled to 1000MiB, but no luck. The kafka-streams and kafka-clients jars are at version 2.3.1. The broker might be a version ahead, I think 2.4, but that should not be an issue. Any help would be appreciated since I am blocked at this point.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
StreamsBuilder streamsBuilder = new StreamsBuilder();
GlobalKTable<String, Map<String, St...

How to manually start ingesting in a Kafka source connector?

Dear community,

I would like to ask a question related to the source connector in Kafka Connect (2.4.0).

Is there a way to manually start a source connector after registering it to Kafka Connect?

Looking at the documentation, I found the PAUSE API:

https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-pause

However, I could not find a way to set the initial state for individual tasks in the connector properties:

https://docs.confluent.io/current/connect/managing/configuring.html

I appreciate it if I could get some help.

Best Regards,
Yu Watanabe

--
Yu Watanabe
linkedin: www.linkedin.com/in/yuwatanabe1/
twitter: twitter.com/yuwtennis

Re: Kafka Timeout Exception

Correct. The error implies that sending was not successful. You can retry sending by calling `Producer#send()` again. If you also increase the co...
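A manual retry along these lines can be done from the send callback; a hedged sketch with bounded attempts, assuming kafka-clients on the classpath (the class and parameter names are illustrative):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class RetryingSender {
    private final Producer<String, String> producer;

    public RetryingSender(Producer<String, String> producer) {
        this.producer = producer;
    }

    public void send(ProducerRecord<String, String> record, int attemptsLeft) {
        producer.send(record, (metadata, exception) -> {
            if (exception instanceof RetriableException && attemptsLeft > 0) {
                // TimeoutException is a RetriableException: resend with one fewer attempt.
                send(record, attemptsLeft - 1);
            } else if (exception != null) {
                // Non-retriable error, or out of attempts: surface it.
                exception.printStackTrace();
            }
        });
    }
}
```

In many cases simply raising the producer's delivery.timeout.ms (and leaving retries at its default) lets the producer retry internally instead, which preserves ordering guarantees that an application-level resend does not.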