
Posts

Showing posts from February, 2025

Re: Picking up the thread on emit-on-change and at-least-once

We did split out KIP-1035 (already accepted), which is a requirement/blocker for KIP-892. Progress is slow, but there is WIP to implement KIP-1035. After it's done, we can continue/pick up the KIP-892 implementation. -Matthias On 2/28/25 11:20 AM, Steven Schlansker wrote: > Thank you for your response Matthias. > >> On Feb 26, 2025, at 8:32 PM, Matthias J. Sax <mjsax@apache.org> wrote: >> >> Hey, >> >> the problem you describe is not totally fundamental, but there is no workaround in KS currently... So right now, only EOS can fix it. >> >> In the end, what you would need is a way to first write to the output topic (and flush the write), before you update the state store. >> > > Yes, I reached the same conclusion. > >> Using the PAPI you could conceptually do `context.forward()` before you call `store.put(...)`, but the KS runtime hides too many low-level controls for it to gu...

Re: Picking up the thread on emit-on-change and at-least-once

Thank you for your response Matthias. > On Feb 26, 2025, at 8:32 PM, Matthias J. Sax <mjsax@apache.org> wrote: > > Hey, > > the problem you describe is not totally fundamental, but there is no workaround in KS currently... So right now, only EOS can fix it. > > In the end, what you would need is a way to first write to the output topic (and flush the write), before you update the state store. > Yes, I reached the same conclusion. > Using the PAPI you could conceptually do `context.forward()` before you call `store.put(...)`, but the KS runtime hides too many low-level controls for it to guarantee that `context.forward(...)` resulted in a successful write to the downstream sink before it returns and `state.put(...)` is executed... There is a missing `producer.flush()` step in-between that you cannot perform. > > Does this make sense? Yes, I thought of the same thing, but wasn't able to figure out the flush step ...

Re: Support for other OAuth2 grant types in Kafka

Hi Subra, On Thu, Feb 27, 2025, at 5:41 AM, Subra I wrote: > Thanks for the response Kirk. I will look at the links shared as well. > > It is clear from the earlier links that as of now, Kafka only supports > client credentials grant type by default. Am I right? (Unless we do a > custom implementation as mentioned by you) That is correct. If you do end up writing another implementation, it would be great if it could be contributed back to the community. I'd love to see more additions in this area. Thanks, Kirk > On Wed, Feb 26, 2025 at 11:46 PM Kirk True < kirk@kirktrue.pro > wrote: > > > Hi Subra, > > > > I'm one of the authors of the OAuth support in Kafka. Answers to your > > questions are below... > > > > On Tue, Feb 25, 2025, at 3:05 AM, Subra I wrote: > > > Hello All, > > > > > > I see that Kafka by itself supports client credentials as grant type for...

Re: Support for other OAuth2 grant types in Kafka

Thanks for the response Kirk. I will look at the links shared as well. It is clear from the earlier links that as of now, Kafka only supports client credentials grant type by default. Am I right? (Unless we do a custom implementation as mentioned by you) On Wed, Feb 26, 2025 at 11:46 PM Kirk True < kirk@kirktrue.pro > wrote: > Hi Subra, > > I'm one of the authors of the OAuth support in Kafka. Answers to your > questions are below... > > On Tue, Feb 25, 2025, at 3:05 AM, Subra I wrote: > > Hello All, > > > > I see that Kafka by itself supports client credentials as grant type for > > OAuth2. I see this mentioned in one of the kafka KIP as well: > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575 > > > > Is there a way to support other grant types as well? I came across the > > following page: > > > https://cwiki.apache.org/confluence/display/K...

Re: Picking up the thread on emit-on-change and at-least-once

Hey, the problem you describe is not totally fundamental, but there is no workaround in KS currently... So right now, only EOS can fix it. In the end, what you would need is a way to first write to the output topic (and flush the write), before you update the state store. Using the PAPI you could conceptually do `context.forward()` before you call `store.put(...)`, but the KS runtime hides too many low-level controls for it to guarantee that `context.forward(...)` resulted in a successful write to the downstream sink before it returns and `state.put(...)` is executed... There is a missing `producer.flush()` step in-between that you cannot perform. Does this make sense? Down the line though, I think that TX state stores can fix it (KIP-892). I believe that we can (and should) use this, even for ALOS mode, allowing us to roll back state on error, i.e., all writes to the store are "pending" until we do a commit, and for this case we first flu...
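The failure mode described above can be simulated without any Kafka APIs. The sketch below is a toy stand-in (all names hypothetical, a HashMap plays the state store): under at-least-once, if the store is updated before the downstream write is flushed and the app crashes in between, the redelivered record looks "unchanged", so the emit is suppressed forever.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy simulation of the ALOS + emit-on-change pitfall; no Kafka APIs involved.
public class AlosPitfall {
    // Process record ("k", 1) twice, the second time as an at-least-once
    // redelivery. 'crashBeforeEmit' simulates a failure after the store
    // write but before the output write is flushed.
    static List<String> run(boolean crashBeforeEmit) {
        Map<String, Integer> store = new HashMap<>();
        List<String> output = new ArrayList<>();
        // First processing attempt
        if (!Objects.equals(store.get("k"), 1)) {
            store.put("k", 1);        // state updated first...
            if (!crashBeforeEmit) {
                output.add("k=1");    // ...downstream write only if no crash
            }
        }
        // Redelivery of the same record after the crash: the store already
        // holds the value, so emit-on-change suppresses the update.
        if (!Objects.equals(store.get("k"), 1)) {
            store.put("k", 1);
            output.add("k=1");
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [k=1]  -- normal case
        System.out.println(run(true));  // []     -- the update is lost
    }
}
```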

Re: Why does the Kafka client disconnect the TCP connection when a request times out?

> why if a request is timeout, the tcp connection will be closed? As described in the wiki, the typical cause of a request timeout is leader failover, which requires the client to discover the new leader and reconnect. > not close the connection until connection.max.idle.ms The connection is actually closed on request timeout. Rather, the purpose of connection.max.idle.ms is to close connections which are no longer necessary. The typical case is to close bootstrap server connections after discovering the interested leader from the bootstrap servers. Without this, any client may keep connections with bootstrap servers, which might cause the broker's open fds to be exhausted. Thu, Feb 27, 2025 at 6:46 Hongshun Wang <loserwang1024@gmail.com>: > Each reconnection will need to authenticate. It costs a lot. > > On Tue, Feb 25, 2025 at 5:28 PM Hongshun Wang <loserwang1024@gmail.com> > wrote: > > > Hi, devs, > > ...
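For reference, these are the two client settings discussed above; the values shown are the documented defaults, so adjust per workload:

```properties
# After this long without a response, the client fails the request and
# closes the connection, forcing a metadata refresh / reconnect so that
# a new leader can be discovered.
request.timeout.ms=30000
# Idle connections (e.g. leftover bootstrap connections) are closed after
# this long, to avoid exhausting broker file descriptors.
connection.max.idle.ms=540000
```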

Re: Support for other OAuth2 grant types in Kafka

Hi Subra, I'm one of the authors of the OAuth support in Kafka. Answers to your questions are below... On Tue, Feb 25, 2025, at 3:05 AM, Subra I wrote: > Hello All, > > I see that Kafka by itself supports client credentials as grant type for > OAuth2. I see this mentioned in one of the Kafka KIPs as well: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575 > > Is there a way to support other grant types as well? I came across the > following page: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXXX%3A+Add+support+for+OAuth+jwt-bearer+grant+type > > Here it says that there is a proposal to support the jwt-bearer grant type as > well, but no details are mentioned and it looks like it may only be out in the > future. Yes, that KIP is a work in progress. I'm planning to submit a reviewable version of that KIP in the next couple of weeks. I'd love to get your input on it, so watch this mailin...
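For reference, a minimal sketch of the client-credentials setup Kafka supports out of the box (KIP-768, Kafka 3.1+). All values are placeholders, and the login callback handler's package name has varied across Kafka versions, so check the documentation for your release:

```properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
# Token endpoint of your OAuth2 provider (placeholder URL)
sasl.oauthbearer.token.endpoint.url=https://auth.example.com/oauth2/token
# Package name differs between releases; verify against your Kafka version
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="my-client" \
  clientSecret="my-secret";
```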

Why, if a request times out, will the TCP connection be closed?

Hi, devs, I am curious about why, if a request times out, the TCP connection is closed [1]. When a TCP connection is closed, all pending requests return a TimeoutException. Isn't that a waste? I notice that connection.max.idle.ms [2] will close idle connections after the specified number of milliseconds. Why not just return a TimeoutException for the target request, and leave the connection open until connection.max.idle.ms? I would like to understand the design idea. Much appreciated if anyone can answer. Best, Hongshun

Picking up the thread on emit-on-change and at-least-once

Hi kafka-users, We are implementing a Kafka Streams app that computes various streaming statistics over a corpus of data stored in Kafka topics. While some aggregates update often, others like 'min', 'max', or histogram buckets could have relatively few distinct updates relative to the input data. With our first implementation, we discovered that the majority of our time during catch-up processing is computing the same data redundantly over and over, at various stages of our pipeline. We then found that KIP-557, Emit-on-change support https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams is reverted and doesn't seem to be coming back in the short term. So, we've implemented a work-around, where we trade additional storage to suppress redundant updates. record Holder<V> (V val, boolean changed) {} inputStream .mapValues(v -> new Holder<>(v, true)) .aggregate( ...
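The work-around described above boils down to remembering the last value per key and forwarding only on change. A minimal stand-in sketch in plain Java (hypothetical names; a HashMap plays the role of the state store, and no Kafka Streams APIs are involved):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrates the suppress-redundant-updates idea: trade extra storage
// (the last value per key) for skipping emits when nothing changed.
public class EmitOnChange {
    private final Map<String, Integer> lastSeen = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Records the new value; "forwards" (here: appends to 'emitted') only
    // when it differs from the previous value stored for the key.
    public boolean update(String key, Integer value) {
        Integer prev = lastSeen.put(key, value);
        if (Objects.equals(prev, value)) {
            return false; // unchanged: suppress the redundant update
        }
        emitted.add(key + "=" + value);
        return true;
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EmitOnChange eoc = new EmitOnChange();
        eoc.update("min", 5); // first value: emitted
        eoc.update("min", 5); // unchanged: suppressed
        eoc.update("min", 3); // changed: emitted
        System.out.println(eoc.emitted()); // [min=5, min=3]
    }
}
```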

Re: Why does the Kafka client disconnect the TCP connection when a request times out?

Each reconnection will need to authenticate. It costs a lot. On Tue, Feb 25, 2025 at 5:28 PM Hongshun Wang <loserwang1024@gmail.com> wrote: > Hi, devs, > > I am curious about why, if a request times out, the TCP connection is closed [1]. When a TCP connection is closed, all pending requests return a TimeoutException. Isn't that a waste? > > I notice that connection.max.idle.ms [2] will close idle connections after > the specified number of milliseconds. Why not just return a TimeoutException > for the target request, and leave the connection open until > connection.max.idle.ms? > > I would like to understand the design idea. Much appreciated if anyone can answer. > > > Best, > Hongshun > > > > > [1] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=55154824#KIP19AddarequesttimeouttoNetworkClient-Actionsafterrequesttimeout > [2] > https://docs.confluent....

Support for other OAuth2 grant types in Kafka

Hello All, I see that Kafka by itself supports client credentials as grant type for OAuth2. I see this mentioned in one of the kafka KIP as well: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575 Is there a way to support other grant types as well? I came across the following page: https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXXX%3A+Add+support+for+OAuth+jwt-bearer+grant+type Here it says that there is a proposal to support jwt bearer grant type as well, but no details are mentioned and looks like it may be out only in future. 1. Any idea when support for jwt bearer grant type will be available? 2. Is there a way to support other grant types? Any references for the same? Thanks, Subra

Why does the Kafka client disconnect the TCP connection when a request times out?

Hi, devs, I am curious about why, if a request times out, the TCP connection is closed [1]. When a TCP connection is closed, all pending requests return a TimeoutException. Isn't that a waste? I notice that connection.max.idle.ms [2] will close idle connections after the specified number of milliseconds. Why not just return a TimeoutException for the target request, and leave the connection open until connection.max.idle.ms? I would like to understand the design idea. Much appreciated if anyone can answer. Best, Hongshun [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=55154824#KIP19AddarequesttimeouttoNetworkClient-Actionsafterrequesttimeout [2] https://docs.confluent.io/platform/current/installation/configu...

Re: [VOTE] 4.0.0 RC0

Thanks David! A few ideas of things to test: 1. Test clients and streams with Java 11: this is the first time we use a different Java version for clients/streams vs the rest, so it would be good to ensure the generated artifacts are good when it comes to this. 2. Similarly, test brokers and connect (including some community connectors) with Java 17. 3. Test community clients (librdkafka & related, kafka-python, sarama, segmentio, etc.) with Apache Kafka 4.0. 4. Check that logging is as you expect both with your existing log4j config and the new config that is now included with the distribution - the new config uses a native log4j2 config while the existing one would rely on the compatibility layer. 5. Run the quickstarts and look for ZK-specific things in the quickstarts or other documentation. 6. Follow the upgrade documentation and verify that upgrades work from older versions (3.3 or higher). 7. Check that the examples still work (i.e. they were not relying on depr...

[VOTE] 4.0.0 RC0

Hello Kafka users, developers and client-developers, This is the first candidate for release of Apache Kafka 4.0.0. We still have some remaining blockers but we figured that getting a first release candidate will help the community to test this major release. - This is the first release without Apache Zookeeper - The Next Generation of the Consumer Rebalance Protocol is Generally Available - The Transactions Server-Side Defense (Phase 2) is Generally Available - Queues for Kafka is in Early Access - Kafka uses log4j2 - Drop broker and tools support for Java 11 - Remove old client protocol API versions Release notes for the 4.0.0 release: https://dist.apache.org/repos/dist/dev/kafka/4.0.0-rc0/RELEASE_NOTES.html *** Please download and test the release. Voting is not necessary as we still have blockers. Kafka's KEYS file containing PGP keys we use to sign the release: https://kafka.apache.org/KEYS * Release artifacts to be voted upon (source and binary)...

Re: Kafka Producer send offsets to transaction method confusion

It would be best if you would comment about it on the Jira ticket and we can take it from there. I cannot (and to be honest, also don't want to) make a call on this, but hope that you and the other person who is working on it can figure it out... Happy to moderate the discussion of course, if necessary. -Matthias On 2/20/25 2:24 AM, Paweł Szymczyk wrote: > https://github.com/apache/kafka/pull/18977 > > Thu, Feb 20, 2025 at 07:48 Paweł Szymczyk <pawel.szymczyk90@gmail.com> > wrote: > >> Sure, I would love to continue working on this task and provide an >> interface. I see that someone else has been assigned to the Jira recently; >> please let him know that I'd like to handle it on my own 🙂 >> >> >> On Feb 20, 2025, 03:51:02 CET, "Matthias J. Sax" <mjsax@apache.org> >> wrote: >> >>> Thanks for reaching out. This is actually very good feedback. >...

Re: Kafka Producer send offsets to transaction method confusion

https://github.com/apache/kafka/pull/18977 Thu, Feb 20, 2025 at 07:48 Paweł Szymczyk <pawel.szymczyk90@gmail.com> wrote: > Sure, I would love to continue working on this task and provide an > interface. I see that someone else has been assigned to the Jira recently; > please let him know that I'd like to handle it on my own 🙂 > > > On Feb 20, 2025, 03:51:02 CET, "Matthias J. Sax" <mjsax@apache.org> > wrote: > >> Thanks for reaching out. This is actually very good feedback. >> >> In general, it seems that `ConsumerGroupMetadata` should be an interface, not a class, making it impossible for users to create an instance. >> >> Of course, we cannot do anything for older releases, and it seems the JavaDocs do explain it correctly... >> >> But I did file https://issues.apache.org/jira/browse/KAFKA-18836 -- feel free to pick it up if you are interested in contributing a fix for...

Re: Kafka Producer send offsets to transaction method confusion

Sure, I would love to continue working on this task and provide an interface. I see that someone else has been assigned to the Jira recently; please let him know that I'd like to handle it on my own 🙂 On Feb 20, 2025, 03:51:02 CET, "Matthias J. Sax" <mjsax@apache.org> wrote: >Thanks for reaching out. This is actually very good feedback. > >In general, it seems that `ConsumerGroupMetadata` should be an interface, not a class, making it impossible for users to create an instance. > >Of course, we cannot do anything for older releases, and it seems the JavaDocs do explain it correctly... > >But I did file https://issues.apache.org/jira/browse/KAFKA-18836 -- feel free to pick it up if you are interested in contributing a fix for the Apache Kafka 4.1 release. > > >-Matthias > > >On 2/19/25 9:17 AM, Paweł Szymczyk wrote: >> Hello! >> >> Recently, I had an opportunity to work with legacy code that was u...

Re: Kafka Producer send offsets to transaction method confusion

Thanks for reaching out. This is actually very good feedback. In general, it seems that `ConsumerGroupMetadata` should be an interface, not a class, making it impossible for users to create an instance. Of course, we cannot do anything for older releases, and it seems the JavaDocs do explain it correctly... But I did file https://issues.apache.org/jira/browse/KAFKA-18836 -- feel free to pick it up if you have interest to contribute a fix for Apache Kafka 4.1 release. -Matthias On 2/19/25 9:17 AM, Paweł Szymczyk wrote: > Hello! > > Recently, I had an opportunity to work with legacy code that was using an > old version of the Kafka native producer transactional API. Our goal was to > upgrade the code to the newest version and benefit from the upgraded > transactional API protocol. Initially, we found code that was highlighted > as deprecated ( > https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/Producer.ht...

Kafka Producer send offsets to transaction method confusion

Hello! Recently, I had an opportunity to work with legacy code that was using an old version of the Kafka native producer transactional API. Our goal was to upgrade the code to the newest version and benefit from the upgraded transactional API protocol. Initially, we found code that was highlighted as deprecated ( https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/Producer.html#sendOffsetsToTransaction-java.util.Map-java.lang.String- ): > kafkaProducer.sendOffsetsToTransaction(Map.of(new > TopicPartition(record.topic(), record.partition()), new > OffsetAndMetadata(record.offset())), groupId); > Our first approach was to use the non-deprecated API: kafkaProducer.sendOffsetsToTransaction(Map.of(new > TopicPartition(record.topic(), record.partition()), new > OffsetAndMetadata(record.offset())), new ConsumerGroupMetadata(groupId)); A few days later, after an extensive testing session, we finally read the new sendOffsetsToTr...

Re: Kafka 3.8.1 /tmp exec permission issue also has impact on Kafka client

You have to set this variable in your startup parameters: java.io.tmpdir. Example: export KAFKA_OPTS="-Djava.io.tmpdir=/your/custom/tmpdir" On Tue, 18 Feb 2025 at 1:26 PM, Bruno Cadonna <cadonna@apache.org> wrote: > Hi Benson, > > you got a reply from Josep on Feb 17th. > > Copying Josep's e-mail here for simplicity: > > Hi Xiong, > > The release notes are also in Github (both github.com/apache/kafka/ and > github.com/apache/kafka-site ) and we can fix them with a PR. Regarding > your > findings, what you say makes sense, and indeed the `/tmp` folder might need > exec permissions. > > Would you be up for a PR? > > Best, > > > > Best, > Bruno > > On 18.02.25 02:23, Xiong Benson wrote: > > + users@kafka.apache.org > > > > Dear Kafka community admins, > > > > Could you please reply to my...

Re: Kafka 3.8.1 /tmp exec permission issue also has impact on Kafka client

Hi Benson, you got a reply from Josep on Feb 17th. Copying Josep's e-mail here for simplicity: Hi Xiong, The release notes are also in Github (both github.com/apache/kafka/ and github.com/apache/kafka-site ) and we can fix them with a PR. Regarding your findings, what you say makes sense, and indeed the `/tmp` folder might need exec permissions. Would you be up for a PR? Best, Best, Bruno On 18.02.25 02:23, Xiong Benson wrote: > + users@kafka.apache.org > > Dear Kafka community admins, > > Could you please reply to my below question? The issue I mentioned below exposed a notable issue in the Kafka 3.8.1 client which has not been explicitly stated in any of the release note documents. > > Thanks, > Benson > ________________________________ > From: Xiong Benson <bo.xiong@outlook.com> > Sent: Feb 17, 2025 22:50 > To: dev@kafka.apache.org <dev@kafka.apache....

Re: Kafka 3.8.1 /tmp exec permission issue also has impact on Kafka client

+ users@kafka.apache.org Dear Kafka community admins, Could you please reply to my below question? The issue I mentioned below exposed a notable issue in the Kafka 3.8.1 client which has not been explicitly stated in any of the release note documents. Thanks, Benson ________________________________ From: Xiong Benson <bo.xiong@outlook.com> Sent: Feb 17, 2025 22:50 To: dev@kafka.apache.org <dev@kafka.apache.org> Subject: Kafka 3.8.1 /tmp exec permission issue also has impact on Kafka client Dear Kafka devs, The Kafka 3.8.1 release notes mention [KAFKA-17227 <https://issues.apache.org/jira/browse/KAFKA-17227>] - Apache Kafka 3.8.0 /tmp exec permission, which brings me to https://kafka.apache.org/documentation/#upgrade_381_notable whose content is specifically for the Kafka cluster (server). But this is not precise enough for the issue. Below are my summarized missing points that are not covered by http...

Re: Infinite Loop Running Kafka Connect/MM2 on Source Kafka Cluster

Greg! You're my hero, this was exactly what I was looking for. Apologies - my nicely formatted table got turned into very confusing plaintext. I was assuming this was a bug and provided some detail on different versions I had tested on, but the problem was behind the keyboard! I even found this called out in the docs once I knew what I was looking for: https://kafka.apache.org/documentation/#connect_running On 2/13/25, 12:12, "Greg Harris" <greg.harris@aiven.io.INVALID> wrote: Hi Mehrtens, The MirrorSourceConnector/Task relies on the Connect framework to instantiate the producer used for mirroring. The "target.cluster.bootstrap.servers" is indeed ineffective for changing the framework client configuration, that shou...

Re: Infinite Loop Running Kafka Connect/MM2 on Source Kafka Cluster

Hi Mehrtens, The MirrorSourceConnector/Task relies on the Connect framework to instantiate the producer used for mirroring. The "target.cluster.bootstrap.servers" setting is indeed ineffective for changing the framework client configuration; it should only affect clients instantiated within the task (admin clients, offset-syncs producers, etc.). I think the configuration that you need to provide is "producer.override.bootstrap.servers", in addition to the configuration you already have. Some of your producer tuning may also need to be specified via "producer.override.*" in order to affect the mirrored records. I don't understand your test cases exactly; are you observing some version dependence here? I don't recall any recent changes which would invalidate the above information. Hope this helps, Greg On Thu, Feb 13, 2025 at 9:35 AM Mehrtens, Mazrim <mmehrten@amazon.com.invalid> wrote: > I've found that Kafka Conne...
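Greg's suggestion can be sketched as connector configuration like the following (host names and the tuning key are placeholders; note that the Connect worker's connector.client.config.override.policy must permit overrides for the producer.override.* keys to take effect):

```properties
# Used by clients created inside the MM2 task (admin, offset-syncs, etc.)
target.cluster.bootstrap.servers=target-kafka:9092
# Used by the producer the Connect framework creates to write mirrored records
producer.override.bootstrap.servers=target-kafka:9092
# Producer tuning for mirrored records also goes through the override prefix
producer.override.compression.type=lz4
```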

Infinite Loop Running Kafka Connect/MM2 on Source Kafka Cluster

I've found that Kafka Connect never respects the "target.cluster.bootstrap.servers" configuration in the MirrorMaker2 task config. It always uses the Kafka Connect broker information instead. Running Kafka Connect on the source cluster causes an infinite loop of messages read from the source cluster, then written back to the same topic on the source cluster when using an IdentityReplicationPolicy. Running Kafka Connect on a third cluster causes the messages to get written to the Kafka Connect cluster, not the configured target cluster. Below are the scenarios I tested, and an example of the Kafka Connect task settings used. The only scenario that produced the correct result is running Kafka Connect on the target server. Is this a hard requirement? Am I misunderstanding how the MM2 configs get used in Kafka Connect? We generally recommend that for MirrorMaker2 applications, users run Kafka Connect against the "target" Kafka cluster to help minimize network lat...