Showing posts from January, 2021

Polling behaviour when a consumer is assigned multiple topic-partitions

Hi,

I want to know the polling behaviour when a consumer is assigned multiple topic-partitions.

1. In a single poll, will it get messages from multiple topic-partitions, or will only one topic-partition's messages come back per poll?
2. How does it choose topic-partitions for the next poll? Is there a default class/strategy for this that we can reconfigure?

Thanks,
Upendra

Re: Send several Serialized Java objects under one Kafka Topic

Thank you all for the answer, >Use Avro and a discriminated union / sum type to combine all the types. Can you share a bit more about this approach, please? BR, Peter On Mon, Feb 1, 2021 at 12:54 AM Christopher Smith < cbsmith@gmail.com > wrote: > > Use Avro and a discriminated union / sum type to combine all the types. > > On Sun, Jan 31, 2021, 6:49 AM Peter Penzov < peter.penzov@gmail.com > wrote: > > > Hello All, > > I'm working on proof of concept for sending several Java Objects > > under one Kafka Topic. More about the requirements: > > > > https://stackoverflow.com/questions/65811681/design-kafka-consumers-and-producers-for-scalability > > > > I managed to implement this working concept: > > https://github.com/rcbandit111/skyobject_engine > > > > In this code example I send several Java Objects under one Kafka Topic > > and response is sent back int...

Re: Send several Serialized Java objects under one Kafka Topic

Use Avro and a discriminated union / sum type to combine all the types. On Sun, Jan 31, 2021, 6:49 AM Peter Penzov < peter.penzov@gmail.com > wrote: > Hello All, > I'm working on proof of concept for sending several Java Objects > under one Kafka Topic. More about the requirements: > > https://stackoverflow.com/questions/65811681/design-kafka-consumers-and-producers-for-scalability > > I managed to implement this working concept: > https://github.com/rcbandit111/skyobject_engine > > In this code example I send several Java Objects under one Kafka Topic > and response is sent back into another Kafka Topic. > > I would like to ask you what are the advantages and disadvantages in > this design of sending data between Consumer and Producer? I want to > use a serialized Java object, not JSON because I think it's more > optimized for performance. > > Can you share your feedback, please? > ...
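To make the "discriminated union / sum type" idea concrete, here is a minimal plain-Java sketch of the underlying concept: every record carries a tag that tells the consumer which concrete type follows. It is not Avro's API, and the `TaggedEnvelope` class, the `Kind` names and the one-byte wire layout are assumptions invented for illustration. An Avro union schema plays the same role, with the schema defining the tag.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical envelope; the type names are not taken from the linked repository.
final class TaggedEnvelope {
    // One tag per concrete message type: the "discriminator" of the union.
    // Using ordinal() as the wire tag is fragile if constants are reordered;
    // it is used here only to keep the sketch short.
    enum Kind { ORDER_CREATED, ORDER_CANCELLED }

    // Encode as one tag byte followed by the UTF-8 payload.
    static byte[] encode(Kind kind, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + body.length)
                .put((byte) kind.ordinal())
                .put(body)
                .array();
    }

    // Read the tag first, so the consumer knows how to interpret the rest.
    static Kind kindOf(byte[] record) {
        return Kind.values()[record[0]];
    }

    static String payloadOf(byte[] record) {
        return new String(record, 1, record.length - 1, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] record = encode(Kind.ORDER_CANCELLED, "order-42");
        // The consumer dispatches on the tag, one branch per member of the union.
        switch (kindOf(record)) {
            case ORDER_CREATED:
                System.out.println("created " + payloadOf(record));
                break;
            case ORDER_CANCELLED:
                System.out.println("cancelled " + payloadOf(record));
                break;
        }
    }
}
```

All message types can then share one topic, because the consumer never has to guess the type of a record.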

Re: Send several Serialized Java objects under one Kafka Topic

Hi Peter, The biggest issue with Java serialization has always been compatibility between versions of the classes between producer and consumer, it can be avoided via very careful management, but it's a lot more painful (IMO) than other serialisation formats. If you're looking to avoid JSON, I recommend Avro and/or Protobuf, as the Schema Registry developed by Confluent and associated Kafka producer serializers/ consumer deserializers explicitly support JSON, Avro and Protobuf, and (once again, in my opinion) using the schema registry really takes the pain out of schema version compatibility management. Kind regards, Liam Clarke-Hutchinson On Mon, 1 Feb. 2021, 3:49 am Peter Penzov, < peter.penzov@gmail.com > wrote: > Hello All, > I'm working on proof of concept for sending several Java Objects > under one Kafka Topic. More about the requirements: > > https://stackoverflow.com/questions/65811681/design-kafka-consumers...

Re: Send several Serialized Java objects under one Kafka Topic

"I want to use a serialized Java object, not JSON because I think it's more optimized for performance." => Bad assumption. https://www.robert-franz.com/2013/12/09/comparing-serialization-performance/ Avro and protobuf are often used, Json is easier https://blog.softwaremill.com/the-best-serialization-strategy-for-event-sourcing-9321c299632b On Sun, Jan 31, 2021 at 15:49, Peter Penzov < peter.penzov@gmail.com > wrote: > Hello All, > I'm working on proof of concept for sending several Java Objects > under one Kafka Topic. More about the requirements: > > https://stackoverflow.com/questions/65811681/design-kafka-consumers-and-producers-for-scalability > > I managed to implement this working concept: > https://github.com/rcbandit111/skyobject_engine > > In this code example I send several Java Objects under one Kafka Topic > and response is sent back into another Kafka Topic. >...

Re: Event Sourcing with Kafka Streams and processing order of a re-entrant pipeline

Hi David,

Thank you for the question.

If I can confirm, it looks like the "operations" topic is the only input to the topology, and the topology reads the "operations" topic joined with the "account" table and generates a "movements" stream. It reads (and aggregates) the "movements" stream to create the "account" table.

I think your concern is dead-on. The processing of incoming records from either the "operations" topic or the "movements" topic is synchronous, BUT the production of messages to the "movements" topic and subsequent consumption of those messages in the join is asynchronous. In other words, you can indeed get "insufficient funds" in your application.

Following your scenario, this can happen:

1. consume Operation(10)
1. join (balance is $0)
1. produce Movement(10)
2. consume Operation(10)
2. join (balance is $0)
2. produce Movement(10)
3. c...
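The stale read described in this reply can be reproduced without Kafka. The sketch below is a plain-Java simulation, not Kafka Streams code: the `StaleBalanceDemo` class and the sequence of operations (+10, +10, -20) are assumptions chosen for illustration. Each operation is validated against a balance that only changes once its movement has been applied, and the flag controls whether that happens immediately or only later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of validating commands against a lagging snapshot.
final class StaleBalanceDemo {
    // Returns the operations rejected for insufficient funds.
    // applyImmediately = true models a synchronous state update;
    // false models movements that are still in flight while later
    // operations are being validated.
    static List<Integer> rejected(List<Integer> operations, boolean applyImmediately) {
        int visibleBalance = 0;   // what the join sees
        int inFlight = 0;         // produced but not yet visible to the join
        List<Integer> rejected = new ArrayList<>();
        for (int op : operations) {
            if (visibleBalance + op < 0) {
                rejected.add(op); // validated against a possibly stale balance
                continue;
            }
            if (applyImmediately) {
                visibleBalance += op;
            } else {
                inFlight += op;   // becomes visible only after this loop
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        List<Integer> ops = Arrays.asList(10, 10, -20);
        System.out.println(rejected(ops, true));  // prints []
        System.out.println(rejected(ops, false)); // prints [-20]
    }
}
```

With the lagging update, the withdrawal of 20 is rejected even though both deposits were accepted earlier, which is the "insufficient funds" outcome the reply warns about.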

ReplicaManager fetch fails on leader due to long/integer overflow

Hi,

I'm having what seems to be this issue: https://issues.apache.org/jira/browse/KAFKA-7656 < https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel >

It happens when a transactional producer sends an event for the 1st time.

```
ERROR [ReplicaManager broker=1] Error processing fetch with max size -2147483648 from consumer on partition __consumer_offsets-19: (fetchOffset=4, logStartOffset=-1, maxBytes=-2147483648, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(size=5150, file=/var/lib/kafka/__consumer_offsets-19/00000000000000000000.log, start=0, end=2147483647)
```

I checked the logs with TRACE level but it doesn't show anything new. I tried the default "max.partition.fetch.bytes" and setting it manually to the double. We use default consumers/producers with Spring-Kaf...
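The value in the log, -2147483648, is `Integer.MIN_VALUE`, which is what a 32-bit `int` wraps around to when `Integer.MAX_VALUE` (the `end=2147483647` in the same log line) is incremented. The sketch below only demonstrates that arithmetic. It is not the broker's code path, and the `IntOverflowDemo` class is a hypothetical name.

```java
// Hypothetical demo of 32-bit wrap-around; unrelated to Kafka's own classes.
final class IntOverflowDemo {
    // Unchecked int addition silently wraps around on overflow.
    static int uncheckedIncrement(int value) {
        return value + 1;
    }

    // Math.addExact throws ArithmeticException instead of wrapping.
    static boolean overflowsWhenIncremented(int value) {
        try {
            Math.addExact(value, 1);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(uncheckedIncrement(Integer.MAX_VALUE));       // prints -2147483648
        System.out.println(overflowsWhenIncremented(Integer.MAX_VALUE)); // prints true
    }
}
```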

Send several Serialized Java objects under one Kafka Topic

Hello All, I'm working on proof of concept for sending several Java Objects under one Kafka Topic. More about the requirements: https://stackoverflow.com/questions/65811681/design-kafka-consumers-and-producers-for-scalability I managed to implement this working concept: https://github.com/rcbandit111/skyobject_engine In this code example I send several Java Objects under one Kafka Topic and response is sent back into another Kafka Topic. I would like to ask you what are the advantages and disadvantages in this design of sending data between Consumer and Producer? I want to use a serialized Java object, not JSON because I think it's more optimized for performance. Can you share your feedback, please? Also is there some better way to implement this? Like for example using Streaming API? BR, Peter

Event Sourcing with Kafka Streams and processing order of a re-entrant pipeline

I'm working on a project where I want to use Kafka Streams for Event Sourcing. General idea is that I have a "commands" topic/KStream, an "events" topic/KStream and a "snapshots" topic/KTable. Snapshots contains the current state of the entities. Commands are validated using the "snapshots" and transformed to "events". Group EVENTS stream by key and aggregate them to a SNAPSHOTS table. Left join COMMANDS stream with the SNAPSHOTS table and output new EVENTS. For example, to apply this pattern to a simple bank-account scenario I can have: - operations stream as "commands" (requests to deposit or withdraw an amount of money, eg. "deposit $10" => Operation(+10) ) - movements stream as "events" (actual deposit or withdraw event, eg. "$10 deposited" => Movement(+10) ) - account table as a "snapshots" (account balance, eg. "$20 in account balance...

Re: Clarify “the order of execution for the subtractor and adder is not defined”

What Alex says. The order is hard-coded (ie not race condition), but there is no guarantee that we won't change the order in future releases without notice (ie, it's not a public contract and we don't need to do a KIP to change it). I guess there would be a Jira about it... But as a matter of fact, it does not really matter (detail below). For the three scenarios you mentioned, the 3rd one cannot happen though: We execute an aggregator in a single thread (per shard) and thus we either call the adder or subtractor first. > 1. Seems like an unusual design choice Why do you think so? > first with a Change<V> value that includes only > the old value and then the process function is called again with a Change<V> > value that includes only the new value. In general, both records might be processed by different threads and thus we cannot only send one record. It's just that the TTD simulates a single threaded executio...
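One way to see why the order may not matter is to look at an aggregate whose adder and subtractor commute, such as a sum. The sketch below is plain Java rather than the Kafka Streams DSL: `AdderSubtractorOrder` and its two operators are hypothetical stand-ins for the adder and subtractor passed to `aggregate`. The result holds for sums and says nothing about aggregates whose update steps do not commute.

```java
import java.util.function.LongBinaryOperator;

// Hypothetical stand-ins for an adder and subtractor over a running sum.
final class AdderSubtractorOrder {
    static final LongBinaryOperator ADDER = (aggregate, value) -> aggregate + value;
    static final LongBinaryOperator SUBTRACTOR = (aggregate, value) -> aggregate - value;

    // Remove the old value first, then add the new one.
    static long subtractThenAdd(long aggregate, long oldValue, long newValue) {
        return ADDER.applyAsLong(SUBTRACTOR.applyAsLong(aggregate, oldValue), newValue);
    }

    // Add the new value first, then remove the old one.
    static long addThenSubtract(long aggregate, long oldValue, long newValue) {
        return SUBTRACTOR.applyAsLong(ADDER.applyAsLong(aggregate, newValue), oldValue);
    }

    public static void main(String[] args) {
        // A key's value changes from 30 to 50 while the aggregate is 100.
        System.out.println(subtractThenAdd(100, 30, 50)); // prints 120
        System.out.println(addThenSubtract(100, 30, 50)); // prints 120
    }
}
```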

Re: Clarify “the order of execution for the subtractor and adder is not defined”

From the source code in KGroupedTableImpl, the subtractor is always called before the adder. By not guaranteeing the order, I think the devs meant that it might change in future versions of Kafka Streams (although I'd think it's unlikely to). I have use cases similar to your example, and that phrase worries me a bit too. :) On Thu, Jan 28, 2021, 22:31 Fq Public < fq.public5@gmail.com > wrote: > Hi everyone! I posted this same question on stackoverflow > < > https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined > > > a few days ago but didn't get any responses. Was hoping someone here might > be able to help clarify this part of the documentation for me :) > > On Thu, 28 Jan 2021 at 19:50, Fq Public < fq.public5@gmail.com > wrote: > > > The Streams DSL documentation > > < > https://kafka.apache.org/27/documentation/strea...

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Could you share your code around > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) That seems to be where NPE is thrown. On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert < nitay.k@ironsrc.com > wrote: > Hey, > *Without any code change*, just by bumping the kafka version from 2.5.1 to > 2.6.1 (clients only) - my stream application started throwing > NullPointerException (sometimes, not in a predicted pattern). > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf > that was forgotten there from the older versions. > > We are using Scala 2.12, and the line that throws this exception is using > flatMapValues: > > > > inputStream.flatMapValues(_.split) # return type > > KStream[Windowed[String], SingleInputMessage] > > > Where inputStream is of type: KStream[Windowed[String], InputMessage] and > the split method splits this InputMessa...

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Hi everyone! I posted this same question on stackoverflow < https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined > a few days ago but didn't get any responses. Was hoping someone here might be able to help clarify this part of the documentation for me :) On Thu, 28 Jan 2021 at 19:50, Fq Public < fq.public5@gmail.com > wrote: > The Streams DSL documentation > < https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating > includes > a caveat about using the aggregate method to transform a KGroupedTable → > KTable, as follows (emphasis mine): > > When subsequent non-null values are received for a key (e.g., UPDATE), > then (1) the subtractor is called with the old value as stored in the table > and (2) the adder is called with the new value of the input record that was > just received. *The order of execution for the subtracto...

Clarify “the order of execution for the subtractor and adder is not defined”

The Streams DSL documentation < https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating > includes a caveat about using the aggregate method to transform a KGroupedTable → KTable, as follows (emphasis mine): When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. *The order of execution for the subtractor and adder is not defined.* My interpretation of that last line implies that one of three things can happen: 1. subtractor can be called before adder 2. adder can be called before subtractor 3. adder and subtractor could be called at the same time Here is the question I'm looking to get answered: *Are all 3 scenarios above actually possible when using the aggregate method on a KGroupedTable?* Or am I misinterpreting the document...

Re: Re: hi,guys kafka 2.7.0 builld fail, any suggestion?

It should work with 6.6[1], or use the included gradlew script to bootstrap and use the right gradle version. Kind regards, Tom [1]: https://github.com/apache/kafka/blob/2.7.0/gradle/wrapper/gradle-wrapper.properties On Wed, Jan 27, 2021 at 9:52 AM felixzh < felixzh2020@126.com > wrote: > Hi, > Gradle 5.6.4 > > At 2021-01-27 17:40:17, "Liam Clarke-Hutchinson" < > liam.clarke@adscale.co.nz > wrote: > >Hi, > > > >What version of Gradle? > > > >Cheers, > > > >Liam > > > >On Wed, 27 Jan. 2021, 10:35 pm felixzh, < felixzh2020@126.com > wrote: > > > >> ~/Downloads/kafka-2.7.0# gradle clean > >> > >> > Configure project : > >> > >> Building project 'core' with S...

Re:Re: hi,guys kafka 2.7.0 builld fail, any suggestion?

Hi, Gradle 5.6.4 At 2021-01-27 17:40:17, "Liam Clarke-Hutchinson" < liam.clarke@adscale.co.nz > wrote: >Hi, > >What version of Gradle? > >Cheers, > >Liam > >On Wed, 27 Jan. 2021, 10:35 pm felixzh, < felixzh2020@126.com > wrote: > >> ~/Downloads/kafka-2.7.0# gradle clean >> >> >> >> >> > Configure project : >> >> Building project 'core' with Scala version 2.13.3 >> >> >> >> >> FAILURE: Build failed with an exception. >> >> >> >> >> * Where: >> >> Build file '/root/Downloads/kafka-2.7.0/build.gradle' line: 471 >> >> >> >> >> * What went wrong: >> >> A problem occurred evaluating root project 'kafka-2.7.0'. >> >> > Failed to apply plugin [id 'org.gradle.scala'] >> >> > Could not find method scala() for arguments...

Re: hi,guys kafka 2.7.0 builld fail, any suggestion?

Hi, What version of Gradle? Cheers, Liam On Wed, 27 Jan. 2021, 10:35 pm felixzh, < felixzh2020@126.com > wrote: > ~/Downloads/kafka-2.7.0# gradle clean > > > > > > Configure project : > > Building project 'core' with Scala version 2.13.3 > > > > > FAILURE: Build failed with an exception. > > > > > * Where: > > Build file '/root/Downloads/kafka-2.7.0/build.gradle' line: 471 > > > > > * What went wrong: > > A problem occurred evaluating root project 'kafka-2.7.0'. > > > Failed to apply plugin [id 'org.gradle.scala'] > > > Could not find method scala() for arguments > [build_97rd0aprcab26i03s589xrhv1$_run_closure5$_closure74$_closure108@30f53a84] > on object of type org.gradle.api.plugins.scala.ScalaPlugin. > > > > > * Try: > > Run with --stacktrace option to get the s...

hi,guys kafka 2.7.0 builld fail, any suggestion?

~/Downloads/kafka-2.7.0# gradle clean

> Configure project :
Building project 'core' with Scala version 2.13.3

FAILURE: Build failed with an exception.

* Where:
Build file '/root/Downloads/kafka-2.7.0/build.gradle' line: 471

* What went wrong:
A problem occurred evaluating root project 'kafka-2.7.0'.
> Failed to apply plugin [id 'org.gradle.scala']
> Could not find method scala() for arguments [build_97rd0aprcab26i03s589xrhv1$_run_closure5$_closure74$_closure108@30f53a84] on object of type org.gradle.api.plugins.scala.ScalaPlugin.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 1s

Re: Kafka transactions commit message consumability issues

Hi, What Boyang meant is this consumer config: *isolation.level*, whose default is *read_uncommitted*, so the consumer can read all messages, whether or not their transaction has been committed. ref: https://kafka.apache.org/documentation/#consumerconfigs_isolation.level Thanks Luke On Wed, Jan 27, 2021 at 2:50 AM Boyang Chen < reluctanthero104@gmail.com > wrote: > Have you set consumer isolation level? If it was set to uncommitted, it > will be able to see messages you produced, without commitTransaction call > > On Tue, Jan 26, 2021 at 7:43 AM 积淀智慧 < dream_zph@163.com > wrote: > > > Hello, everybody, > > > > > > I'm running some tests while using Kafka transactions. > > > > > > test 1 : > > String msg = "matt test"; > > String topic = "test"; > > producer.beginTransaction(); > > producer.send(new ProducerRecord(topic, "0", msg.toString()))...
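To hide records from open or aborted transactions, the consumer has to opt in to `read_committed`. The sketch below only builds the `Properties` object that would be handed to a consumer, using the standard JDK class. The `ReadCommittedConfig` class name and the bootstrap address and group id in `main` are placeholders, not values from this thread.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer settings; it does not open a connection.
final class ReadCommittedConfig {
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // The default is read_uncommitted; read_committed returns only
        // records from committed transactions (plus non-transactional records).
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group id.
        Properties props = consumerProps("localhost:9092", "example-group");
        System.out.println(props.getProperty("isolation.level")); // prints read_committed
    }
}
```

With this setting, the messages sent in the test above should not reach the consumer until `commitTransaction()` is called.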

Re: Kafka transactions commit message consumability issues

Have you set consumer isolation level? If it was set to uncommitted, it will be able to see messages you produced, without commitTransaction call On Tue, Jan 26, 2021 at 7:43 AM 积淀智慧 < dream_zph@163.com > wrote: > Hello, everybody, > > > I'm running some tests while using Kafka transactions. > > > test 1 : > String msg = "matt test"; > String topic = "test"; > producer.beginTransaction(); > producer.send(new ProducerRecord(topic, "0", msg.toString())); > producer.send(new ProducerRecord(topic, "1", msg.toString())); > producer.send(new ProducerRecord(topic, "2", msg.toString())); > for (; ; ) { } > > > test result: Messages also arrive at the consumer without delay. > > > My question is, why can data be committed without commitTransaction. > > > thx ...

Kafka transactions commit message consumability issues

Hello, everybody,

I'm running some tests while using Kafka transactions.

test 1:
String msg = "matt test";
String topic = "test";
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
for (; ; ) { }

test result: Messages also arrive at the consumer without delay.

My question is, why can data be committed without commitTransaction.

thx

Re: ARM support for Kafka

Hi Kafka, The email about whether Kafka supports ARM CI was sent a while ago and no objections were raised. Can we keep moving forward? @junrao < http://twitter.com/junrao > @ijuma < https://twitter.com/ijuma >, I have submitted the PR [ https://github.com/apache/kafka/pull/9872 ]. I don't have permission to configure it to work. Can you help me? Thanks, Lei Peng On Mon, Jan 18, 2021 at 10:43 AM, Peng Lei < peng.8lei@gmail.com > wrote: > Hi Kafka > I'm pushing kafka to support arm ci. I think this is a very > meaningful work because it helps kafka get better. If Kafka supports > arm, the following benefits are provided: > 1. Kafka can be easily used by arm users. They do not need to worry > about kafka being unavailable on the arm platform. With the arm CI, we can > ensure that each code integration can run properly on the arm platform. If > we do more, we can provide the official arm release package...

Suggestions on canarying traffic for kafka consumers.

Hi Folks,

Canarying traffic is an excellent way of reducing the impact when releasing a new release with a bug. Such canarying is somewhat easier with a few queueing backends like sqs & redis. In sqs for example each application container/instance of canary can self regulate how much throughput they process after looking at how much throughput the rest of the containers processed using some quota logic.

But coming to the kafka consumers, since partitions are assigned to application containers/instances, I'm finding it a bit hard to decide on a way to split the traffic between the canary & production application deployments. As of now these are a few thoughts in my mind.

### Kafka Consumer as Separate Deployment that makes RPC calls to application containers

In this approach, I was thinking to run kafka consumer as a separate deployment that makes rpc calls to application containers via load balancer or envoy. The load balancer/envoy will help in sp...

Re: Kafka Consumer Consumption based on TimeStamp-based position

Thanks. Just realised that it was in the API since 0.11.0. Thanks Steve. On Sat, 23 Jan 2021 at 12:42, Steve Howard < steve.howard@confluent.io > wrote: > Hi, > > Yes, you can use the offsetsForTimes() method. See below for a simple > example that should get you started... > > import org.apache.kafka.clients.consumer.*; > import org.apache.kafka.common.config.ConfigException; > import org.apache.kafka.common.*; > import java.io.*; > import java.time.Duration; > import java.util.*; > import java.text.*; > > public class searchByTime { > static KafkaConsumer<String, String> c; > > public static void main(String args[]) throws Exception { > Properties props = new Properties(); > props.put("bootstrap.servers","localhost:9092"); > props.put("max.poll.records",1); > props.put("topic","yourtopicname"); > props.put(...

Re: Kafka Consumer Consumption based on TimeStamp-based position

Hi,

Yes, you can use the offsetsForTimes() method. See below for a simple example that should get you started...

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.*;
import java.io.*;
import java.time.Duration;
import java.util.*;
import java.text.*;

public class searchByTime {
    static KafkaConsumer<String, String> c;

    public static void main(String args[]) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("max.poll.records",1);
        props.put("topic","yourtopicname");
        props.put("group.id",UUID.randomUUID().toString());
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDes...

Kafka consumers in Oracle Weblogic over SSL

Hi Team, We have a Kafka configuration with SSL set up, and our consumers run in WebLogic servers. As per the documentation, the Kafka consumer APIs support only file-based keystores and truststores. We don't want to store the keystore/truststore on the WebLogic filesystem and are looking for an alternative solution. Please let me know if anyone has implemented this any other way.

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

If you can make the write idempotent, you should be fine. For this case, you might need to catch exceptions on `INSERT INTO` and convert it into an `UPDATE` statement. -Matthias On 1/19/21 5:49 AM, Pushkar Deole wrote: > Is there also a way to avoid duplicates if the application consumer from > kafka topic and writes the events to database? > e.g. in case the application restarts while processing a batch read from > topic and few events already written to database, when application > restarts, those events are again consumed by another instance and written > back to database. > > Could this behavior be avoided somehow without putting constraints on > database table? > > On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax < mjsax@apache.org > wrote: > >> Well, that is exactly what I mean by "it depends on the state store >> implementation". >> >> For this case, you cannot get exactly-once. ...
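The insert-then-update fallback suggested in this reply can be sketched without a database. In the plain-Java simulation below, a `Map` stands in for a table with a unique key, and `IllegalStateException` stands in for the duplicate-key error a real `INSERT INTO` would raise. The `IdempotentWrite` class, the event ids and the exception type are assumptions for illustration, not JDBC or Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a table keyed by a unique event id.
final class IdempotentWrite {
    // Mimics INSERT INTO: fails when the key already exists.
    static void insert(Map<String, String> table, String eventId, String value) {
        if (table.containsKey(eventId)) {
            throw new IllegalStateException("duplicate key " + eventId);
        }
        table.put(eventId, value);
    }

    // Idempotent write: on a duplicate-key failure, fall back to an update.
    static void upsert(Map<String, String> table, String eventId, String value) {
        try {
            insert(table, eventId, value);
        } catch (IllegalStateException duplicate) {
            table.put(eventId, value); // mimics UPDATE ... WHERE id = eventId
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        // The same batch is processed twice, as after a restart mid-batch.
        for (int attempt = 0; attempt < 2; attempt++) {
            upsert(table, "event-1", "created");
            upsert(table, "event-2", "shipped");
        }
        System.out.println(table.size()); // prints 2
    }
}
```

Replaying the batch leaves the table in the same state, so redelivered events do not create duplicate rows. This relies on each event carrying a stable unique key.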