
Posts

Showing posts from May, 2022

Re: KStreams State Store - state.dir does not have .checkpoint file

Hi Neeraj, Thanks for all that detail! Your expectation is correct. You should see the checkpoint files after a _clean_ shutdown, and then you should not see it bootstrap from the beginning of the changelog on the next startup. How are you shutting down the application? You'll want to call KafkaStreams#stop and wait for it to complete before stopping the java process. I hope this helps, -John On Tue, May 31, 2022, at 23:09, Neeraj Vaidya wrote: > Hi All, > I have a KStreams application running inside a Docker container which > uses a persistent key-value store. > > I have configured state.dir with a value of /tmp/kafka-streams (which > is the default). > > When I start this container using "docker run", I mount > /tmp/kafka-streams to a directory on my host machine which is, say for > example, /mnt/storage/kafka-streams. > > My application.id is "myapp". I have 288 partitions in my input topic ...
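For reference, a minimal sketch of the clean-shutdown pattern described above (in the current public API the method is KafkaStreams#close); topic names and configs here are illustrative:

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class CleanShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        CountDownLatch latch = new CountDownLatch(1);

        // A clean shutdown (e.g. on SIGTERM from "docker stop") closes the state
        // stores and writes the .checkpoint files before the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30));
            latch.countDown();
        }));

        streams.start();
        latch.await();
    }
}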

Re: number of requests

Hi, I can't understand why I see so many "fetch, version=xx" requests on the Grafana dashboard. Do you have any idea? Because I see over 20M req/s of fetch version=13 requests. Best Regards On Tue, 31 May 2022 at 21:55, aydemir kala < aydemir.kala@gmail.com > wrote: > Hi, > I can't understand why i see too much "fetch, version=xx request. I > attached screenshoot. Do you have any idea? Because I see over 20M req/s > request fetch version=13 > > Best Regards > >

Re: Newbie how to get key/value pojo out of a stream?

Thanks Luca This is exactly what I was looking for. On a related note let's say I stop and restart my application. What would I have to do so that I do not reprocess events? I am still working through the kstreams 101 tutorial. I have not gotten to the DSL tutorials yet Andy On 5/30/22, 11:16 PM, "Luca" < ciao@lucapette.me > wrote: Hi Andy, If I understand your problem correctly, you want a "foreach" terminal operation. You can check out the API here: https://kafka.apache.org/32/documentation/streams/developer-guide/dsl-api.html Luca On Tue, May 31, 2022, at 6:37 AM, Andy wrote: > All the Kstream examples I have found demonstrate how to use map, filter, > and join on streams. The last step they typically use to() to > publish/produce the results to a new stream > > How can I get the data out of the stream? For example I need to send the > data to a legacy da...
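A minimal sketch of the foreach terminal operation Luca points to; the topic name, serdes, and the println stand-in for the legacy system are illustrative. As for reprocessing after a restart: the application's consumer group (keyed by application.id) normally resumes from its committed offsets, so already-processed events are not re-read.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// foreach is a terminal operation: each record is handed to the callback
// instead of being written to another topic with to().
stream.foreach((key, value) -> {
    // placeholder for the call into the legacy system
    System.out.println(key + " -> " + value);
});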

RE: Connection to node 432 (localhost/127.0.0.1:9094) could not be established. Broker may not be available

Hi Richard, I use *listeners* now only and it's working, thank you! I wondered why *advertised.listeners* works only with default port 9092... Best regards, Evgeny Business Application Support VTB Capital Telephone: +7 (495) 960 9999 (ext.264423) Mobile: +7 (916) 091-8939 -----Original Message----- From: Richard Bosch < richard.bosch@axual.com > Sent: 31 May 2022 11:55 To: users@kafka.apache.org Subject: Re: Connection to node 432 (localhost/ 127.0.0.1:9094 ) could not be established. Broker may not be available Hi Evgeny, In your configuration file the *advertised.listeners* property is set to the new port number. For Kafka you need to set the *listeners* property to listen on the port number. The *advertised.listeners* property is used to specify another name/port number in case you use routing services or other hostnames to connect from other networks. If you comment out the *advertised.listeners* and use *listeners* instead with the ...
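A minimal sketch of the server.properties change being described, assuming a single PLAINTEXT listener on port 9094 (the hostname is illustrative):

# Bind the broker itself to the new port:
listeners=PLAINTEXT://0.0.0.0:9094

# Only needed when clients must reach the broker via a different host/port
# (routing services, other networks); otherwise it can stay commented out:
#advertised.listeners=PLAINTEXT://broker.example.com:9094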

Re: Connection to node 432 (localhost/127.0.0.1:9094) could not be established. Broker may not be available

Hi Evgeny, In your configuration file the *advertised.listeners* property is set to the new port number. For Kafka you need to set the *listeners* property to listen on the port number. The *advertised.listeners* property is used to specify another name/port number in case you use routing services or other hostnames to connect from other networks. If you comment out the *advertised.listeners* and use *listeners* instead with the new port number it should work. Kind regards, Richard Bosch On Mon, May 30, 2022 at 5:01 PM Ivanov, Evgeny < Evgeny.Ivanov@vtbcapital.ru > wrote: > Hi! > > Could you please advise why I can't run Kafka broker with different port > number ? > It's working fine with default port 9092, but when I change it to 9093 or > 9094 for instance, I get the following errors: > > [2022-05-30 13:16:28,456] DEBUG [BrokerToControllerChannelManager > broker=432 name=forwarding]: Controller isn't cac...

Connection to node 432 (localhost/127.0.0.1:9094) could not be established. Broker may not be available

Hi! Could you please advise why I can't run Kafka broker with different port number ? It's working fine with default port 9092, but when I change it to 9093 or 9094 for instance, I get the following errors: [2022-05-30 13:16:28,456] DEBUG [BrokerToControllerChannelManager broker=432 name=forwarding]: Controller isn't cached, looking for local metadata changes (kafka.server.BrokerToControllerRequestThread) [2022-05-30 13:16:28,456] DEBUG [BrokerToControllerChannelManager broker=432 name=forwarding]: No controller defined in metadata cache, retrying after backoff (kafka.server.BrokerToControllerRequestThread) [2022-05-30 13:16:28,457] INFO [/kafka-acl-extended-changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread) [2022-05-30 13:16:28,469] INFO [Controller id=432, targetBrokerId=432] Node 432 disconnected. (org.apache.kafka.clients.NetworkClient) [2022-05-30 13:16:28,471] WARN [Controller id=432, targetBroke...

Reset Kafka TestContainers after each junit test without destroying container

Cross-posting from stackoverflow < https://stackoverflow.com/questions/72420966/reset-kafka-testcontainers-without-clearing-and-recreating-the-testcontainer-aft > < https://stackoverflow.com/posts/72420966/timeline > I am using kafka testcontainers with JUnit5. Can someone let me know how I can delete data from the Kafka testcontainer after each test so that I don't have to destroy and recreate the Kafka testcontainer every time? - Test Container Version - 1.6.2 - Docker Kafka Image Name - confluentinc/cp-kafka:5.2.1
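One possible approach, not taken from the thread and purely an assumption: delete all (non-internal) topics in an @AfterEach instead of recreating the container. Here `kafka` is assumed to be the running KafkaContainer field of the test class.

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.AfterEach;

@AfterEach
void wipeTopics() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    try (Admin admin = Admin.create(props)) {
        // listTopics() excludes internal topics by default
        Set<String> topics = admin.listTopics().names().get();
        admin.deleteTopics(topics).all().get();
    }
}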

unsubscribed from all topics when adding a KTable

Hi, I'm trying to write a very basic Kafka streams consumer in Java. Once I add a KTable, I see a message in the server log that I have been unsubscribed from all topics. Doing the same with a KStream instead of KTable works fine for me. I'm using Kafka version 3.2.0 (kafka_2.13-3.2.0) and am running on raspbian OS. I tried modifying the group.initial.rebalance.delay.ms in the server properties but this did not help. The message I get in the server log is: [2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streams-wiki-created-table in Empty state. Created a new member id streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator) [2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state Prepari...

How is it safe to break message ordering but not idempotency after getting an OutOfOrderSequenceException?

The docs say "This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent record" Isn't the broker using the monotonically increasing sequence number to dedup messages? So how can it break message ordering without breaking idempotency? I can't see an example scenario where this could happen. I guess the OutOfOrderSequenceException can only happen with max.in.flight.requests.per.connection > 1, but even in that case why wouldn't we keep getting an OutOfOrderSequenceException instead of a success that broke message ordering? Thanks.
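For reference, a minimal sketch of the configuration the quoted passage is about: idempotence enabled, no transactional.id, and more than one in-flight request (broker address and topic are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Idempotence only: per-partition sequence numbers dedup retries,
// but no transactional.id is configured.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("output-topic", "key", "value"), (metadata, exception) -> {
    if (exception instanceof OutOfOrderSequenceException) {
        // Per the quoted docs, the same producer instance may keep sending,
        // at the risk of reordering records that were already sent.
    }
});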

Can CooperativeStickyAssignor be used with transactions?

This is the scenario I have in mind: 1. Client A gets assigned partitions P1 and P2. 2. Client A polls a message with offset X from P1, opens a transaction and produces to some output topic. 3. Client B joins the group and gets assigned P2. 4. Client A tries to sendOffsets with group metadata but is fenced (due to the rebalance that bumped the generation.id ). At this moment I can't use that producer anymore so I will just "ignore" that message, since the broker will abort the transaction. 5. Client A polls again and will get the message with offset X + 1, since that partition wasn't revoked due to the cooperative rebalance. In an eager rebalance this can't happen since the rebalance that bumped the generation.id and fenced my producer will also revoke the partitions and force my consumer to fetch the offset again and poll the message with offset X again. I couldn't test this locally yet since it isn't trivial, but could this happen in theo...
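A hedged sketch of the consume-transform-produce loop in the scenario above, following the error-handling pattern from the KafkaProducer javadoc; `consumer` and `producer` are assumed to be already configured (cooperative assignor, a transactional.id, initTransactions() already called), and topic names are illustrative:

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
        // Offsets are committed through the producer together with the group
        // metadata, so a generation bump after a rebalance fences the commit (KIP-447).
        producer.sendOffsetsToTransaction(
                Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)),
                consumer.groupMetadata());
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal for this producer instance: close it (step 4 in the scenario above).
        producer.close();
        break;
    } catch (KafkaException e) {
        // Abortable error: abort the transaction; whether the consumer then re-polls
        // offset X or X + 1 under cooperative rebalancing is exactly the question above.
        producer.abortTransaction();
    }
}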

why includeMetadataInTimeout can only be set via a deprecated API

Hi Kafka team, After checking the Kafka 2.8.0 code, it seems org.apache.kafka.clients.consumer.KafkaConsumer#poll(org.apache.kafka.common.utils.Timer, boolean) allows the user to keep polling records during a consumer rebalance when using CooperativeStickyAssignor, and to allow this behavior includeMetadataInTimeout must be set to false; am I right? But it seems this parameter can only be set by using the deprecated API poll(final long timeoutMs). I want to know why it was designed this way; is setting includeMetadataInTimeout=false not a recommended idea? Thanks.
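For reference, a small sketch of the two public poll variants involved; the Timer overload mentioned above is internal. The Duration overload corresponds to includeMetadataInTimeout=true, the deprecated long overload to false (`consumer` is assumed to be an existing KafkaConsumer<String, String>):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Recommended public API: metadata updates and rebalancing are bounded by the
// given timeout (internally includeMetadataInTimeout = true).
ConsumerRecords<String, String> a = consumer.poll(Duration.ofMillis(500));

// Deprecated public API: may block beyond the timeout waiting for metadata,
// e.g. for a rebalance to complete (internally includeMetadataInTimeout = false).
ConsumerRecords<String, String> b = consumer.poll(500L);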

[ANNOUNCE] Call for Speakers is open for Current 2022: The Next Generation of Kafka Summit

Hi everyone, We're very excited to announce our Call for Speakers for Current 2022: The Next Generation of Kafka Summit! With the permission of the ASF, Current will include Kafka Summit as part of the event. We're looking for talks about all aspects of event-driven design, streaming technology, and real-time systems. Think about Apache Kafka® and similar technologies, and work outwards from there. Whether it's a data engineering talk with real-time data, software engineering with message brokers, or event-driven architectures: if there's data in motion, then it's going to be relevant. The talk tracks are as follows: - Developing Real-Time Applications - Streaming Technologies - Fun and Geeky - Architectures You've Always Wondered About - People & Culture - Data Development Life Cycle (including SDLC for data, data mesh, governance, schemas) - Case Studies - Operations and Observability - Pipelines Done Right - Real-Time An...

Re: Connect task configuration limits?

Hi Urbán, I observed that if I update log4j.properties and set "log4j.logger.org.apache.kafka.clients=DEBUG" I see an ERROR swallowed in a DEBUG message: [2022-05-23 16:23:25,272] DEBUG [Producer clientId=producer-3] Exception occurred during message send: (org.apache.kafka.clients.producer.KafkaProducer:1000) org.apache.kafka.common.errors.RecordTooLargeException: The message is 3503993 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. This confirms what you suggested. After adding "max.request.size=5242880" inside config/connect-distributed.properties and setting the "max.message.bytes" property of the connect-configs topic the connector now works. Thanks for the help, Ryan On Sat, May 21, 2022 at 10:21 AM Urbán Dániel < urb.daniel7@gmail.com > wrote: > Hi Ryan, > > There are some limits, as the configs are stored inside an internal...

Re: Request to include me to contributors list

Thanks for your interest! Please let me know what your id is. Guozhang On Sun, May 22, 2022 at 6:05 AM Kumud Kumar Srivatsava Tirupati < kumudkumartirupati@gmail.com > wrote: > Hi team, > I am willing to work on > https://issues.apache.org/jira/browse/KAFKA-13926 please > add me to the contributors list so that I can assign the ticket to myself. > > *---* > *Thanks and Regards,* > *Kumud Kumar Srivatsava Tirupati* > *Ph : +91-8686073938* > -- -- Guozhang

Re: Request to include me to contributors list

Hi, I've added you to the contribution list, and assigned this ticket to you. Thanks for the interest in Apache Kafka. Luke On Mon, May 23, 2022 at 12:52 AM Kumud Kumar Srivatsava Tirupati < kumudkumartirupati@gmail.com > wrote: > Hi team, > I am willing to work on https://issues.apache.org/jira/browse/KAFKA-13926 > please add me to the contributors list so that I can assign the ticket to > myself. > *---* > *Thanks and Regards,* > *Kumud Kumar Srivatsava Tirupati* >

Re: Connect task configuration limits?

Hi Ryan, There are some limits, as the configs are stored inside an internal topic of Connect. So the usual message size and producer request size limitations apply. You can reconfigure the internal topic to allow larger messages than the default (I think it's 1MB), and the producer max request size for the Connect worker. Besides that, there is one extra bottleneck - Connect followers might forward the task configs to the leader through the REST API. The default request size limit of the REST client can also cause an issue, not sure if there is a way to reconfigure that. I think you should be able to find some kind of a related error message, probably in the leader of the cluster, or the worker which was hosting the connector itself. Daniel On 2022. 05. 19. 17:01, Ryan Slominski wrote: > Hi, > Are there limits to the size of configuration data passed via the > taskConfigs method of the Connector class? I'm ob...

Re: kafka stream - sliding window - getting unexpected output

Not sure atm. It seems you are printing the timestamp extracted from the payload: > out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp())); Does this timestamp really map to the window? You remove the window information so maybe you are looking at the wrong data? > .map((Windowed<String> key, OutputPojo out) -> { > return new KeyValue<>(key.key(),out) ; > }) For the input: Do you use a custom timestamp extractor and use the payload timestamp? If not, does the record timestamp and the payload timestamp match? -Matthias On 5/18/22 11:32 PM, Shankar Mane wrote: > @Matthias J. Sax / All > > Have added below line : > >> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) >> >> > > Here is the output : (for uuid (*2cbef750-325b-4a2f-ac39-b2c23fa0313f)*, > expecting single output but that is not the case here. Which 1 is the final ...
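For reference, a hedged sketch of what a custom, payload-based timestamp extractor looks like; InputPojo and its getTimestamp() accessor are placeholders for the thread's value type:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof InputPojo) {              // InputPojo is a placeholder type
            return ((InputPojo) value).getTimestamp(); // window on the payload timestamp
        }
        return record.timestamp();                     // otherwise fall back to the record timestamp
    }
}

It would be registered either via the default.timestamp.extractor config or per source with Consumed.with(...).withTimestampExtractor(...).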

Re: [ANNOUNCE] Apache Kafka 3.2.0

Hi James, Thank you! I corrected the mistake. Best, Bruno On 19.05.22 21:30, James Cheng wrote: > Bruno, > > Congrats on the release! > > There is a small typo on the page. >> KIP-791 < https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context > adds method recordMetada() to the StateStoreContext, > > Should be >> KIP-791 < https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context > adds method recordMetadata() to the StateStoreContext, > > I know that the page has already been published, but should we fix that typo? > > Thanks! > -James > > >> On May 17, 2022, at 9:01 AM, Bruno Cadonna < cadonna@apache.org > wrote: >> >> The Apache Kafka community is pleased to announce the release for Apache Kafka 3.2.0 >> >> * log4j 1.x is replaced with reload4j (KAFKA...

Re: [ANNOUNCE] Apache Kafka 3.2.0

Bruno, Congrats on the release! There is a small typo on the page. > KIP-791 < https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context > adds method recordMetada() to the StateStoreContext, Should be > KIP-791 < https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context > adds method recordMetadata() to the StateStoreContext, I know that the page has already been published, but should we fix that typo? Thanks! -James > On May 17, 2022, at 9:01 AM, Bruno Cadonna < cadonna@apache.org > wrote: > > The Apache Kafka community is pleased to announce the release for Apache Kafka 3.2.0 > > * log4j 1.x is replaced with reload4j (KAFKA-9366) > * StandardAuthorizer for KRaft (KIP-801) > * Send a hint to the partition leader to recover the partition (KIP-704) > * Top-level error code field in DescribeLogDirsResponse (KIP-784) ...

Connect task configuration limits?

Hi, Are there limits to the size of configuration data passed via the taskConfigs method of the Connector class? I'm observing a situation where if I use a large configuration no tasks are created, and no log messages appear in the connect log file. Using a smaller configuration works. If there are limits, can I increase them? Also, it's probably a good idea if Kafka were to log a warning message of some kind in this scenario instead of silently failing. I'm using a custom Source Connector and I have documented steps to reproduce the issue using Docker compose here: https://github.com/JeffersonLab/epics2kafka/issues/11 Thanks for any insights! Ryan

Re: Leverage multiple disks for kafka streams stores

Hi Adrian, Thank you for the additional information! One reason to have a single folder is that Streams also stores metadata that refers to all state stores in the state directory. That could be changed if we have a good reason. If you have a good idea to solve this issue, please feel free to open a KIP. Would be glad to discuss such a KIP. Best, Bruno On 19.05.22 15:40, Adrian Tubio wrote: > Hi Bruno, > > Thanks a lot for your answer. > > I have tried to tune store by store to the best of my ability, and indeed I > have managed to improve considerably. We even changed the disk to a much > faster one. But it's still not enough. > > Yes we can try dividing the application up into sub applications to make > use of different disks, but it feels like an artificial solution. > > There might be reasons I don't know of to have a single folder for all > stores, but it feels limiting, especially if you conside...

Re: Leverage multiple disks for kafka streams stores

Hi Bruno, Thanks a lot for your answer. I have tried to tune store by store to the best of my ability, and indeed I have managed to improve considerably. We even changed the disk to a much faster one. But it's still not enough. Yes we can try dividing the application up into sub applications to make use of different disks, but it feels like an artificial solution. There might be reasons I don't know of to have a single folder for all stores, but it feels limiting, especially if you consider that you can plug in other types of stores instead of RocksDB which doesn't even use local disk. If my CPU is ok, my memory is ok and the only limiting factor is Disk, why not allow the usage of multiple disks instead? Especially in cloud deployments in which you can arbitrarily attach multiple volumes, sometimes it is cheaper to use several cheaper volumes in parallel than a single very expensive one. I personally believe that this should be considered for a ...

Re: Leverage multiple disks for kafka streams stores

Hi Adrian, I am afraid that you cannot set the state directory for a single state store to a different directory than all other stores. Maybe the following blog post can help you debug and solve your issue: https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance Specifically look at the section "High disk I/O and write stalls": https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/#write-stalls Best, Bruno On 19.05.22 10:56, Adrian Tubio wrote: > Hi there, > > My kafka streams topology has one store that is particularly busy, that > alongside other stores in the same topology is exhausting I/O which leads > to write stalls and increased latency. > > The amount of compaction that this store does with regards to others is > about 3/4 times more, so we were wondering if, since we have more > disks/volumes available, would it be possible to set a dif...

Final reminder: ApacheCon North America call for presentations closing soon

[Note: You're receiving this because you are subscribed to one or more Apache Software Foundation project mailing lists.] This is your final reminder that the Call for Presentations for ApacheCon North America 2022 will close at 00:01 GMT on Monday, May 23rd, 2022. Please don't wait! Get your talk proposals in now! Details here: https://apachecon.com/acna2022/cfp.html --Rich, for the ApacheCon Planners

Leverage multiple disks for kafka streams stores

Hi there, My kafka streams topology has one store that is particularly busy, that alongside other stores in the same topology is exhausting I/O which leads to write stalls and increased latency. The amount of compaction that this store does with regards to others is about 3/4 times more, so we were wondering if, since we have more disks/volumes available, would it be possible to set a different path for this store so it falls into a different disk? I don't seem to be able to find any way to do it, ideally it should be done via RocksDbConfigSetter, but that doesn't seem to offer that possibility as it seems the state store comes from StateStoreContext which is initialized from the STATE_DIR_CONFIG global setting. Has anyone done something similar? Best regards, Adrian Tubio
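For context, a minimal sketch of how the state directory is configured today: it is one application-wide setting, which is why a per-store path is not currently possible (paths and names are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// A single directory for ALL local state stores of this instance;
// every store lives in a subdirectory under <state.dir>/<application.id>/.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/fast-disk/kafka-streams");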

Re: kafka stream - sliding window - getting unexpected output

@Matthias J. Sax / All Have added below line : > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) > > Here is the output : (for uuid (*2cbef750-325b-4a2f-ac39-b2c23fa0313f)*, expecting single output but that is not the case here. Which 1 is the final output from those 2 rows for the same uuid ? [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, > strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a) [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, > strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f) > [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, > strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c) > [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, > strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0) > [K...

Re: kafka stream - sliding window - getting unexpected output

Emitting intermediate results is by design. If you don't want to get intermediate results, you can add `suppress()` after the aggregation and configure it to only "emit on window close". -Matthias On 5/17/22 3:20 AM, Shankar Mane wrote: > Hi All, > > Our use case is to use sliding window. (for e.g. at any point, whenever >> user performs any actions at time [ t1 ], we would like to see his activity >> in [ t1 - last 24 hours]. Using this, to show the user some recommendations. > > > > -- I have code ready and it works without any errors. > -- aggregations happen as expected. > -- but the output generated is unexpected. As windows gets slides, i am > getting mixed output which includes intermediate aggregated records also > coming with final aggregated outputs. > > Could someone please help me here ? what can I do here to get ONLY final > aggregated output. > > > Code snippet...
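A minimal sketch of where suppress() goes, assuming a sliding-window count over a String-keyed stream; topic names, serdes, and window sizes are illustrative:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;

StreamsBuilder builder = new StreamsBuilder();
builder.stream("events", Consumed.with(Serdes.String(), Serdes.Long()))
       .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
       .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofHours(24), Duration.ofMinutes(5)))
       .count()
       // Emit only once the window closes, instead of every intermediate update.
       .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
       .toStream()
       .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
       .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));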