Posts

Showing posts from May, 2021

Emulating a consumer with a maximum consumption rate and synchronizing it with the producer's work.

Dear all, I am trying to run a Kafka experiment on Kubernetes where I would like to control the number of messages sent/produced to the broker per unit time/interval, and the number of messages being consumed by the consumer per unit time/interval. Controlling the number of messages produced to the broker per unit time (according to a predetermined workload/trace) is easy. E.g., for a trace of 20 messages per 15 secs, then 100 messages for the next 15 secs, and finally 45 messages for the next 15 seconds…, the methodology is simple: send the 20 messages (in a single batch), sleep (the producer main thread) for 15 seconds, then send the 100 messages, sleep the producer thread for 15 seconds, and so on… I am sending the messages in a single batch. However, controlling the number of consumed messages per unit time (say 15 seconds) looks more complex. In fact, to emulate a consumer with a maximum rate of 100 messages every 15-second interval, I am setting max....
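For the producer side, a minimal sketch of the pacing loop described above, assuming the Java client, a local broker, and a placeholder topic name:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PacedProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        int[] trace = {20, 100, 45};            // messages per 15-second interval, per the trace
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int batchSize : trace) {
                for (int i = 0; i < batchSize; i++) {
                    producer.send(new ProducerRecord<>("test-topic", "msg-" + i)); // hypothetical topic
                }
                producer.flush();               // push the whole batch out
                Thread.sleep(15_000);           // hold the main thread for the interval
            }
        }
    }
}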

Zookeeper failure handling in Kafka

Hi All, If I have a 5-node ZooKeeper ensemble backing my 5-node Kafka cluster and more than 2 of the ZooKeeper nodes shut down, Kafka producer clients are still able to write to Kafka without any issues. Is this normal behaviour? I always thought that a ZooKeeper quorum should be available for the Kafka cluster to function. (I also believed that the ZooKeeper service will be unavailable if more than the majority goes down, but maybe I will ask that in a ZooKeeper forum.) When will Kafka producer clients start experiencing issues when producing messages to Kafka, in case an entire ZooKeeper cluster is not available? (Assume that the ZooKeeper cluster was up and running when the Kafka cluster and the Kafka clients started, but went down sometime later.) Regards, Neeraj

Re: Modify kafka-connect api context path

No, that is not what I am looking for. When Kafka Connect starts, it will be available at http://localhost:8083/connectors. I need to add an additional path to the host, something like: http://localhost:8083/my-service/connectors On Wed, May 26, 2021 at 17:57, Ran Lupovich < ranlupovich@gmail.com > wrote: > --server.servlet.context-path="/kafdrop" > Something like this ? > https://github.com/obsidiandynamics/kafdrop/issues/9 > > > On Wed, May 26, 2021, 23:44, Fernando Moraes < > fernandosdemoraes@gmail.com > wrote: > > > Hello, I would like to know if it is possible to modify via config > > properties the kafka-connect context path. I have a scenario where the > > proxy redirects a request to a connect worker using a context path. > > > > I've already looked at the source code here, and it doesn't really seem > to > > have a point for configuration: ...

Re: Reading offset from one consumer group to use for another consumer group.

One more thought you could consider: have two consumer groups, one that starts every hour for your "db consumer" and a second for near real time; the second should run all the time and populate your "memory db" such as Redis, and the TTL could be managed by Redis's own mechanism. On Fri, May 28, 2021, 21:44, Ran Lupovich < ranlupovich@gmail.com > wrote: > So I think you should write to your db the partition and the offset; > while initing the real-time consumer you'd read from the database where to set > the consumer starting point, kind of the "exactly once" programming > approach. > > On Fri, May 28, 2021, 21:38, Ronald Fenner < > rfenner@gamecircus.com > wrote: > >> That might work if my consumers were in the same process, but the db >> consumer is a Python job running under Airflow and the realtime consumer >> would be running as a backend service on another server. >> ...

Re: Reading offset from one consumer group to use for another consumer group.

So I think you should write to your db the partition and the offset; while initing the real-time consumer you'd read from the database where to set the consumer starting point, kind of the "exactly once" programming approach. On Fri, May 28, 2021, 21:38, Ronald Fenner < rfenner@gamecircus.com > wrote: > That might work if my consumers were in the same process, but the db > consumer is a Python job running under Airflow and the realtime consumer > would be running as a backend service on another server. > > Also, how would I seed the realtime consumer at startup if the consumer > isn't running, which could be possible if it hit the end of the stream? > > The db consumer is designed to read until no new message is delivered and then > exit till it's next spawned. > > Ronald Fenner > Network Architect > Game Circus LLC. > > rfenner@gamecircus.com > > > On May 28, 2021, at 12:05 AM, Ran Lu...
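A minimal sketch of that suggestion, assuming the Java client; the topic, partition, group id, and the loadOffsetFromDb helper are placeholders for whatever the DB consumer persisted:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeededConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "realtime-viewer");          // hypothetical group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic/partition
            consumer.assign(List.of(tp));
            long storedOffset = loadOffsetFromDb(tp);  // hypothetical: read what the db consumer stored
            consumer.seek(tp, storedOffset);           // start where the DB consumer left off
            while (true) {
                consumer.poll(Duration.ofMillis(500)).forEach(r ->
                    System.out.printf("p%d@%d: %s%n", r.partition(), r.offset(), r.value()));
            }
        }
    }

    private static long loadOffsetFromDb(TopicPartition tp) {
        return 0L; // placeholder: fetch the persisted offset for tp from your database
    }
}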

Re: Reading offset from one consumer group to use for another consumer group.

That might work if my consumers were in the same process, but the db consumer is a Python job running under Airflow and the realtime consumer would be running as a backend service on another server. Also, how would I seed the realtime consumer at startup if the consumer isn't running, which could be possible if it hit the end of the stream? The db consumer is designed to read until no new message is delivered and then exit till it's next spawned. Ronald Fenner Network Architect Game Circus LLC. rfenner@gamecircus.com > On May 28, 2021, at 12:05 AM, Ran Lupovich < ranlupovich@gmail.com > wrote: > > https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long) > > On Fri, May 28, 2021, 08:04, Ran Lupovich < ranlupovich@gmail.com >> wrote: > >> While your DB consumer is running you get access to the partition >> ${partition...

Re: Issue using Https with elasticsearch source connector

trustStore:
JAVA_OPTS=$JAVA_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore.jks -Djavax.net.ssl.trustStoreType=jks -Djavax.net.ssl.trustStorePassword=changeit

keyStore:
JAVA_OPTS=$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks -Djavax.net.ssl.keyStoreType=jks -Djavax.net.ssl.keyStorePassword=changeit

Check for setting this in $KAFKA_OPTS < https://doc.nuxeo.com/nxdoc/trust-store-and-key-store-configuration/#adding-your-certificates-to-the-default-trust-store > On Fri, May 28, 2021, 15:24, sunil chaudhari < sunilmchaudhari05@gmail.com > wrote: > Yeah. > I am trying to add the truststore to the Java keystore. > Let's see. > > On Fri, 28 May 2021 at 5:40 PM, Ran Lupovich < ranlupovich@gmail.com > > wrote: > > > Anyway, you need to remember it is a Java application and you can pass > many > > variables that are not formally supported by the app...
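For example, a sketch of passing those flags through $KAFKA_OPTS when starting a standalone Connect worker (paths, password, and the connector properties file name are placeholders):

# Assumption: kafka-run-class.sh appends $KAFKA_OPTS to the JVM arguments.
export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/path/to/truststore.jks -Djavax.net.ssl.trustStoreType=jks -Djavax.net.ssl.trustStorePassword=changeit"
bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-source.properties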

Re: Issue using Https with elasticsearch source connector

Yeah. I am trying to add the truststore to the Java keystore. Let's see. On Fri, 28 May 2021 at 5:40 PM, Ran Lupovich < ranlupovich@gmail.com > wrote: > Anyway, you need to remember it is a Java application and you can pass many > variables that are not formally supported by the application as JVM input > settings or in the connector OPTS. I don't have experience with this > specific source connector, but I did something similar as a workaround for the > MongoDB sink connector before they fixed the support for SSL, so I do > believe it's possible; it's a matter of guess, try and see 😉 but I do > believe it's possible. > > On Fri, May 28, 2021, 15:05, sunil chaudhari < > sunilmchaudhari05@gmail.com > wrote: > > > Hello Ran, > > Whatever link you have provided is for the supported SINK connector. > > It has all settings for SSL. > > > > The connector I am talking about is the Source connector and it's not > > su...

Re: Issue using Https with elasticsearch source connector

Anyway, you need to remember it is a Java application and you can pass many variables that are not formally supported by the application as JVM input settings or in the connector OPTS. I don't have experience with this specific source connector, but I did something similar as a workaround for the MongoDB sink connector before they fixed the support for SSL, so I do believe it's possible; it's a matter of guess, try and see 😉 but I do believe it's possible. On Fri, May 28, 2021, 15:05, sunil chaudhari < sunilmchaudhari05@gmail.com > wrote: > Hello Ran, > Whatever link you have provided is for the supported SINK connector. > It has all settings for SSL. > > The connector I am talking about is the Source connector and it's not > supported by Confluent. > If you see the documentation you will find that there is no setting for SSL > certs. > > https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source > > > That's where I am stuck...

Re: Issue using Https with elasticsearch source connector

Hello Ran, Whatever link you have provided is for the supported SINK connector. It has all settings for SSL. The connector I am talking about is the Source connector and it's not supported by Confluent. If you see the documentation you will find that there is no setting for SSL certs. https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source That's where I am stuck. Regards, Sunil. On Fri, 28 May 2021 at 9:34 AM, Ran Lupovich < ranlupovich@gmail.com > wrote:
> name=elasticsearch-sink
> connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
> tasks.max=1
> topics=test-elasticsearch-sink
> key.ignore=true
> connection.url=https://localhost:9200
> type.name=kafka-connect
>
> elastic.security.protocol=SSL
> elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks
> elastic.https.ssl.keystore.password=asdfasdf
> elastic.https.ssl.key.password=asdfasdf
> elastic.https.ssl.keystore.type=JKS
> elastic.h...

Re: Reading offset from one consumer group to use for another consumer group.

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long) On Fri, May 28, 2021, 08:04, Ran Lupovich < ranlupovich@gmail.com > wrote: > While your DB consumer is running you get access to the partition > ${partition} @ offset ${offset} > > https://github.com/confluentinc/examples/blob/6.1.1-post/clients/cloud/nodejs/consumer.js > When setting your second consumers for real time, just set them to start from that > point > > > On Fri, May 28, 2021, 01:51, Ronald Fenner < > rfenner@gamecircus.com > wrote: > >> I'm trying to figure out how to programmatically read a consumer group's >> offsets for a topic. >> What I'm trying to do is read the offsets of our DB consumers that run >> once an hour and batch load all new messages. I then would have another >> consumer that monitors the offsets tha...

Re: Reading offset from one consumer group to use for another consumer group.

While your DB consumer is running you get access to the partition ${partition} @ offset ${offset}: https://github.com/confluentinc/examples/blob/6.1.1-post/clients/cloud/nodejs/consumer.js When setting your second consumers for real time, just set them to start from that point. On Fri, May 28, 2021, 01:51, Ronald Fenner < rfenner@gamecircus.com > wrote: > I'm trying to figure out how to programmatically read a consumer group's > offsets for a topic. > What I'm trying to do is read the offsets of our DB consumers that run > once an hour and batch load all new messages. I then would have another > consumer that monitors the offsets that have been consumed and consumes the > messages not yet loaded, storing them in memory to be able to send them to a > viewer. As messages get consumed they then get pruned from the in-memory > cache. > > Basically I'm wanting to create a window on the messages that haven't been > loa...

Re: Issue using Https with elasticsearch source connector

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
connection.url=https://localhost:9200
type.name=kafka-connect

elastic.security.protocol=SSL
elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks
elastic.https.ssl.keystore.password=asdfasdf
elastic.https.ssl.key.password=asdfasdf
elastic.https.ssl.keystore.type=JKS
elastic.https.ssl.truststore.location=/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks
elastic.https.ssl.truststore.password=asdfasdf
elastic.https.ssl.truststore.type=JKS
elastic.https.ssl.protocol=TLS

On Fri, May 28, 2021, 07:03, Ran Lupovich < ranlupovich@gmail.com > wrote: > https://docs.confluent.io/kafka-connect-elasticsearch/current/security.html > > On Fri, May 28, 2021, 07:00, sunil chaudhari < > sunilmchaudhari05@gmail.com > wrote: > >> The configurat...

Re: Issue using Https with elasticsearch source connector

https://docs.confluent.io/kafka-connect-elasticsearch/current/security.html On Fri, May 28, 2021, 07:00, sunil chaudhari < sunilmchaudhari05@gmail.com > wrote: > The configuration doesn't have provision for the truststore. That's my > concern. > > > On Thu, 27 May 2021 at 10:47 PM, Ran Lupovich < ranlupovich@gmail.com > > wrote: > > > For https connections you need to set truststore configuration > parameters, > > giving it a jks with a password; the jks needs to contain the certificate of > > the CA that is signing your certificates > > > > On Thu, May 27, 2021, 19:55, sunil chaudhari < > > sunilmchaudhari05@gmail.com > wrote: > > > > > Hi Ran, > > > That problem is solved already. > > > If you read the complete thread you will see that the last problem is about https > > > connection. > > > > > > > > > On Thu, 27 May 20...

Re: Issue using Https with elasticsearch source connector

The configuration doesn't have provision for the truststore. That's my concern. On Thu, 27 May 2021 at 10:47 PM, Ran Lupovich < ranlupovich@gmail.com > wrote: > For https connections you need to set truststore configuration parameters, > giving it a jks with a password; the jks needs to contain the certificate of > the CA that is signing your certificates > > On Thu, May 27, 2021, 19:55, sunil chaudhari < > sunilmchaudhari05@gmail.com > wrote: > > > Hi Ran, > > That problem is solved already. > > If you read the complete thread you will see that the last problem is about https > > connection. > > > > > > On Thu, 27 May 2021 at 8:01 PM, Ran Lupovich < ranlupovich@gmail.com > > > wrote: > > > > > Try setting es.port = "9200" without quotes? > > > > > > On Thu, May 27, 2021, 04:21, sunil chaudhari < > > > sunilmchaudhari05@...

Re: Kafka contributor list request

Done, added you to Confluence and Jira, so you should be able to self-assign tickets and create KIPs if necessary. Welcome to Kafka :) On Thu, May 27, 2021 at 4:28 PM Norbert Wojciechowski < wojciechowski.norbert.github@gmail.com > wrote: > Hello, > > Can I please be added to the Kafka contributor list on Confluence/Jira, so I > can start contributing to Kafka and be able to work on issues? > > My Jira username is: erzbnif > > Thanks, > Norbert >

Reading offset from one consumer group to use for another consumer group.

I'm trying to figure out how to programmatically read a consumer group's offsets for a topic. What I'm trying to do is read the offsets of our DB consumers that run once an hour and batch load all new messages. I then would have another consumer that monitors the offsets that have been consumed and consumes the messages not yet loaded, storing them in memory to be able to send them to a viewer. As messages get consumed they then get pruned from the in-memory cache. Basically I'm wanting to create a window on the messages that haven't been loaded into the db. I've seen ways of getting it from the command line, but I'd like to do it from within code. Currently I'm using node-rdkafka. I guess as a last resort I could shell out to the command line for the offsets, then parse the output and get it that way. Ronald Fenner Network Architect Game Circus LLC. rfenner@gamecircus.com
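For illustration, a minimal sketch of that lookup using the Java AdminClient (the poster is on node-rdkafka; this only shows the broker-side API, with a hypothetical group id and local broker):

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetReader {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Read the offsets the hourly DB consumer group last committed.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets("db-loader")   // hypothetical group id
                     .partitionsToOffsetAndMetadata()
                     .get();
            offsets.forEach((tp, om) ->
                System.out.printf("%s-%d -> %d%n", tp.topic(), tp.partition(), om.offset()));
        }
    }
}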

Re: Kafka going down every week due to log file deletion.

The main purpose of the /tmp directory is to temporarily store files when installing an OS or software. If any files in the /tmp directory have not been accessed for a while, they will be automatically deleted from the system. On Thu, May 27, 2021, 19:04, Ran Lupovich < ranlupovich@gmail.com > wrote: > Seems your log dir is sending your data to the tmp folder; if I am not mistaken, > this dir automatically removes files from itself, causing the log deletion > procedure of Kafka's internals to fail and shut down the broker on file not > found > > On Thu, May 27, 2021, 17:52, Neeraj Gulia < > neeraj.gulia@opsworld.in > wrote: > >> Hi team, >> >> Our Kafka is going down almost once or twice a month due to log file >> deletion failure. >> >> >> There is a single-node Kafka broker running in our system and it goes down >> every time it tries to delete the log files as cle...
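The usual fix, sketched below, is to point the broker away from /tmp in server.properties (the target path is a placeholder; move the existing segment files there before restarting the broker):

# server.properties
# Keep Kafka data out of /tmp so OS cleanup (tmpwatch / systemd-tmpfiles)
# cannot delete segment and index files out from under the broker.
log.dirs=/var/lib/kafka/data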

Re: Issue using Https with elasticsearch source connector

For https connections you need to set truststore configuration parameters, giving it a jks with a password; the jks needs to contain the certificate of the CA that is signing your certificates. On Thu, May 27, 2021, 19:55, sunil chaudhari < sunilmchaudhari05@gmail.com > wrote: > Hi Ran, > That problem is solved already. > If you read the complete thread you will see that the last problem is about https > connection. > > > On Thu, 27 May 2021 at 8:01 PM, Ran Lupovich < ranlupovich@gmail.com > > wrote: > > > Try setting es.port = "9200" without quotes? > > > > On Thu, May 27, 2021, 04:21, sunil chaudhari < > > sunilmchaudhari05@gmail.com > wrote: > > > > > Hello team, > > > Can anyone help me with this issue? > > > > > > > > > > > > https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues/44 > > > >...

Re: Issue using Https with elasticsearch source connector

Hi Ran, That problem is solved already. If you read the complete thread you will see that the last problem is about the https connection. On Thu, 27 May 2021 at 8:01 PM, Ran Lupovich < ranlupovich@gmail.com > wrote: > Try setting es.port = "9200" without quotes? > > On Thu, May 27, 2021, 04:21, sunil chaudhari < > sunilmchaudhari05@gmail.com > wrote: > > > Hello team, > > Can anyone help me with this issue? > > > > > > > https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues/44 > > > > > > Regards, > > Sunil. > > >

Re: Kafka going down every week due to log file deletion.

Seems your log dir is sending your data to the tmp folder; if I am not mistaken, this dir automatically removes files from itself, causing the log deletion procedure of Kafka's internals to fail and shut down the broker on file not found. On Thu, May 27, 2021, 17:52, Neeraj Gulia < neeraj.gulia@opsworld.in > wrote: > Hi team, > > Our Kafka is going down almost once or twice a month due to log file > deletion failure. > > > There is a single-node Kafka broker running in our system and it goes down > every time it tries to delete the log files as cleanup and fails. > > Sharing the error logs; we need a robust solution for this so that our > Kafka broker doesn't go down like this every time. > > Regards, > Neeraj Gulia > > Caused by: java.io.FileNotFoundException: > /tmp/kafka-logs/dokutopic-0/00000000000000000000.index (No such file or > directory) > at java.base/java.io.RandomAccessFile.open0(Nat...

Does the consumer thread wait for the producer to return (synchronous) in a normal consume-process-produce topology? And how is it handled in Streams?

Hi, I am trying to understand a few things: in a normal consume-process-produce topology, the consumer polls records, processes each one, and then hands it to the producer to produce on the destination topic. In this case, is the 'produce' a synchronous call, i.e., does it happen in the same consumer thread, or does the produce take place in a background producer thread asynchronously? If asynchronous, then how can the consumer commit offsets before the produce happened successfully? If synchronous, then the consumer thread gets held until the produce happens, possibly increasing consumer lag?
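For reference, a minimal sketch of the common pattern the question is about, assuming the plain Java clients: send() is asynchronous (it hands the record to a background I/O thread and returns a Future), so the consumer commits only after flush() confirms the queued sends have completed. Topic names and the process step are placeholders:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConsumeProcessProduce {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        cp.put("group.id", "processor");               // hypothetical group id
        cp.put("enable.auto.commit", "false");         // commit manually, after produce completes
        cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(List.of("input"));      // hypothetical topic names
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    // send() is asynchronous: it enqueues to a background I/O thread.
                    producer.send(new ProducerRecord<>("output", rec.key(), process(rec.value())));
                }
                producer.flush();        // block until all queued sends have completed
                consumer.commitSync();   // only now acknowledge the consumed offsets
            }
        }
    }

    private static String process(String v) { return v.toUpperCase(); } // placeholder step
}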

Kafka going down every week due to log file deletion.

Hi team, Our Kafka is going down almost once or twice a month due to log file deletion failure. There is a single-node Kafka broker running in our system and it goes down every time it tries to delete the log files as cleanup and fails. Sharing the error logs; we need a robust solution for this so that our Kafka broker doesn't go down like this every time. Regards, Neeraj Gulia

Caused by: java.io.FileNotFoundException: /tmp/kafka-logs/dokutopic-0/00000000000000000000.index (No such file or directory)
at java.base/java.io.RandomAccessFile.open0(Native Method)
at java.base/java.io.RandomAccessFile.open(RandomAccessFile.java:345)
at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:259)
at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:214)
at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:183)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176)
at kafka.log.AbstractIndex.$anonfun$trimToV...

Re: Modify kafka-connect api context path

--server.servlet.context-path="/kafdrop" Something like this? https://github.com/obsidiandynamics/kafdrop/issues/9 On Wed, May 26, 2021, 23:44, Fernando Moraes < fernandosdemoraes@gmail.com > wrote: > Hello, I would like to know if it is possible to modify via config > properties the kafka-connect context path. I have a scenario where the > proxy redirects a request to a connect worker using a context path. > > I've already looked at the source code here, and it doesn't really seem to > have a point for configuration: > ( > > https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L264 > ) >

Modify kafka-connect api context path

Hello, I would like to know if it is possible to modify the kafka-connect context path via config properties. I have a scenario where a proxy redirects requests to a Connect worker using a context path. I've already looked at the source code here, and it doesn't really seem to have a configuration point for this: ( https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L264 )

Kafka Stream custom join using state store

Hi, I didn't reach any people on StackOverflow, so I'm trying here: https://stackoverflow.com/questions/67694907/kafka-stream-custom-join-using-state-store I'm really stuck on that part and I have the feeling only the Processor API can help me. But since the stream is really complex, I would prefer a confirmation before I start wasting my time on this. Can anyone help me with this issue, please? Thanks
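Without seeing the full topology, here is only a minimal sketch of the Processor API shape such a custom join usually takes: one side is materialized into a shared key-value store that the other side looks up. All topic, store, and serde choices are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CustomJoinSketch {
    @SuppressWarnings("unchecked")
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Shared store: latest "right" value per key.
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("join-store"),   // hypothetical store name
            Serdes.String(), Serdes.String()));

        // Right side: just remember the latest value for each key.
        builder.stream("right-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .process(() -> new AbstractProcessor<String, String>() {
                @Override
                public void process(String key, String value) {
                    ((KeyValueStore<String, String>) context().getStateStore("join-store"))
                        .put(key, value);
                }
            }, "join-store");

        // Left side: look up the remembered right value and emit the joined pair.
        builder.stream("left-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
                private KeyValueStore<String, String> store;

                @Override
                public void init(ProcessorContext context) {
                    store = (KeyValueStore<String, String>) context.getStateStore("join-store");
                }

                @Override
                public String transform(String key, String left) {
                    String right = store.get(key);
                    return right == null ? null : left + "|" + right; // placeholder join logic
                }

                @Override
                public void close() { }
            }, "join-store")
            .to("joined-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}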

Re: Weird behavior of topic retention - some are cleaned up too often, some are not at all

Thank you, Matthias, for the ideas to verify next! Here is what I see: Topic 1, which has not been cleaned up for 3 days already but has retention set to 4 hrs (I've truncated the payload but left the important details):

11:32:11 confluent@qv-ckafkama7 [~] $ /usr/bin/kafka-dump-log --print-data-log --files /apps/kafka-data/tracking.ap.client.traffic-9/00000000000000744551.log | tail -5

offset: 1294064 position: 319988493 CreateTime: 1622043140910 isvalid: true keysize: -1 valuesize: 633 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: {..."LOGSTASH_ID":"1f7bd3fe-123b-40c2-970e-c85356469eda","ACTIVITY_DATE":1622043139,"@timestamp":"2021-05-26T15:32:19.000Z"}

offset: 1294065 position: 319989196 CreateTime: 1622043140911 isvalid: true keysize: -1 valuesize: 466 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactiona...