Posts

Showing posts from February, 2020

Re: KafkaStreams GroupBy with new key. Can I skip repartition?

Hi all, The KIP is accepted and implemented already, but is blocked on code review: https://github.com/apache/kafka/pull/7170 A quick note on the lack of recent progress... It's completely our fault, the reviews fell by the wayside during the 2.5.0 release cycle, and we haven't gotten back to it. The contributor, Levani, has been exceptionally patient with us and continually kept the PR up-to-date and mergeable since then. If you'd like to help get it across the line, Murilo, maybe you can give it a review? Thanks, John On Sat, Feb 29, 2020, at 20:52, Guozhang Wang wrote: > It is in progress, but I was not the main reviewer of that ticket so I > cannot say for sure. I saw the last update is on Jan/2019 so maybe it's a > bit loose now.. If you want to pick it up and revive the KIP completion > feel free to do so :) > > > Guozhang > > > On Fri, Feb 28, 2020 at 5:54 PM Murilo Tavares < murilofla@gmail.co...

Re: KafkaStreams GroupBy with new key. Can I skip repartition?

It is in progress, but I was not the main reviewer of that ticket so I cannot say for sure. I saw the last update is on Jan/2019 so maybe it's a bit loose now.. If you want to pick it up and revive the KIP completion feel free to do so :) Guozhang On Fri, Feb 28, 2020 at 5:54 PM Murilo Tavares < murilofla@gmail.com > wrote: > Guozhang > The ticket definitely describes what I'm trying to achieve. > And should I be hopeful with the fact it's in progress? :) > Thanks for pointing that out. > Murilo > > On Fri, Feb 28, 2020 at 2:57 PM Guozhang Wang < wangguoz@gmail.com > wrote: > > > Hi Murilo, > > > > Would this be helping your case? > > https://issues.apache.org/jira/browse/KAFKA-4835 > > > > > > Guozhang > > > > On Fri, Feb 28, 2020 at 7:01 AM Murilo Tavares < murilofla@gmail.com > > > wrote: > > > > > Hi > > > I a...

[VOTE] 2.5.0 RC0

Hello Kafka users, developers and client-developers, This is the first candidate for release of Apache Kafka 2.5.0. This is a major release of Kafka which includes many new features, improvements, and bug fixes including: * TLS 1.3 support (1.2 is now the default) * Co-groups for Kafka Streams * Incremental rebalance for Kafka Consumer * New metrics for better operational insight * Upgrade Zookeeper to 3.5.7 * Deprecate support for Scala 2.11 The full release notes for 2.5.0 can be found here: https://home.apache.org/~davidarthur/kafka-2.5.0-rc0/RELEASE_NOTES.html *** Please download, test and vote by Thursday, March 5th, 5pm PT Kafka's KEYS file containing PGP keys we use to sign the release: https://kafka.apache.org/KEYS * Release artifacts to be voted upon (source and binary): https://home.apache.org/~davidarthur/kafka-2.5.0-rc0/ * Maven artifacts to be voted upon: https://repository.apache.org/content/groups/staging/org/apache/kafka/ * J...

Re: KafkaStreams GroupBy with new key. Can I skip repartition?

Guozhang The ticket definitely describes what I'm trying to achieve. And should I be hopeful with the fact it's in progress? :) Thanks for pointing that out. Murilo On Fri, Feb 28, 2020 at 2:57 PM Guozhang Wang < wangguoz@gmail.com > wrote: > Hi Murilo, > > Would this be helping your case? > https://issues.apache.org/jira/browse/KAFKA-4835 > > > Guozhang > > On Fri, Feb 28, 2020 at 7:01 AM Murilo Tavares < murilofla@gmail.com > > wrote: > > > Hi > > I am currently doing a simple KTable groupby().aggregate() in > KafkaStreams. > > In the groupBy I do need to select a new key, but I know for sure that > the > > new key would still fall in the same partition. Because of this, I > believe > > the repartition would not be necessary, but my question is: is it > possible > > to do a groupBy, changing the key, and tell KafkaStreams to not create > the > ...

Re: KafkaStreams GroupBy with new key. Can I skip repartition?

Hi Murilo, Would this be helping your case? https://issues.apache.org/jira/browse/KAFKA-4835 Guozhang On Fri, Feb 28, 2020 at 7:01 AM Murilo Tavares < murilofla@gmail.com > wrote: > Hi > I am currently doing a simple KTable groupby().aggregate() in KafkaStreams. > In the groupBy I do need to select a new key, but I know for sure that the > new key would still fall in the same partition. Because of this, I believe > the repartition would not be necessary, but my question is: is it possible > to do a groupBy, changing the key, and tell KafkaStreams to not create the > repartition topic? > Thanks > Murilo > -- -- Guozhang

KafkaStreams GroupBy with new key. Can I skip repartition?

Hi I am currently doing a simple KTable groupby().aggregate() in KafkaStreams. In the groupBy I do need to select a new key, but I know for sure that the new key would still fall in the same partition. Because of this, I believe the repartition would not be necessary, but my question is: is it possible to do a groupBy, changing the key, and tell KafkaStreams to not create the repartition topic? Thanks Murilo
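The premise of the question, that a changed key can still land in the same partition, can be illustrated with a small sketch. Everything here is an assumption for illustration: the `tenant:entity` key layout, the `tenant_of` helper, and the CRC32 hash (Kafka's default partitioner hashes the serialized key bytes with murmur2, not CRC32). It only shows that co-partitioning survives a key change when the partitioner looks at a part of the key that does not change; it does not show a way to make Kafka Streams skip the repartition topic.

```python
import zlib


def tenant_of(key):
    """Hypothetical key layout 'tenant:entity'; only the tenant part is hashed."""
    return key.split(":", 1)[0]


def partition_for(key, num_partitions, extract=lambda k: k):
    """Map a key to a partition by hashing the extracted part of the key.

    CRC32 is a stand-in hash chosen because it is in the standard library.
    """
    return zlib.crc32(extract(key).encode("utf-8")) % num_partitions


if __name__ == "__main__":
    old_key = "tenant1:order5"
    new_key = "tenant1:customer9"
    # Hashing only the tenant prefix keeps both keys in the same partition.
    print(partition_for(old_key, 12, tenant_of) == partition_for(new_key, 12, tenant_of))
    # Hashing the whole key gives no such guarantee.
    print(partition_for(old_key, 12), partition_for(new_key, 12))
```

With a whole-key hash the two partitions may or may not coincide, and Kafka Streams cannot tell from the topology alone which case applies.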

Re: when to expand cluster

Hi, I am now in the process of deciding partitions and replicas for my cluster. I am making use of the perf test utilities and they really help a lot. Just measure performance by creating multiple topics with the same number of records but different partition and replica counts. Then compare the throughput and also look at latency. If possible, change the network and I/O thread counts at the broker level and then run the perf tests again for the above scenarios. Thanks, Sunil On Fri, 28 Feb 2020 at 11:40 AM, 张祥 < xiangzhang1128@gmail.com > wrote: > Thanks, it helps a lot. > > On Fri, Feb 28, 2020 at 5:18 AM, Peter Bukowinski < pmbuko@gmail.com > wrote: > > > No, it's not bad. Kafka is designed to serve data to many consumers at > the > > same time, whether they are independent of each other or in the same > > consumer group. > > > > I would encourage you to play with different partition counts and use > > kafka's performance testing tools (kafka-produce...

Re: when to expand cluster

Thanks, it helps a lot. On Fri, Feb 28, 2020 at 5:18 AM, Peter Bukowinski < pmbuko@gmail.com > wrote: > No, it's not bad. Kafka is designed to serve data to many consumers at the > same time, whether they are independent of each other or in the same > consumer group. > > I would encourage you to play with different partition counts and use > kafka's performance testing tools (kafka-producer-perf-test.sh and > kafka-consumer-perf-test.sh) to test throughput in different scenarios and > see the results for yourself. > > -- > Peter > > > On Feb 27, 2020, at 1:28 AM, 张祥 < xiangzhang1128@gmail.com > wrote: > > > > I believe no matter the partition count exceeds the broker count, we can > > always have the same number of consumer instances as the partition count. > > > > So what I want to know is when two partition exists on the same broker, > two > > consumer instances will be talkin...

RE: HELP in Usage of JMX port in Kafka

Hi All, Sorry to bother you all. It was simple. 😊 Just put one line in the relevant .sh file: export JMX_PORT=<port number of your choice other than 9099> and it will run 😊 Thanks, Sunil. From: Sunil CHAUDHARI Sent: Friday, February 28, 2020 10:06 AM To: users@kafka.apache.org Subject: HELP in Usage of JMX port in Kafka Hi all, I have set JMX_PORT to 9099 in an environment variable and started Kafka. There has been no problem so far. I can see metrics on the kafka-manager console. This is fine. However, when I run kafka-consumer-perf-test.sh, kafka-producer-perf-test.sh and similar utilities under /bin, I get the error given at the end. I understand the root cause, but I don't know the solution. Can't I make use of the test utilities under /bin when I have enabled JMX? I want to measure the performance of a running Kafka cluster and at the same time monitor metrics on kafka-manager. How can I skip JMX_PORT while running the manual utilities? Or is there a way I c...
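The fix above works because the command-line tools inherit JMX_PORT from the environment and then try to bind the port the broker already holds. A minimal sketch of the same idea, assuming the tool is launched from a wrapper script: build a per-tool environment that either drops JMX_PORT or points it at a free port. The helper names `find_free_port` and `tool_env` are hypothetical, and the commented-out launch line deliberately leaves the tool's arguments elided.

```python
import os
import socket


def find_free_port():
    """Ask the OS for a currently unused TCP port on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def tool_env(base_env, jmx_port=None):
    """Return a copy of base_env for a CLI tool.

    With jmx_port=None the JMX_PORT variable is removed, so the tool starts
    without a JMX agent; otherwise JMX_PORT is set to the given port.
    """
    env = dict(base_env)
    if jmx_port is None:
        env.pop("JMX_PORT", None)
    else:
        env["JMX_PORT"] = str(jmx_port)
    return env


if __name__ == "__main__":
    env = tool_env(os.environ, find_free_port())
    print("JMX_PORT for the perf tool:", env["JMX_PORT"])
    # The tool would then be started with this environment, for example:
    # subprocess.run(["bin/kafka-producer-perf-test.sh", ...], env=env)
```

The broker keeps its own JMX_PORT (9099 in the post), so kafka-manager continues to see broker metrics while the perf tools run.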

HELP in Usage of JMX port in Kafka

Hi all, I have set JMX_PORT to 9099 in an environment variable and started Kafka. There has been no problem so far. I can see metrics on the kafka-manager console. This is fine. However, when I run kafka-consumer-perf-test.sh, kafka-producer-perf-test.sh and similar utilities under /bin, I get the error given at the end. I understand the root cause, but I don't know the solution. Can't I make use of the test utilities under /bin when I have enabled JMX? I want to measure the performance of a running Kafka cluster and at the same time monitor metrics on kafka-manager. How can I skip JMX_PORT while running the manual utilities? Or is there a way I can give a different port for kafka-producer-perf-test.sh? Please help me in this regard. Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 9099; nested exception is: java.net.BindException: Address already in use (Bind failed) jdk.internal.agent.AgentConfigurationError: java.rmi.server.E...

Re: when to expand cluster

No, it's not bad. Kafka is designed to serve data to many consumers at the same time, whether they are independent of each other or in the same consumer group. I would encourage you to play with different partition counts and use Kafka's performance testing tools (kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh) to test throughput in different scenarios and see the results for yourself. -- Peter > On Feb 27, 2020, at 1:28 AM, 张祥 < xiangzhang1128@gmail.com > wrote: > > I believe no matter the partition count exceeds the broker count, we can > always have the same number of consumer instances as the partition count. > > So what I want to know is when two partition exists on the same broker, two > consumer instances will be talking to same broker, is that bad ? > > On Thu, Feb 27, 2020 at 2:20 PM, 张祥 < xiangzhang1128@gmail.com > wrote: > >> Thanks. What influence does it have for consumers and producers when >...

Re: Mirror Maker 2 MirrorClient

Hey Carl, that's what my team has done for our internal tooling, and I designed MirrorClient with that in mind. Given a single mm2.properties file you can create MirrorClients for each cluster and those in turn give you Admin/Consumer/Producer clients if you need them. Our internal tooling essentially looks for an mm2.properties file in a system-wide location and lets you interrogate any given cluster by alias, which is nice when you manage a very large number of Kafka clusters :) Ryanne On Thu, Feb 27, 2020, 1:21 PM Carl Graving < cgraving@gmail.com > wrote: > All: > I was tinkering around with the MirrorClient and was curious about the > configs. I see that I can use the MirrorMaker config to pull in the same > configs that were used to spin up the MM2 cluster. Seeing as MM2 is started > with these configs in a property file and it is passed in on command line, > I take it is implying that I should read the same file to set up my ...

Mirror Maker 2 MirrorClient

All: I was tinkering around with the MirrorClient and was curious about the configs. I see that I can use the MirrorMaker config to pull in the same configs that were used to spin up the MM2 cluster. Seeing as MM2 is started with these configs in a property file and it is passed in on command line, I take it is implying that I should read the same file to set up my MirrorClient application to then feed into MirrorMakerConfig, to the get the clientConfig like in the code comments? Or am I reading too much into this and just configure for the cluster I am setting up the Client for? thanks, Carl
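The idea in the question, reading the same properties file that started MM2 and deriving the client settings for one cluster from it, can be sketched without the Java classes. This is only an illustration of the file layout, assuming the usual MM2 convention of a `clusters` list plus per-cluster keys prefixed with the cluster alias; the `client_config` function and the sample aliases and hosts are hypothetical and are not the MirrorMakerConfig API.

```python
def client_config(properties_text, alias):
    """Extract the settings for one cluster alias from MM2-style properties text."""
    props = {}
    for raw in properties_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

    clusters = [c.strip() for c in props.get("clusters", "").split(",") if c.strip()]
    if alias not in clusters:
        raise KeyError(f"unknown cluster alias: {alias}")

    prefix = alias + "."
    # Strip the alias prefix so the result looks like a plain client config.
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


SAMPLE = """
# hypothetical mm2.properties
clusters = primary, backup
primary.bootstrap.servers = primary-host:9092
backup.bootstrap.servers = backup-host:9092
primary->backup.enabled = true
"""

if __name__ == "__main__":
    print(client_config(SAMPLE, "backup"))
```

Replication-flow keys such as `primary->backup.enabled` are not picked up, because they do not start with the alias followed by a dot.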

Re: subscribe kafka user mail

Hi there! To subscribe to the list, you have to email a different address: users-subscribe@kafka.apache.org . (see https://kafka.apache.org/contact.html ). This also applies to the message you sent to dev (should have been dev-subscribe@kafka.apache.org ). Thanks for joining the conversation! -John On Thu, Feb 27, 2020, at 08:36, Walker Xia wrote: > subscribe kafka user mail >

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

Hi, If there is an operation downstream that needs key co-location (e.g. aggregation), stream.transformValues(/*return null for values that don't need to be forwarded downstream*/).filter((k, v) -> v != null) would be more efficient, because for the stream.transform(/*return null for records that don't need to be forwarded downstream*/) approach a repartition step will be inserted. Without an operation that needs key co-location downstream, it is hard to say. Experiments would be needed. Best, Bruno On Thu, Feb 27, 2020 at 7:28 AM Sachin Mittal < sjmittal@gmail.com > wrote: > > Hi, > Yes using filter with transformValues would also work. > I have a question out of curiosity. which one would be more efficient? > stream.transform(/*return null for records that don't need to be forwarded > downstream*/) > or > stream.transformValues(/*return null for values that don't need to be > forwarded downstream*/)...

Re: when to expand cluster

I believe that whether or not the partition count exceeds the broker count, we can always have the same number of consumer instances as the partition count. So what I want to know is: when two partitions exist on the same broker, two consumer instances will be talking to the same broker. Is that bad? On Thu, Feb 27, 2020 at 2:20 PM, 张祥 < xiangzhang1128@gmail.com > wrote: > Thanks. What influence does it have for consumers and producers when > partition number is more than broker number, which means at least one > broker serves two partitions for one topic ? performance wise. > > On Wed, Feb 26, 2020 at 11:02 PM, Peter Bukowinski < pmbuko@gmail.com > wrote: > >> Disk usage is one reason to expand. Another reason is if you need more >> ingest or output throughput for your topic data. If your producers aren't >> able to send data to kafka fast enough or your consumers are lagging, you >> might benefit from more brokers and more partitions. >> >...

Re: Are RocksDBWindowStore windows hopping or sliding?

Hi, Yes I get that when I am using the apis provided by kstream I can basically use both: - Tumbling time window (non-overlapping, gap-less windows) - Hopping time window (Time-based Fixed-size, overlapping windows) I wanted to know if I am using state store directly when created using a RocksDbWindowBytesStoreSupplier. In that case the RocksDBWindowStore created will always be of type Tumbling. ie any record put into that store will be part of one window only. Thanks Sachin On Thu, Feb 27, 2020 at 1:09 PM Matthias J. Sax < mjsax@apache.org > wrote: > What you call "sliding window" is called "hopping window" in Kafka Streams. > > And yes, you can use a windowed-store for this case: In fact, a > non-overlapping tumbling window is just a special case of a hopping > window with advance == window-size. > > In Kafka Streams we have a single implementation for hopping windows > (that we use for tumbling ...

Re: Are RocksDBWindowStore windows hopping or sliding?

What you call "sliding window" is called "hopping window" in Kafka Streams. And yes, you can use a windowed-store for this case:...

Re: when to expand cluster

The effect for producers isn't very significant once your topic partition count exceeds your broker count. For consumers, especially if you are using consumer groups, the more partitions you have, the more consumer instances you can have in a single consumer group. (The maximum number of active consumers in a consumer group = the total number of topic partitions assigned to the group.) As long as you are not exceeding the brokers' network and disk I/O, your total consumer throughput goes up with more partitions. Additional network and disk I/O capacity is a benefit of additional brokers. -- Peter > On Feb 26, 2020, at 10:23 PM, 张祥 < xiangzhang1128@gmail.com > wrote: > > Thanks. What influence does it have for consumers and producers when > partition number is more than broker number, which means at least one > broker serves two partitions for one topic ? performance wise. > > Peter Bukowinski < pmbuko@gmail.com > wrote on Wed, Feb 26, 2020 at 11:0...
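The rule in parentheses above (active consumers in a group are capped by the number of partitions) can be illustrated with a toy assignment. This is a simplified round-robin sketch, not one of Kafka's actual assignor implementations; the `assign` function and the consumer names are hypothetical.

```python
def assign(partitions, consumers):
    """Spread partitions over consumers round-robin; each partition gets one owner."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


def active_consumers(assignment):
    """Consumers that own at least one partition."""
    return [consumer for consumer, owned in assignment.items() if owned]


if __name__ == "__main__":
    partitions = [0, 1, 2, 3]
    consumers = ["c0", "c1", "c2", "c3", "c4", "c5"]
    result = assign(partitions, consumers)
    print(result)
    print(len(active_consumers(result)))  # never more than len(partitions)
```

With four partitions and six consumers, two consumers stay idle, which is why adding partitions is what allows a group to scale out further.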

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

Hi, Yes, using filter with transformValues would also work. I have a question out of curiosity: which one would be more efficient? stream.transform(/*return null for records that don't need to be forwarded downstream*/) or stream.transformValues(/*return null for values that don't need to be forwarded downstream*/).filter((k, v) -> v != null) Thanks Sachin On Tue, Feb 25, 2020 at 11:48 PM Bruno Cadonna < bruno@confluent.io > wrote: > Hi Sachin, > > I am afraid I cannot follow your point. > > You can still use a filter if you do not want to emit records > downstream w/o triggering any repartitioning. > > Best, > Bruno > > On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal < sjmittal@gmail.com > wrote: > > > > Hi, > > This is really getting interesting. > > Now if we don't want a record to be emitted downstream only way we can do > > is via transform or (flatTransfor...

PCF to kafka message sending

Hi, We have one case where we want to send messages from PCF to Kafka endpoints. Is it possible? How? Regards, Sunil.

Re: when to expand cluster

Thanks. What influence does it have on consumers and producers, performance-wise, when the partition count is greater than the broker count, which means at least one broker serves two partitions of one topic? On Wed, Feb 26, 2020 at 11:02 PM, Peter Bukowinski < pmbuko@gmail.com > wrote: > Disk usage is one reason to expand. Another reason is if you need more > ingest or output throughput for your topic data. If your producers aren't > able to send data to kafka fast enough or your consumers are lagging, you > might benefit from more brokers and more partitions. > > -- Peter > > > On Feb 26, 2020, at 12:56 AM, 张祥 < xiangzhang1128@gmail.com > wrote: > > > > In documentation, it is described how to expand cluster: > > > https://kafka.apache.org/20/documentation.html#basic_ops_cluster_expansion > . > > But I am wondering what the criteria for expand is. I can only think of > > disk usage threshold. For example,...

Are RocksDBWindowStore windows hopping or sliding?

Hi, My understanding so far is that when we create a RocksDB window store, we specify a window size and a retention period. Windows are created from epoch time based on the size; say the size is 100, then the windows are: [0, 100), [100, 200), [200, 300) ... Windows are retained for the retention period, after which they are dropped. Also, a window is divided into segments, which is implemented using a treemap. Please confirm if my understanding is correct. From all this it also looks like windows are always hopping. Is there a case where sliding windows can be created? If yes, how? An example of a sliding window would be: [0, 100), [75, 175), [150, 250) ... Thanks Sachin
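The two window layouts in the question can be compared with a small sketch. It assumes windows are aligned to the epoch, so that every window starts at a multiple of the advance interval; the `windows_for` function is hypothetical and only does the arithmetic, it is not the store implementation. With advance equal to size it yields the non-overlapping layout [0, 100), [100, 200), ...; with size 100 and advance 75 it yields the overlapping layout [0, 100), [75, 175), [150, 250), ...

```python
def windows_for(timestamp, size, advance):
    """Return the [start, end) windows that contain timestamp.

    Window starts are multiples of advance (epoch-aligned); a window
    contains the timestamp when start <= timestamp < start + size.
    """
    if not 0 < advance <= size:
        raise ValueError("advance must be in (0, size]")
    if timestamp < 0:
        raise ValueError("timestamp must be non-negative")

    windows = []
    start = (timestamp // advance) * advance  # latest window start <= timestamp
    while start >= 0 and start + size > timestamp:
        windows.append((start, start + size))
        start -= advance
    return sorted(windows)


if __name__ == "__main__":
    print(windows_for(80, size=100, advance=100))   # one window: non-overlapping case
    print(windows_for(80, size=100, advance=75))    # two overlapping windows
    print(windows_for(160, size=100, advance=75))
```

A record therefore belongs to exactly one window only when advance equals size; with a smaller advance it belongs to every window whose range covers its timestamp.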

Re: [ANNOUNCE] New committer: Konstantine Karantasis

Congrats Konstantine! On Thu, Feb 27, 2020 at 7:46 AM Matthias J. Sax < mjsax@apache.org > wrote: > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA512 > > Congrats! > > On 2/27/20 2:21 AM, Jeremy Custenborder wrote: > > Congrats Konstantine! > > > > On Wed, Feb 26, 2020 at 2:39 PM Bill Bejeck < bill@confluent.io > > > wrote: > >> > >> Congratulations Konstantine! Well deserved. > >> > >> -Bill > >> > >> On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson > >> < jason@confluent.io > wrote: > >> > >>> The PMC for Apache Kafka has invited Konstantine Karantasis as > >>> a committer and we are pleased to announce that he has > >>> accepted! > >>> > >>> Konstantine has contributed 56 patches and helped to review > >>> even more. His recent work includes a major overh...

Re: [ANNOUNCE] New committer: Konstantine Karantasis

Congrats! On 2/27/20 2:21 AM, Jeremy Custenborder wrote: > Congrats Konstantine! > > On Wed, Feb 26, 2020 at 2:39 PM Bill Bejeck < bill@confluent.io > > wrote: >> >> Congratulations Konstantine! Well deserved. >> >> -Bill >> >> On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson >> < jason@confluent.io > wrote: >> >>> The PMC for Apache Kafka has invited Konstantine Karantasis as >>> a committer and we are pleased to announce that he has >>> accepted! >>> >>> Konstantine has contributed 56 patches and helped to review >>> even more. His recent work includes a major overhaul of the >>> Connect task management system in order to support incremental >>> rebalancing. In addition to code contributions, Konstantine >>> helps the community in many other ways including tal...

Re: Use a single consumer or create consumer per topic

I am thinking of the consumer count, not the consumer group. What are the pros and cons of one consumer per topic vs. one consumer for all 50 topics? On Thu, Feb 27, 2020 at 12:48 AM, Ryanne Dolan < ryannedolan@gmail.com > wrote: > On an older cluster like that, rebalances will stop-the-world and kill your > throughput. Much better to have a bunch of consumer groups, one per topic, > so they can rebalance independently. > > On Wed, Feb 26, 2020, 1:05 AM Mark Zang < deepnighttwo@gmail.com > wrote: > > > Hi, > > > > I have a 20 brokers kafka cluster and there are about 50 topics to > consume. > > > > Between creating a consumer for each topic and creating a consumer for > all > > 50 topics, what is the pros and cons? > > > > What would be the suggested way if I enable auto commit for each 10 > > seconds? > > > > Kafka client version is 0.10.2 > > > > > > Thanks! ...

Re: [ANNOUNCE] New committer: Konstantine Karantasis

Congrats Konstantine! On Wed, Feb 26, 2020 at 2:39 PM Bill Bejeck < bill@confluent.io > wrote: > > Congratulations Konstantine! Well deserved. > > -Bill > > On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson < jason@confluent.io > wrote: > > > The PMC for Apache Kafka has invited Konstantine Karantasis as a committer > > and we > > are pleased to announce that he has accepted! > > > > Konstantine has contributed 56 patches and helped to review even more. His > > recent work includes a major overhaul of the Connect task management system > > in order to support incremental rebalancing. In addition to code > > contributions, Konstantine helps the community in many other ways including > > talks at meetups and at Kafka Summit and answering questions on > > stackoverflow. He consistently shows good judgement in design and a careful > > attention to details when it comes to c...

Re: [ANNOUNCE] New committer: Konstantine Karantasis

Congrats Konstantine! Guozhang On Wed, Feb 26, 2020 at 3:09 PM John Roesler < vvcephei@apache.org > wrote: > Congrats, Konstantine! Awesome news. > -John > > On Wed, Feb 26, 2020, at 16:39, Bill Bejeck wrote: > > Congratulations Konstantine! Well deserved. > > > > -Bill > > > > On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson < jason@confluent.io > > wrote: > > > > > The PMC for Apache Kafka has invited Konstantine Karantasis as a > committer > > > and we > > > are pleased to announce that he has accepted! > > > > > > Konstantine has contributed 56 patches and helped to review even more. > His > > > recent work includes a major overhaul of the Connect task management > system > > > in order to support incremental rebalancing. In addition to code > > > contributions, Konstantine helps the community in many other ways...

Re: [ANNOUNCE] New committer: Konstantine Karantasis

Congrats, Konstantine! Awesome news. -John On Wed, Feb 26, 2020, at 16:39, Bill Bejeck wrote: > Congratulations Konstantine! Well deserved. > > -Bill > > On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson < jason@confluent.io > wrote: > > > The PMC for Apache Kafka has invited Konstantine Karantasis as a committer > > and we > > are pleased to announce that he has accepted! > > > > Konstantine has contributed 56 patches and helped to review even more. His > > recent work includes a major overhaul of the Connect task management system > > in order to support incremental rebalancing. In addition to code > > contributions, Konstantine helps the community in many other ways including > > talks at meetups and at Kafka Summit and answering questions on > > stackoverflow. He consistently shows good judgement in design and a careful > > attention to details when it comes to code. ...

Re: [ANNOUNCE] New committer: Konstantine Karantasis

Congratulations Konstantine! Well deserved. -Bill On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson < jason@confluent.io > wrote: > The PMC for Apache Kafka has invited Konstantine Karantasis as a committer > and we > are pleased to announce that he has accepted! > > Konstantine has contributed 56 patches and helped to review even more. His > recent work includes a major overhaul of the Connect task management system > in order to support incremental rebalancing. In addition to code > contributions, Konstantine helps the community in many other ways including > talks at meetups and at Kafka Summit and answering questions on > stackoverflow. He consistently shows good judgement in design and a careful > attention to details when it comes to code. > > Thanks for all the contributions and looking forward to more! > > Jason, on behalf of the Apache Kafka PMC >

[ANNOUNCE] New committer: Konstantine Karantasis

The PMC for Apache Kafka has invited Konstantine Karantasis as a committer and we are pleased to announce that he has accepted! Konstantine has contributed 56 patches and helped to review even more. His recent work includes a major overhaul of the Connect task management system in order to support incremental rebalancing. In addition to code contributions, Konstantine helps the community in many other ways including talks at meetups and at Kafka Summit and answering questions on stackoverflow. He consistently shows good judgement in design and a careful attention to details when it comes to code. Thanks for all the contributions and looking forward to more! Jason, on behalf of the Apache Kafka PMC

Securing Kafka with zookeeper 3.5.5+ and mTLS

Hi, I was just wondering if the following article: https://docs.confluent.io/current/kafka/incremental-security-upgrade.html is still valid when using ZooKeeper 3.5.5 with mTLS rather than Kerberos? If it is still valid, what principal is used for the ACL? Thanks! ttyl Dima -- dbrodsky@salesforce.com "The price of reliability is the pursuit of the utmost simplicity. It is the price which the very rich find most hard to pay." (Sir Antony Hoare, 1980)

Re: Metrics for Topic/Partition size

awesome, thanks Gabriele On Wed, Feb 26, 2020 at 1:24 PM Gabriele Paggi < gabriele.paggi@gmail.com > wrote: > > Hi Richard, > > Yes, it's the size in bytes for all log segments for a given > topic/partition on a given broker, without the index files: > > [gpaggi@kafkalog001 ~]$ kafka-log-dirs.sh --bootstrap-server > $(hostname -f):9092 --describe --broker-list 1 --topic-list > access_logs | tail -n+3 | jq '.brokers[].logDirs[].partitions[] | > select(.partition == "access_logs-1")' > { > "partition": "access_logs-1", > "size": 385505944, > "offsetLag": 0, > "isFuture": false > } > [gpaggi@kafkalog001 ~]$ du -bc /var/lib/kafka/data/access_logs-1/*.log > 367401290 /var/lib/kafka/data/access_logs-1/00000000000245067429.log > 18104654 /var/lib/kafka/data/access_logs-1/00000000000246918163.log > 385505944 total > [gpagg...

Re: Metrics for Topic/Partition size

Hi Richard, Yes, it's the size in bytes for all log segments for a given topic/partition on a given broker, without the index files: [gpaggi@kafkalog001 ~]$ kafka-log-dirs.sh --bootstrap-server $(hostname -f):9092 --describe --broker-list 1 --topic-list access_logs | tail -n+3 | jq '.brokers[].logDirs[].partitions[] | select(.partition == "access_logs-1")' { "partition": "access_logs-1", "size": 385505944, "offsetLag": 0, "isFuture": false } [gpaggi@kafkalog001 ~]$ du -bc /var/lib/kafka/data/access_logs-1/*.log 367401290 /var/lib/kafka/data/access_logs-1/00000000000245067429.log 18104654 /var/lib/kafka/data/access_logs-1/00000000000246918163.log 385505944 total [gpaggi@kafkalog001 ~]$ On Wed, 26 Feb 2020 at 15:33, Richard Rossel < henhiskan@gmail.com > wrote: > > Thanks Gabriele, it turned out I didn't have that pattern deployed (facepalm). > After deploying ...
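The comparison made above with `du` can also be scripted: sum the sizes of the `*.log` segment files in a partition directory and compare the total with the reported size. This is a minimal sketch; the `partition_log_bytes` function is hypothetical, and the demo creates a throwaway directory with two small files instead of reading a real Kafka data directory.

```python
import tempfile
from pathlib import Path


def partition_log_bytes(partition_dir):
    """Total size in bytes of the *.log segment files, ignoring index files."""
    return sum(path.stat().st_size for path in Path(partition_dir).glob("*.log"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Two fake segments plus an index file that must not be counted.
        (Path(tmp) / "00000000000000000000.log").write_bytes(b"x" * 300)
        (Path(tmp) / "00000000000000000042.log").write_bytes(b"x" * 200)
        (Path(tmp) / "00000000000000000000.index").write_bytes(b"x" * 50)
        print(partition_log_bytes(tmp))
```

For the partition shown in the post, the two segments of 367401290 and 18104654 bytes add up to the reported size of 385505944.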

Re: Error handling guarantees in Kafka Streams

Hi Magnus, with exactly-once, the producer commits the consumer offsets. Thus, if the producer is not able to successfully commit a transaction, no consumer offsets will be successfully committed, too. Best, Bruno On Wed, Feb 26, 2020 at 1:51 PM Reftel, Magnus <Magnus.Reftel@skatteetaten.no.invalid> wrote: > > Hi, > > From my understanding, it is guaranteed that when a Kafka Streams application running with the exactly_once processing guarantee receives a record, it will either finish processing the record (including flushing any records generated as a direct result of processing the message and committing the transaction), invoke either the DeserializationExceptionHandler or the ProductionExceptionHandler exception handler, or retry processing the message. Is that correct, or are there cases where a record can be consumed (and the consumption committed) without the Kafka Streams application being able to either produce any output or handle an excepti...

Re: Use a single consumer or create consumer per topic

On an older cluster like that, rebalances will stop-the-world and kill your throughput. Much better to have a bunch of consumer groups, one per topic, so they can rebalance independently. On Wed, Feb 26, 2020, 1:05 AM Mark Zang < deepnighttwo@gmail.com > wrote: > Hi, > > I have a 20 brokers kafka cluster and there are about 50 topics to consume. > > Between creating a consumer for each topic and creating a consumer for all > 50 topics, what is the pros and cons? > > What would be the suggested way if I enable auto commit for each 10 > seconds? > > Kafka client version is 0.10.2 > > > Thanks! > > Mark >

Re: Use a single consumer or create consumer per topic

Hey Mark, you could use a consumer group (check the consumer #subscribe API) to consume from 50 topics in a dynamic fashion, as long as the data processing function is the same for all the records. Consumer group could provide basic guarantees for balancing the number of partitions for each consumer if you use default assignors. I couldn't think of any cons for managing one consumer group vs multiple of them. As for auto commit, I don't think I understand your question. You could turn it on by mutating consumer config. Boyang On Tue, Feb 25, 2020 at 11:05 PM Mark Zang < deepnighttwo@gmail.com > wrote: > Hi, > > I have a 20 brokers kafka cluster and there are about 50 topics to consume. > > Between creating a consumer for each topic and creating a consumer for all > 50 topics, what is the pros and cons? > > What would be the suggested way if I enable auto commit for each 10 > seconds? > > Kafka client version ...

Re: when to expand cluster

Disk usage is one reason to expand. Another reason is if you need more ingest or output throughput for your topic data. If your producers aren't able to send data to Kafka fast enough or your consumers are lagging, you might benefit from more brokers and more partitions. -- Peter > On Feb 26, 2020, at 12:56 AM, 张焄 < xiangzhang1128@gmail.com > wrote: > > In documentation, it is described how to expand cluster: > https://kafka.apache.org/20/documentation.html#basic_ops_cluster_expansion . > But I am wondering what the criteria for expand is. I can only think of > disk usage threshold. For example, suppose several disk usage exceed 80%. > Is this correct and is there more ?

Re: How to write data from kafka to CSV file on a Mac

By default the output goes to standard output, but you can redirect that to a file: kafkacat -b your.broker.com:yourPORT -t yourtopic -c max-messages > /your/full/path/file.csv On Wed, Feb 26, 2020 at 4:45 AM Doaa K. Amin <doaaelkordy@yahoo.com.invalid> wrote: > > Hi Richard, > Thanks for answering. Just one more thing: do I add to the command that you've written the path and name of the CSV file that I want to write the data to? Please, advise. > Thanks,Doaa. > > Sent from Yahoo Mail on Android > > On Tue, Feb 25, 2020 at 6:20 PM, Richard Rossel< henhiskan@gmail.com > wrote: you can use kafka-console-consumer that comes with your kafka > deployment, or you can install kafkacat (which I found more simple to > use) > > brew install kafkacat > > kafkacat -b your.broker.com:yourPORT -t yourtopic -c max-messages > > > On Tue, Feb 25, 2020 at 9:03 AM Doaa K. Amin > <doaaelkordy@yahoo....
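The redirect above writes each message payload to the file exactly as it was produced, so the result is only valid CSV if the messages are already CSV lines. Below is a minimal sketch of a converter, assuming (hypothetically) that each message is a single-line JSON object; the function name and the field names `id` and `name` are made up for illustration and are not from this thread.

```python
import csv
import io
import json


def json_lines_to_csv(lines, out, fields):
    """Write one CSV row per JSON message; return the number of rows written."""
    writer = csv.DictWriter(out, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    rows = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between messages
        record = json.loads(line)
        # Missing fields become empty cells rather than raising an error.
        writer.writerow({name: record.get(name, "") for name in fields})
        rows += 1
    return rows


# Small in-memory demonstration with hypothetical messages.
demo = io.StringIO()
json_lines_to_csv(['{"id": 1, "name": "a,b"}', '{"id": 2}'], demo, ["id", "name"])
```

To use it with kafkacat, one option is to call `json_lines_to_csv(sys.stdin, sys.stdout, ["id", "name"])` in a small script and pipe the kafkacat output through it before redirecting to the CSV file. The `csv` module takes care of quoting values that contain commas.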

Re: Metrics for Topic/Partition size

Thanks Gabriele, it turned out I didn't have that pattern deployed (facepalm). After deploying it, it worked right away. Now I'm struggling to understand the size metric. Do you know if it reports the size (in bytes) of all segments for that broker/topic/partition? I'm trying to compare those values with the actual log files on disk, but they don't seem to match exactly. Sorry for all the questions, but I can't find good documentation and I'm lost reading core/src/main/scala/kafka/log/Log.scala Thanks for your help. On Wed, Feb 26, 2020 at 6:02 AM Gabriele Paggi < gabriele.paggi@gmail.com > wrote: > > Hi Richard, > > The beans path is: > kafka.log:name=Size,partition=<partition>,topic=<topic>,type=Log > I don't have a jmx_exporter at hand to test it at the moment but I > don't see anything obviously wrong in your config, other than type: > GAUGE missing. > Did you try browsing the ...
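One way to sanity-check the Size metric is to add up the segment files on disk yourself. The sketch below assumes the gauge covers only the partition's `.log` segment files and excludes the `.index` and `.timeindex` files. That is one reading of Log.scala, not something confirmed in this thread, and the function name is made up for illustration.

```python
import os


def partition_log_bytes(partition_dir):
    """Sum the sizes of the .log segment files in one partition directory."""
    total = 0
    for entry in os.scandir(partition_dir):
        # Index files (.index, .timeindex) and other files are skipped on purpose.
        if entry.is_file() and entry.name.endswith(".log"):
            total += entry.stat().st_size
    return total
```

Point it at a partition directory under the broker's log.dirs (for example a directory named `yourtopic-0`) and compare the result with the JMX value. Small differences are possible while the active segment is being appended to.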

Error handling guarantees in Kafka Streams

Hi, From my understanding, it is guaranteed that when a Kafka Streams application running with the exactly_once processing guarantee receives a record, it will either finish processing the record (including flushing any records generated as a direct result of processing the message and committing the transaction), invoke either the DeserializationExceptionHandler or the ProductionExceptionHandler exception handler, or retry processing the message. Is that correct, or are there cases where a record can be consumed (and the consumption committed) without the Kafka Streams application being able to either produce any output or handle an exception? Best Regards Magnus Reftel

Re: Metrics for Topic/Partition size

Hi Richard, The beans path is: kafka.log:name=Size,partition=<partition>,topic=<topic>,type=Log I don't have a jmx_exporter at hand to test it at the moment but I don't see anything obviously wrong in your config, other than type: GAUGE missing. Did you try browsing the beans with jmxterm before configuring the exporter? Gabriele On Mon, 24 Feb 2020 at 23:01, Richard Rossel < henhiskan@gmail.com > wrote: > > Hi Gabriele, > I'm using Kafka 5.3.1, which is apache kafka 2.3 and I'm using JMX to > retrieve metrics from brokers. > The only metric I saw from "kafka.log" is LogFlushStats, but nothing > about Log.partition > This is the pattern I have deployed for kafka.log metrics, maybe I > need a different for partitions sizes? > > - pattern : kafka.log<type=Log, name=(.+), topic=(.+), > partition=(.+)><>Value > name: kafka_log_$1 > labels: > ...

Re: How to write data from kafka to CSV file on a Mac

Hi Richard, Thanks for answering. Just one more thing: do I add the path and name of the CSV file that I want to write the data to at the end of the command that you've written? Please advise. Thanks, Doaa. On Tue, Feb 25, 2020 at 6:20 PM, Richard Rossel< henhiskan@gmail.com > wrote: you can use kafka-console-consumer that comes with your kafka deployment, or you can install kafkacat (which I found more simple to use) brew install kafkacat kafkacat -b your.broker.com:yourPORT -t yourtopic -c max-messages On Tue, Feb 25, 2020 at 9:03 AM Doaa K. Amin <doaaelkordy@yahoo.com.invalid> wrote: > > Hello, > I'm new to kafka and I'd like to write data from kafka to a CSV file in a Mac. Please, advise. > Thank You & Kindest Regards,Doaa. -- Richard Rossel Atlanta - GA

when to expand cluster

The documentation describes how to expand a cluster: https://kafka.apache.org/20/documentation.html#basic_ops_cluster_expansion . But I am wondering what the criteria for expanding are. I can only think of a disk usage threshold, for example when the usage of several disks exceeds 80%. Is this correct, and are there other criteria?

synchronously flushing messages to disk

Hi, I want to benchmark Kafka, configured such that a message that has been acked by the broker to the producer is guaranteed to have been persisted to disk. I changed the broker settings: log.flush.interval.messages=1 log.flush.interval.ms=0 (Is this the proper way to do it?) The impact is very noticeable. Without these settings, the msg/sec rate (1 producer, 1 topic, async, enable.idempotence) was well north of 100k; with them, it drops below 5k on this dev box with SSD storage. This huge drop seems to indicate that Kafka is not doing any batch acking (which would allow it to do batch fsyncing). Is there a way to increase the msg/sec rate given the fsync constraint? It would seem that adding topics/partitions would help in the case of a cluster, since the fsync load could be distributed across multiple machines. Is there perhaps also a way to increase the rate per node? I'm using the latest Kafka, 2.4.0. Best regards Eugen
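The cost of syncing after every message can be illustrated outside Kafka. The sketch below uses plain file I/O, not Kafka's actual log implementation: it appends messages to a file and calls fsync after every `batch_size` messages. The function name and parameters are made up for illustration.

```python
import os
import time


def append_with_fsync(path, messages, batch_size):
    """Append messages to path, fsyncing every batch_size messages.

    Returns (fsync_calls, elapsed_seconds).
    """
    fsync_calls = 0
    start = time.perf_counter()
    with open(path, "ab") as f:
        for i, msg in enumerate(messages, start=1):
            f.write(msg)
            if i % batch_size == 0:
                f.flush()             # push Python's buffer to the OS
                os.fsync(f.fileno())  # ask the OS to persist to disk
                fsync_calls += 1
        if len(messages) % batch_size != 0:
            # Sync the final partial batch so nothing is left unsynced.
            f.flush()
            os.fsync(f.fileno())
            fsync_calls += 1
    return fsync_calls, time.perf_counter() - start
```

Comparing the elapsed time for `batch_size=1` against, say, `batch_size=100` on the same disk gives a rough feel for how much per-message syncing costs. The numbers depend heavily on hardware and filesystem, so they are only indicative of what a broker would see.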

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

Hello Sachin, I just read John's and Bill's comments on that ticket (I was not on KAFKA-9533 before, so it was kind of new to me). Besides John's rationale, which I agree with (for a KStream, returning a null value with a non-null key can still have a valid meaning), the behavior so far has actually been "forwarding to null". We do, however, need to fix the javadoc if we believe that should be the right semantics. Guozhang On Tue, Feb 25, 2020 at 10:18 AM Bruno Cadonna < bruno@confluent.io > wrote: > Hi Sachin, > > I am afraid I cannot follow your point. > > You can still use a filter if you do not want to emit records > downstream w/o triggering any repartitioning. > > Best, > Bruno > > On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal < sjmittal@gmail.com > wrote: > > > > Hi, > > This is really getting interesting. > > Now if we don't want a record to be emi...

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

Hi Sachin, I am afraid I cannot follow your point. If you do not want to emit records downstream, you can still use a filter, which does not trigger any repartitioning. Best, Bruno On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal < sjmittal@gmail.com > wrote: > > Hi, > This is really getting interesting. > Now if we don't want a record to be emitted downstream only way we can do > is via transform or (flatTransform). > > Since we are now reverting the fix for null record in transformValues and > rather change the docs, doesn't this add bit of confusion for users. > Confluent docs says that: > transformValues is preferable to transform because it will not cause data > re-partitioning. > > So in many cases if just the record's value structure is sufficient to > determine whether we should emit it downstream or not, we would still be > forced to > use transform and unnecessarily cause data re-partitioning. ...