Showing posts from November, 2021

Re: Kafka Streams - left join behavior

Hi Miguel,

> Is there a way to force the behavior I need, meaning... using left join and a JoinWindows output only one message (A,B) or (A, null)

I think you can achieve it by using a *KStream-GlobalKTable left join*, where the GlobalKTable reads all records from the right topic, and then doing the left join operation. This should output either (A, B) or (A, null).

Thank you.
Luke

On Tue, Nov 30, 2021 at 1:23 AM Miguel González <miguel.gonzalez@klar.mx> wrote:

> Hello
>
> I have been developing a Kafka Streams app that takes as input two topics
> as KStreams, processes them in some way, joins them and sends the
> combined message to an output topic.
>
> Here's some code:
>
> final StreamJoined<String, TransactionEvent, BalanceEvent> joinParams =
>     StreamJoined.with(
>         STRING_SERDE,
>         StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
>         StreamSerdeConst...
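
A minimal sketch of the KStream-GlobalKTable left join Luke suggests; the topic names, builder wiring, and key selector are assumptions, reusing the serdes and buildMessage helper from Miguel's snippet:

    // Sketch only: topic names and the key selector are assumptions.
    GlobalKTable<String, BalanceEvent> balances = builder.globalTable("balance-events");

    KStream<String, TransactionEvent> transactions = builder.stream(
        "transaction-events",
        Consumed.with(STRING_SERDE, StreamSerdeConstants.TRANSACTION_EVENT_SERDE));

    transactions
        .leftJoin(
            balances,
            (txKey, tx) -> txKey,                        // map each stream record to the table key
            (tx, balance) -> buildMessage(balance, tx))  // balance is null when unmatched -> (A, null)
        .to("balance-history");

Note the trade-off: a GlobalKTable keeps only the latest value per key and has no join window, so this fits when the right side behaves like a lookup table rather than an event stream.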

Kafka Streams - left join behavior

Hello

I have been developing a Kafka Streams app that takes as input two topics as KStreams, processes them in some way, joins them and sends the combined message to an output topic.

Here's some code:

final StreamJoined<String, TransactionEvent, BalanceEvent> joinParams =
    StreamJoined.with(
        STRING_SERDE,
        StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
        StreamSerdeConstants.BALANCE_EVENT_SERDE);

JoinWindows joinWindows = JoinWindows
    .of(Duration.ofSeconds(streamsProperties.getJoinWindowDuration()))
    .grace(Duration.ofSeconds(streamsProperties.getJoinGraceDuration()));

ValueJoiner<TransactionEvent, BalanceEvent, BalanceHistoryEvent> valueJoiner =
    (transactionEvent, balanceEvent) -> buildMessage(balanceEvent, transactionEvent);

transactions
    // TODO: change to leftJoin
    .join(beWithTransaction, valueJoiner, joinWindows, joinParams)

It's pretty simple, but for my use case I need to proc...

Kafka last produced and committed offsets during rebalancing

Dear all,

The code snippet below uses the Kafka admin client to retrieve the last committed and produced offsets of all partitions for a certain consumer group, namely CONSUMER_GROUP:

Map<TopicPartition, OffsetAndMetadata> offsets =
    admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();

Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for (TopicPartition tp : offsets.keySet()) {
    requestLatestOffsets.put(tp, OffsetSpec.latest());
}

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
    admin.listOffsets(requestLatestOffsets).all().get();

for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
    String topic = e.getKey().topic();
    int partition = e.getKey().partition();
    long committedOffset = e.getValue().offset();
    long latestOffset = latestOffsets.get(e.getKey()).offset();
    System.out.println("Consumer group " + CONSUME...
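
For context, here is a self-contained variant of the same lookup; this is a sketch assuming a local broker at localhost:9092 and a group named "my-group":

    // Minimal runnable sketch: the broker address and group id are assumptions.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class GroupLag {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Last committed offset per partition for the group.
                Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("my-group").partitionsToOffsetAndMetadata().get();
                // Latest produced offset (log end) for the same partitions.
                Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
                committed.keySet().forEach(tp -> latestSpec.put(tp, OffsetSpec.latest()));
                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    admin.listOffsets(latestSpec).all().get();
                committed.forEach((tp, meta) -> System.out.printf(
                    "%s committed=%d latest=%d lag=%d%n",
                    tp, meta.offset(), latest.get(tp).offset(),
                    latest.get(tp).offset() - meta.offset()));
            }
        }
    }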

Facing split brain type of behaviour in Kafka Connect Mirrormaker

Hello community,

Recently my team faced an issue with our Kafka Connect MirrorMaker cluster in which one partition was being consumed and produced twice. The double consumption and production was confirmed by checking the BytesIn and BytesOut metrics from the brokers.

What happened was: we added a couple of servers to our Connect cluster, and then we hit a network partition affecting the rack of the newly allocated servers. After some time the connectivity was restored and the system was working fine. But after a couple of hours, we observed that one topic was receiving twice the amount of data it usually gets and all the messages were being repeated twice. The same topic also had twice the consumption rate from the source cluster.

At this point, we thought the issue might be with MirrorMaker and restarted the connector. Even after the restart, the issue was still there and the messages were still being duplicated. At this point, I checked o...

Re: TLS certificate error with PEM AND KAFKA

Hi Sai Chandra Mouli T,

Glad you found out that PKCS#8 with pbe-sha1-rc4-128 (-v1) works well. In Kafka's unit tests, we use the algorithm pbeWithSHA1And3-KeyTripleDES-CBC (v1) to encrypt the key. The v1/v2 here refers to the algorithm in PKCS#5 v1/v2 (check here: https://www.openssl.org/docs/man1.1.1/man1/pkcs8.html ).

I checked the PKCS#5 version 2 spec ( https://datatracker.ietf.org/doc/html/rfc2898#section-6.2 ); it says the v2-supported algorithm is "PBES2". If you check SunJCE.java in the openJDK source code ( https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/com/sun/crypto/provider/SunJCE.java ), you'll find that the number in your error message, "1.2.840.113549.1.5.13 SecretKeyFactory not available", maps to the "OID_PKCS5_PBES2" name. That is the new algorithm for PKCS#5 v2: "PBES2". Unfortunately, it is not supported in the "SecretKeyFactory" engine. You can check the j...
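
A hedged illustration of the workaround discussed above: re-encrypt the key with a PBES1 (v1) algorithm that SunJCE's SecretKeyFactory supports, then point the client at the PEM files. The paths and password below are assumptions:

    // Re-encrypt the PKCS#8 key with a v1 algorithm first, e.g. (shell, outside Java):
    //   openssl pkcs8 -topk8 -v1 PBE-SHA1-3DES -in key.pem -out key-pbes1.pem
    import java.util.Properties;

    public class PemSslConfig {
        // Sketch only: paths and password are assumptions.
        public static Properties sslProps() {
            Properties props = new Properties();
            props.put("security.protocol", "SSL");
            props.put("ssl.keystore.type", "PEM");
            props.put("ssl.keystore.location", "/path/to/client-cert-and-key.pem");
            props.put("ssl.key.password", "key-password");
            props.put("ssl.truststore.type", "PEM");
            props.put("ssl.truststore.location", "/path/to/ca-cert.pem");
            return props;
        }
    }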

TLS certificate error with PEM AND KAFKA

[2021-11-25 10:28:11,732] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-11-25 10:28:12,042] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2021-11-25 10:28:12,166] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-11-25 10:28:12,170] INFO starting (kafka.server.KafkaServer)
[2021-11-25 10:28:12,171] INFO Connecting to zookeeper on localhost:2182 (kafka.server.KafkaServer)
[2021-11-25 10:28:12,188] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2182. (kafka.zookeeper.ZooKeeperClient)
[2021-11-25 10:28:12,192] INFO Client environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.ZooKeeper)
[2021-11-25 10:28:12,192] INFO Client environment:host.name=lenovo-ideapad.sachmo...

Kafka Streams app process records until certain date

Hello

For my use case I need to work with a chunk of records, let's say per month... We have over two years of data... and we are testing whether we can deploy it to production, but we need to test in small batches. I have built a Kafka Streams app that processes two input topics and outputs to one topic. I would like to process the first two months of data. Is that possible?

- I have tried blocking the consumer thread using .map, comparing the timestamp on each message against a timestamp I get from another system that tells me up to what time I should process, on the two KStreams I have. I also increased MAX_POLL_INTERVAL_MS_CONFIG, but I have noticed the messages that are in range do not get processed and sent to the output topic.

- I have also seen that a Spring Cloud library apparently offers a pause-resume feature: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-bind...
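
One way to bound processing by event time, sketched under assumptions (the cutoff value is a placeholder for the externally supplied bound): drop records whose timestamp falls past the cutoff. Note this filters rather than pauses; out-of-range records are consumed and discarded, so they would need a later reprocessing pass.

    // Sketch only: cutoffMillis stands in for the timestamp fetched from the other system.
    long cutoffMillis = Instant.parse("2020-03-01T00:00:00Z").toEpochMilli();

    KStream<String, TransactionEvent> bounded = transactions.transform(() ->
        new Transformer<String, TransactionEvent, KeyValue<String, TransactionEvent>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<String, TransactionEvent> transform(String key, TransactionEvent value) {
                // Forward only records whose event time is in range; returning null drops the record.
                return context.timestamp() <= cutoffMillis ? KeyValue.pair(key, value) : null;
            }

            @Override
            public void close() {}
        });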

Re: Consumer failure after rolling Broker upgrade

Luke,

We did not upgrade to resolve the issue. We simply restarted the failing clients.

Regards, James.

On 23/11/2021, at 16:10, Luke Chen <showuon@gmail.com> wrote:

Hi James,

> Bouncing the clients resolved the issue

Could you please describe which version you upgraded to, to resolve this issue? That should also help other users encountering the same issue. And the code snippet you listed has existed since 2018; I don't think there is any problem there. Maybe there were bugs in other places that got fixed indirectly.

Thank you.
Luke

On Tue, Nov 23, 2021 at 10:27 AM James Olsen <james@inaseq.com> wrote:

We had a 2.5.1 Broker/Client system running for some time with regular rolling OS upgrades to the Brokers without any problems. A while ago we upgraded both Broker and Clients to 2.7.1 and now on the first rolling OS upgrade to the 2.7.1 Brokers we encountered some Co...

Re: Kafka mTLS authentication

Hi Yingjie,

No worries! Glad to help!

Luke

On Tue, Nov 23, 2021 at 5:52 PM yingjie zou <yingjiezou1@gmail.com> wrote:

> Hi Luke,
>
> This solved my problem.
> I'm sorry to trouble you because I didn't read the document carefully.
> Thank you very much.
>
> Yingjie Zou
>
> On Tue, Nov 23, 2021 at 2:20 PM Luke Chen <showuon@gmail.com> wrote:
>
> > Hi Yingjie,
> >
> > > However, I've run into a problem. If I need to add, remove or renew a
> > > certificate in Kafka's truststore, Kafka requires a reboot, which would
> > > impact service availability for other teams.
> > >
> > > So I want to know if there is a better way to support changing Kafka's
> > > certificates without impacting service availability?
> >
> > Yes, Kafka supports dynamically updating the broker's configuration. Please
> > check here: https://kafka.apache.o...

Re: Kafka mTLS authentication

Hi Luke,

This solved my problem.
I'm sorry to trouble you because I didn't read the document carefully.
Thank you very much.

Yingjie Zou

On Tue, Nov 23, 2021 at 2:20 PM Luke Chen <showuon@gmail.com> wrote:

> Hi Yingjie,
>
> > However, I've run into a problem. If I need to add, remove or renew a
> > certificate in Kafka's truststore, Kafka requires a reboot, which would
> > impact service availability for other teams.
> >
> > So I want to know if there is a better way to support changing Kafka's
> > certificates without impacting service availability?
>
> Yes, Kafka supports dynamically updating the broker's configuration. Please
> check here: https://kafka.apache.org/documentation/#dynamicbrokerconfigs ,
> there's a section on "Updating SSL Truststore of an Existing
> Listener", which should be what you're looking for.
>
> Good luck.
>
> Thank you....

Re: Kafka mTLS authentication

Hi Yingjie,

> However, I've run into a problem. If I need to add, remove or renew a certificate in Kafka's truststore, Kafka requires a reboot, which would impact service availability for other teams.

> So I want to know if there is a better way to support changing Kafka's certificates without impacting service availability?

Yes, Kafka supports dynamically updating the broker's configuration. Please check here: https://kafka.apache.org/documentation/#dynamicbrokerconfigs , there's a section on "Updating SSL Truststore of an Existing Listener", which should be what you're looking for.

Good luck.

Thank you.
Luke

On Tue, Nov 23, 2021 at 1:12 PM yingjie zou <yingjiezou1@gmail.com> wrote:

> Hi,
>
> Currently, we are going to provide Kafka services to 20+ development teams
> in my company. We'd like to provide that as multi-tenancy: each team has
> its own authentication. An...
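
For illustration, a sketch of the same dynamic update done programmatically via the Admin API; the listener name "external", broker id "0", and truststore path are assumptions, and the kafka-configs.sh flow from the linked docs works equally well:

    // Sketch only: listener name, broker id, and the truststore path are assumptions.
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class TruststoreRollover {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                AlterConfigOp setTruststore = new AlterConfigOp(
                    new ConfigEntry("listener.name.external.ssl.truststore.location",
                                    "/etc/kafka/ssl/truststore-v2.jks"),
                    AlterConfigOp.OpType.SET);
                // Applying the per-listener config makes the broker reload the
                // truststore for that listener without a restart.
                admin.incrementalAlterConfigs(Map.of(broker, List.of(setTruststore))).all().get();
            }
        }
    }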

Re: EOL clarification

Hi Piyush,

Sorry for the late reply. Yes, there is a high possibility that there won't be any support for 2.x releases, unless a high-severity issue arises, after v3.1 is released.

Thank you.

On Tue, Nov 16, 2021 at 7:05 PM Piyush Mittal <piyush.ml20@gmail.com> wrote:

> Hi Luke,
>
> One small clarification with regards to the 2 most recent releases: once v3.1
> is released, there won't be any support for 2.x.x unless there is a special case like
> 2.6.3 ( https://lists.apache.org/thread/rj82rxch4tz19tjxj70v4o9kb7hsnrhb ), and
> teams should plan for a major upgrade from 2.x.x to 3.x.x for the next calendar
> year. Is that correct?
>
> Thanks and Regards
> Piyush Mittal
>
> On Tue, Nov 16, 2021 at 1:41 PM Luke Chen <showuon@gmail.com> wrote:
>
> > Hi Piyush,
> >
> > > As an example, let's say I am on 2.4.1 right now. If it's receiving
> > > security and bug fixes then I don't see any need to upgrade to...

Kafka mTLS authentication

Hi,

Currently, we are going to provide Kafka services to 20+ development teams in my company. We'd like to provide that as multi-tenancy: each team has its own authentication. And we are trying to use the Kafka mTLS solution.

However, I've run into a problem. If I need to add, remove or renew a certificate in Kafka's truststore, Kafka requires a reboot, which would impact service availability for other teams.

So I want to know if there is a better way to support changing Kafka's certificates without impacting service availability?

Any help is appreciated. Thanks.

Yingjie Zou

Re: Pause/Restart a Kafka streams app

You can only close() the Kafka Streams client and create a new one to resume (offsets are committed on close() and thus would be picked up on restart). Closing and restarting would result in rebalancing though, so to really pause/resume you would need to close() all instances. There is no API to pause()/resume() similar to what the KafkaConsumer offers.

-Matthias

On 11/22/21 2:10 PM, Miguel González wrote:

> Hello there
>
> Is it possible to pause/restart a Kafka streams app? I have only found this
> discussion
> https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ
> about using map to call an external service and loop until some condition
> completes
>
> regards
> - Miguel
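
A minimal sketch of the close-and-recreate cycle Matthias describes; the application id, bootstrap servers, and placeholder topology are assumptions:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class PauseByClose {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // assumption
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output"); // placeholder topology
            Topology topology = builder.build();

            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();

            Thread.sleep(60_000); // run for a while

            // "Pause": offsets are committed on close(), so no records are lost.
            streams.close();

            // "Resume": a fresh client picks up at the committed offsets
            // (this triggers a rebalance, as noted above).
            KafkaStreams resumed = new KafkaStreams(topology, props);
            resumed.start();
        }
    }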

Pause/Restart a Kafka streams app

Hello there

Is it possible to pause/restart a Kafka streams app? I have only found this discussion ( https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ ) about using map to call an external service and loop until some condition completes.

regards
- Miguel

delete.retention.ms=0 impact on changelog topics

Hello!

We are currently getting hit by https://issues.apache.org/jira/browse/KAFKA-8522 . We've tried setting delete.retention.ms to be very low, but it still doesn't allow our topic to be cleansed of deletes. Setting it to 0 appears to be the only solution.

Our suppress and window changelog topics have min.cleanable.dirty.ratio set to 0.05 (the topic can get pretty big, which makes restores painfully slow), and a segment.ms of 12 hours to allow the topic to stay as small as possible without stressing the log cleaner too much.

With this background, my question is: will setting delete.retention.ms to 0 cause any unforeseen side effects for our restoring window/suppress stores? Would it be possible to consume a valid message and then miss the following tombstone with these settings? And then maybe somehow cause our suppress store to emit an old should-have-already-been-deleted record? Or is the only downside the extra work we are causing in the log cleaner?

Than...
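
For reference, a sketch of how changelog overrides like these can be set from the Streams app itself; the store name and value types are assumptions, while the config values mirror the ones described above:

    // Sketch only: store name and types are assumptions; the values mirror the post.
    Map<String, String> changelogConfig = new HashMap<>();
    changelogConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "0");
    changelogConfig.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.05");
    changelogConfig.put(TopicConfig.SEGMENT_MS_CONFIG,
        String.valueOf(Duration.ofHours(12).toMillis()));

    Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("window-store")
            .withLoggingEnabled(changelogConfig); // applied to the store's changelog topic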

Re: uneven distribution of events across kafka topic partitions for small number of unique keys

I'm sorry. I misread your message. I thought you were asking about increasing the number of partitions on a topic after there were keyed events in it.

> On Nov 22, 2021, at 3:07 AM, Pushkar Deole <pdeole2015@gmail.com> wrote:
>
> Dave,
>
> I am not sure I get your point... it is not about having fewer partitions; the
> issue is about the duplicate hash produced by the default partitioner for 2
> different strings, which might be landing the 2 different keys in the same
> partition.
>
>> On Sun, Nov 21, 2021 at 9:33 PM Dave Klein <daveklein@usa.net> wrote:
>>
>> Another possibility, if you can pause processing, is to create a new topic
>> with the higher number of partitions, then consume from the beginning of
>> the old topic and produce to the new one. Then continue processing as
>> normal and all events will be in the correct partitions.
>>
>> Regards,
>> Dave
>> ...
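
To check whether two keys actually collide, one can reproduce the placement the default partitioner applies to keyed records (murmur2 over the serialized key, modulo the partition count); the partition count and sample keys below are assumptions:

    // Sketch only: partition count and sample keys are assumptions.
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;

    public class KeyPlacement {
        public static void main(String[] args) {
            int numPartitions = 6;
            for (String key : new String[] {"key-a", "key-b", "key-c"}) {
                // Same computation the default partitioner uses for keyed records.
                int partition = Utils.toPositive(
                    Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % numPartitions;
                System.out.println(key + " -> partition " + partition);
            }
        }
    }

With few unique keys, some partitions ending up with two keys (or none) is expected behavior of hashing, not a bug; a custom Partitioner is the usual way to force a specific spread.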