
Posts

Showing posts from November, 2022

Re: Copying data from one topic into another

I like it i was just kidding I'll subscribe there too for sure!! I was just adoringly teasing you all 😛 On Wed, Nov 30, 2022 at 12:04 PM nagpavan chilakam < nagpavan.chilakam@gmail.com > wrote: > Maybe you can even explore `kafka-console-consumer` to get the messages > from a certain offset > > On Wed, Nov 30, 2022 at 8:11 PM Tecno Brain < cerebrotecnologico@gmail.com > > wrote: > > > By the way, we use Avro for the messages. > > > > > > > > On Wed, Nov 30, 2022 at 9:35 AM Tecno Brain < > cerebrotecnologico@gmail.com > > > wrote: > > > > > Hi, > > > I'm using Kafka 2.8 > > > I have a topic with an original retention of 1 day with a large > amount > > > of messages. > > > > > > We had a problem with some messages being corrupted and we decided to > > > skip about 3.5 hours of messages; but not all o...

Re: Copying data from one topic into another

Maybe you can even explore `kafka-console-consumer` to get the messages from a certain offset On Wed, Nov 30, 2022 at 8:11 PM Tecno Brain < cerebrotecnologico@gmail.com > wrote: > By the way, we use Avro for the messages. > > > > On Wed, Nov 30, 2022 at 9:35 AM Tecno Brain < cerebrotecnologico@gmail.com > > wrote: > > > Hi, > > I'm using Kafka 2.8 > > I have a topic with an original retention of 1 day with a large amount > > of messages. > > > > We had a problem with some messages being corrupted and we decided to > > skip about 3.5 hours of messages; but not all of them are corrupted. So, > we > > would like to process this period of time again with a modified version > of > > our consumer application. > > > > In order to avoid the messages being deleted, we extended the retention > > to 3 days. > > But we are now again close to ...

[ANNOUNCE] Call for papers: Kafka Summit London 2023

Hi everyone, The call for papers ( https://sessionize.com/kafka-summit-london-2023/ ) is now open for Kafka Summit London 2023, and you are all welcome to submit a talk. We are looking for the most interesting, most informative, most advanced, and most generally applicable talks on Apache Kafka® and the tools, technologies, and techniques in the Kafka ecosystem. People from all industries, backgrounds, and experience levels are encouraged to submit! If you have any questions about submitting, reach out to Danica Fine, the program chair, at dfine@confluent.io . The call for papers closes on Monday, January 9, 2023 at 23:59 GMT. Thanks, Chris

Re: Copying data from one topic into another

This is kind of boring I'm sorry I subscribed but I'll keep them because I might learn from you On Wed, Nov 30, 2022 at 9:41 AM Tecno Brain < cerebrotecnologico@gmail.com > wrote: > By the way, we use Avro for the messages. > > > > On Wed, Nov 30, 2022 at 9:35 AM Tecno Brain < cerebrotecnologico@gmail.com > > wrote: > > > Hi, > > I'm using Kafka 2.8 > > I have a topic with an original retention of 1 day with a large amount > > of messages. > > > > We had a problem with some messages being corrupted and we decided to > > skip about 3.5 hours of messages; but not all of them are corrupted. So, > we > > would like to process this period of time again with a modified version > of > > our consumer application. > > > > In order to avoid the messages being deleted, we extended the retention > > to 3 days. > > But we are now again...

Re: Copying data from one topic into another

By the way, we use Avro for the messages. On Wed, Nov 30, 2022 at 9:35 AM Tecno Brain < cerebrotecnologico@gmail.com > wrote: > Hi, > I'm using Kafka 2.8 > I have a topic with an original retention of 1 day with a large amount > of messages. > > We had a problem with some messages being corrupted and we decided to > skip about 3.5 hours of messages; but not all of them are corrupted. So, we > would like to process this period of time again with a modified version of > our consumer application. > > In order to avoid the messages being deleted, we extended the retention > to 3 days. > But we are now again close to have the messages being deleted and we > cannot extend the retention anymore (we would go over the limit of the disk > space). > > So, I would like to extract the messages between the offsets that cover > the 3.5h > > Is there a quick way to extract these messages and...

Copying data from one topic into another

Hi, I'm using Kafka 2.8. I have a topic with an original retention of 1 day and a large number of messages. We had a problem with some messages being corrupted and we decided to skip about 3.5 hours of messages, but not all of them are corrupted. So, we would like to process this period of time again with a modified version of our consumer application. To keep the messages from being deleted, we extended the retention to 3 days. But we are now again close to having the messages deleted, and we cannot extend the retention any further (we would exceed the available disk space). So, I would like to extract the messages between the offsets that cover the 3.5 hours. Is there a quick way to extract these messages and copy them into another topic? (After that, I want to set the retention back to the original 1 day.) Would kcat help?
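The core of the request above, copying only the records inside an offset window, can be sketched independently of any client library. This is a minimal sketch under stated assumptions, not a tested tool: `Record`, `copy_offset_range`, and the `send` callback are hypothetical names introduced for illustration, and the record source is assumed to yield a single partition's records in ascending offset order. With a real consumer you would assign the partition and seek to the start offset rather than skip records, and you would repeat the copy once per partition.

```python
from typing import Callable, Iterable, NamedTuple, Optional


class Record(NamedTuple):
    """Minimal stand-in for a consumed record (hypothetical type)."""
    offset: int
    key: Optional[bytes]
    value: Optional[bytes]


def copy_offset_range(
    records: Iterable[Record],
    send: Callable[[Optional[bytes], Optional[bytes]], None],
    start_offset: int,
    end_offset: int,
) -> int:
    """Forward records with start_offset <= offset < end_offset; return the count.

    Assumes `records` yields one partition's records in ascending offset order.
    """
    copied = 0
    for record in records:
        if record.offset < start_offset:
            continue  # before the window (a real consumer would seek instead)
        if record.offset >= end_offset:
            break  # past the window, so stop reading
        send(record.key, record.value)  # raw bytes are forwarded unchanged
        copied += 1
    return copied
```

Because keys and values are forwarded as raw bytes, the Avro payload itself is not re-encoded; whether the reprocessing consumer can decode it from the new topic depends on how schemas are resolved in your setup.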

Incorrect declared batch size, premature EOF reached

Hello, I receive that exception while polling data with Kafka consumer. I can't find how to recover from it, it prevents the consumer from consuming new data. Do you know what can be done to fix that? I pasted the stack trace below, removing non significant data or replacing it with generic patterns (like TOPIC-PARTITION). Any help would be appreciated. Sébastien Rebecchi Received exception when fetching the next record from TOPIC-PARTITION. If needed, please seek past the record to continue consumption. org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-PARTITION. If needed, please seek past the record to continue consumption. at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473) at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetch...

Network threads monitoring

Hi, How can I tell whether I have enough network threads configured with num.network.threads? I see that producers are slower to produce when we move partitions between brokers, so I'm thinking there may not be enough network threads to process the requests from both the consumers and the other brokers that are creating the new replicas. Cheers, Peter

Re: same keys appearing in state stores on different pods when using branches in kafka streams

Hi John, I am not sure I understood correctly. Even with branching that uses a different state store, the key of the incoming event is still the same, so we expect it to land in the local state store on the same pod. For example: 1st event with OPEN status and key xyz came in, was processed through one branch, and was stored in state store 'totals', with state maintained in the local state store on the same pod. 2nd event with OPEN status and key xyz came in and was again processed and stored in 'totals', with state maintained in the local state store on the same pod. 3rd event with CLOSED status and key xyz came in and was processed. Its state is stored in the 'record' state store, which is expected to be on the same pod. Why would it go to some other pod? On Wed, Nov 23, 2022 at 8:50 PM John Roesler < vvcephei@apache.org > wrote: > Hi Pushkar, > > Thanks for the question. I think that what's happening is that, even > though both branches use ...

Re: Hi! An observation about your site and software!

I like the way you written it without fullstop…. Lol On Mon, 28 Nov 2022 at 8:48 PM, Schehrzade < schehrzade.syed.1@gmail.com > wrote: > I like the author Kafka and I was so impressed someone had written code or > whatever I don't know because I'm not from this country and I don't know > stuff about science and all but it was really cool and even the logo and > font and then I started to watch your video and the idea was sooooooooo > creative and interesting and sounded so useful and I thought aw how cute > some like little start up thing and then I went to see who uses it and I > saw Salesforce and I'm in customer service and I love Salesforce soooo much > and I'm sooo happy with it it makes things so much more convenient and then > for me to realize I was like one degree of separation away from and not > even that because I'm literally using Kafka atm at work in a way that's > insane but I felt so hon...

Hi! An observation about your site and software!

I like the author Kafka and I was so impressed someone had written code or whatever I don't know because I'm not from this country and I don't know stuff about science and all but it was really cool and even the logo and font and then I started to watch your video and the idea was sooooooooo creative and interesting and sounded so useful and I thought aw how cute some like little start up thing and then I went to see who uses it and I saw Salesforce and I'm in customer service and I love Salesforce soooo much and I'm sooo happy with it it makes things so much more convenient and then for me to realize I was like one degree of separation away from and not even that because I'm literally using Kafka atm at work in a way that's insane but I felt so honoured and happy and then I saw 80% of the biggest whatever's we're using it and man I was so emotional that someone was keeping kafkas name alive and had kept him remembered and he was just thi...

Re: Stream sinks are not constructed when application starts up before Kafka broker

Hi Alexander, I'm sorry to hear that. It certainly sounds like a hard one to debug. To clarify, do you mean that when you observe this problem, the sink node is not in the topology at all, or that it is in the topology, but does not function properly? Also, are you using Spring to construct the topology, or are you calling the Kafka Streams library directly to build the topology? If the problem is that the sink node is missing completely, it's hard to imagine how the problem could be Streams. When you are building the topology in Streams, there is no connection to Kafka at all. Then again, I've seen enough heisenbugs to know not to trust intuition too much. If you can try just using the Streams builder directly to create the topology, maybe you can see if you can still reproduce the issue? I hope this helps! -John On Tue, Nov 22, 2022, at 14:07, Alexander Kau wrote: > My team is building a set of services that use Kafka Connect and Debezium > ...

Re: same keys appearing in state stores on different pods when using branches in kafka streams

Hi Pushkar, Thanks for the question. I think that what's happening is that, even though both branches use the same grouping logic, Streams can't detect that they are the same. It just sees two group-bys and therefore introduces two repartitions, with a separate downstream task for each. You might want to print out the topology description and visualize it with https://zz85.github.io/kafka-streams-viz/ . That will show you whether the stores wind up in the same task or not. The visualization will also show you the names of the input topics for those two partitions, which you can use in conjunction with the metadata methods on your KafkaStreams instance to query for the location of the keys in both stores. I suspect that with some tweaks you can re-write the topology to just have one downstream task, if that's what you prefer. By the way, I think you could propose to add an optimization to make the groupBy behave the way you expected. If that's interesting...
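The effect described above can be illustrated with a toy model. Kafka Streams names tasks `<sub-topology id>_<partition>`, so two repartition topics feeding two sub-topologies produce two tasks for the same partition number, and nothing forces those two tasks onto the same instance. The sketch below is an assumption-laden illustration: the sub-topology ids 1 and 2, the pod names, and the round-robin placement are all hypothetical, and the placement function is deliberately not the real Streams assignor.

```python
from typing import Dict, List


def task_ids(subtopology_ids: List[int], num_partitions: int) -> List[str]:
    """One task per (sub-topology, partition), named '<subtopology>_<partition>'."""
    return [f"{s}_{p}" for s in subtopology_ids for p in range(num_partitions)]


def assign_round_robin(tasks: List[str], pods: List[str]) -> Dict[str, str]:
    """Toy placement for illustration only; NOT the real Streams assignor."""
    return {task: pods[i % len(pods)] for i, task in enumerate(tasks)}


# Two sub-topologies (one per group-by), three partitions each, two pods.
tasks = task_ids([1, 2], num_partitions=3)
placement = assign_round_robin(tasks, ["pod-a", "pod-b"])

# The same key hashes to the same partition number in both repartition topics,
# but tasks 1_0 and 2_0 are separate units of work and can sit on different pods.
same_pod = placement["1_0"] == placement["2_0"]
```

In this toy run, `same_pod` is false: partition 0 of the first sub-topology and partition 0 of the second end up on different pods, which mirrors the symptom of one key appearing in stores on two pods.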

Active <> Active MirrorMaker2 setup via dedicated Kafka Connect cluster

Hi, I am trying to set up active <> active MM2 via a Kafka Connect distributed cluster. It does not seem possible because of limitations such as the `bootstrap.servers` property. Also, as per this KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-RunningMirrorMakerinaConnectcluster , it is not possible to have a sink connector, which means only one direction is possible. I have already tried other ways and wanted to try this out using a Kafka Connect distributed cluster setup. Please let me know if I am doing anything wrong here, and shed some light on how to set up active <> active MM2 via a Kafka Connect cluster, if that is possible. I really appreciate any help you can provide. -- Sriram G, Tech

same keys appearing in state stores on different pods when using branches in kafka streams

Hi All, I have a stream application that creates 2 branches. Each branch includes a state store; the status field of the Kafka message determines the branch, and therefore the state store used: status OPEN uses the state store named totals; status CLOSED uses the state store named records. The streams application is running on a pod; however, I'm getting the exception: org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, records, may have migrated to another instance. If I physically access the pod and check the RocksDB folders, I do not see the state store folder. If I check the keys in the totals state store on this pod, I can find the key in the records state store on another pod. I had assumed that because the keys of the events are the same, the same partition would be used for the two branches and therefore the same keys in these two state stores would be created on the same Kubernetes pod. This is not an issue...

Kafka 4.x release date

Good morning, we are using Apache Kafka in our product (Fare Collection System Ticketing). https://www.scheidt-bachmann.de/en/fare-collection-systems/home One of our tasks is to update logging from version 1 to version 2. https://issues.apache.org/jira/browse/KAFKA-9366 Based on the information at this link, it will be implemented in Apache Kafka release 4.x. Would you be so kind as to provide us with the planned release date for the 4.x release? Thanks and best regards, Juraj Moravcik – Software Developer Scheidt & Bachmann Slovensko s.r.o. Fare collection systems – Integration Back Office Dúbravská cesta 4 – 841 04 Bratislava – Slovak Republic Phone: +421 41 5060-943 moravcik.juraj@scheidt-bachmann.sk www.scheidt-bachmann.sk Court of Record: Obchodný reg...

Stream sinks are not constructed when application starts up before Kafka broker

My team is building a set of services that use Kafka Connect and Debezium to forward data changes from our Postgres database to Kafka, and then use Kafka Streams (via Spring Cloud Stream) to process this data and output an aggregate of the source data. We have been trying to track down an issue where the stream processors are not correctly configured when the application starts up before Kafka is up. Specifically, all of the processor nodes are correctly created except for the KTABLE-SINK-000000000# node. The result of this is that the services consume messages but do not forward their changes to their output topics. Therefore, data processing stops while the consumer offset continues to be incremented, so we lose messages and have to roll back the offsets and reprocess a large amount of data. This happens in both our Kubernetes environment and our Chef-managed environment. In the Chef environment, simply restarting the server is enough to trigger this issue, since Ka...

The Metrics ReplicationBytesInPerSec does not have label of topic

Hi, I found that the metric kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec is only recorded on allTopicsStats, so it cannot be exposed at the topic level: brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) def updateReplicationBytesIn(value: Long): Unit = { allTopicsStats.replicationBytesInRate.foreach { metric => metric.mark(value) } } Is there any concern about exposing this kind of metric for every topic? It seems that per-topic stats already exist in the class BrokerTopicStats. Thanks and Best Regards Lucent Wong

Can not start python kafka consumer using SASL/SCRAM

I'm having an issue with Kafka. Whenever the consumer uses SASL_SSL or SASL_PLAINTEXT, it can't start. I've tried changing the consumer to use PLAINTEXT or SSL (without SASL), and it works fine. Below are my configurations and error: *config/server.properties:* listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 ssl.keystore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/KeyStore.jks ssl.keystore.password=123456 ssl.key.password=123456 ssl.truststore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/truststore.jks ssl.truststore.password=123456 sasl.enabled.mechanisms=SCRAM-SHA-512,PLAIN listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="alice" \ password="alice-secret" \ user_alice="alice-secret"; listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.secur...

Re: Kafka Streams possible partitioner bug

Yes, we tried to plug in the custom partitioner via the `partitioner.class` ProducerConfig, as you thought. We confirmed that implementing the StreamPartitioner interface and passing it to our Topology definition does indeed work. Thank you for your help! Agreed, updating the documentation will help for now, and we will look out for the new config when it comes out. Thanks for the quick help, Upesh Upesh Desai | Senior Software Developer | udesai@itrsgroup.com www.itrsgroup.com From: Sophie Blee-Goldman <sophie@confluent.io.INVALID> Date: Friday, November 18, 2022 at 7:35 PM To: users@kafka.apache.org <users@kafka.apache.org> Subject: Re: Kafka Streams possible partitioner bug Hey Upesh, are you trying to plug in the custom partitioner via the `partitioner.class` ProducerConfig? That won't work in Streams for the exact reason you highlighted, which is why Str...

Re: Kafka 3.3.1, unable to start services in cygwin

Hi, Are you running on windows? If Yes, please check documentation once. There are different executables for windows under /bin Also make sure you are using correct version of jdk for windows. Regards, Sunil. On Mon, 21 Nov 2022 at 2:26 AM, ravi r < ravi480000@gmail.com > wrote: > I downloaded > > kafka_2.13-3.3.1.tgz > > and am unable to start services from a cygwin bash shell on my desktop > using zookeeper. Relevant error > > $ zookeeper-server-start.sh $KAFKA/config/zookeeper.properties > [0.003s][error][logging] Invalid decorator > '/cygwin64/usr/local/kafka/kafka_2.13-3.3.1/logs/zookeeper-gc.log'. > Invalid -Xlog option > > '-Xlog:gc*:file=C:/cygwin64/usr/local/kafka/kafka_2.13-3.3.1/logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M', > see error log for > details. > Error: Could not create the Java Virtual Machine. > Error: A fatal exception has occurred. Program will...

Kafka 3.3.1, unable to start services in cygwin

I downloaded kafka_2.13-3.3.1.tgz and am unable to start services from a cygwin bash shell on my desktop using zookeeper. Relevant error $ zookeeper-server-start.sh $KAFKA/config/zookeeper.properties [0.003s][error][logging] Invalid decorator '/cygwin64/usr/local/kafka/kafka_2.13-3.3.1/logs/zookeeper-gc.log'. Invalid -Xlog option '-Xlog:gc*:file=C:/cygwin64/usr/local/kafka/kafka_2.13-3.3.1/logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M', see error log for details. Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.

Re: Kafka Streams possible partitioner bug

Hey Upesh, are you trying to plug in the custom partitioner via the `partitioner.class` ProducerConfig? That won't work in Streams for the exact reason you highlighted, which is why Streams has its own version of the interface called StreamPartitioner -- this is what you need to implement instead. Unfortunately there is currently no config for Streams that will be applied across the application, so you will have to plug in the custom partitioner by passing it in directly to the operators. If you look at the various APIs of the DSL you'll notice many have an overload which takes in this parameter (eg see "Produced") As it turns out however I am currently working on a KIP for a default.stream.partitioner config that you will be able to set once rather than carefully passing it in across the topology. I'll take this as good evidence of the usefulness of this feature -- unfortunately you'll have to wait for a bit as it will not be available ...
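The partitioning rule that such a StreamPartitioner would carry can be sketched on its own. This is a library-free illustration, assuming (as this thread suggests) that the partitioner is handed the typed value together with the number of partitions. `Order`, its `region` field, and `partition_by_region` are hypothetical names, and `zlib.crc32` is a stand-in stable hash, not the murmur2 hash used by Kafka's default partitioner.

```python
import zlib
from dataclasses import dataclass


@dataclass
class Order:
    """Hypothetical value type; `region` is the member field to partition on."""
    order_id: str
    region: str


def partition_by_region(value: Order, num_partitions: int) -> int:
    """Map the typed value's field to a partition in [0, num_partitions)."""
    # crc32 is a stand-in stable hash chosen for this sketch only.
    return zlib.crc32(value.region.encode("utf-8")) % num_partitions
```

The point of the sketch is that the rule needs the deserialized object: if only serialized bytes were available, reading `value.region` would first require deserializing the record again.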

Kafka Streams possible partitioner bug

Hello all,   We have been working on implementing a custom partitioner for our producer within a simple stream application, that will partition the records by a member field when sending them to the output topic. By looking at the contract of the partition() method in the Partitioner interface, it would seem that the value Object would be in its deserialized form when this method is called:   /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata */ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);   For a regular producer that's instantiated, this seems to work correc...

TLS security configuration for kraft controllers and brokers

Hi, We have a request concerning the setup of a Kafka cluster using version 3.3.1 in KRaft mode with controllers and brokers. We have managed to set up TLS security for inter-broker communication. We would also like to have TLS security for the controllers (controller-to-controller and controller-to-broker communication). Could you tell us whether this is doable and, if so, give us an example configuration for a controller (controller.properties)? Kind regards Arnaud Arnaud D’haene • Engineering Manager • Euroclear UK & International Ltd Tel. : +4420784413449 Mobile: +447828269088 Arnaud.dhaene@euroclear.com • www.euroclear.com

Re: Cannot send in state COMMITTING_TRANSACTION

Hi Sophie, Thank you for your response. Unfortunately, I am unable to file the report because it requires a membership I don't have. However, I obtained permission to share logs with you. We had 5 different Kafka Streams applications running in parallel on two servers: indicators1 and indicators2. I'm attaching logs from both servers. The error occurred on indicators1 and has not occurred since. On Wed, Nov 16, 2022 at 04:31, Sophie Blee-Goldman <sophie@confluent.io.invalid> wrote: Interesting, this does look like it could be a bug in Streams and I'm not aware of any known or already-fixed issues resembling this. Could you file a bug report over here < https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA > and include as much context/information as possible? Providing logs from around the time leading up to this exception in particular would greatly help in debugging this On Tue, Nov 15, 2022 at 2:15 AM Tomasz Gac < tomasz.g...