Posts

Showing posts from October, 2020

Re: Isolation.level default behavior

I am not sure about the isolation.level setting. However, duplicates may be caused by failed commits on the consumer side. Please read about the max.poll.interval.ms and max.poll.records settings; you may get some valuable input. I recently solved a duplicates issue in my consumer by tuning those values. Hope it helps. Regards, Sunil. On Thu, 29 Oct 2020 at 3:38 PM, usha rani < ushanarayana5@gmail.com > wrote: > Hi Team, > > Recently we ended up having huge duplicates sending out of the connector > due to the segment roll out ended in creating out of range issue. To avoid > the occurrence of the above issue we made couple of changes as part of it we > are planning to change 'isolation.level' to 'read_committed'. > > So before making that change I wanted to understand why the default of > 'isolation.level' is 'read_uncommitted' and not 'read_committed'. When I > see read_uncommi...
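As a rough illustration of the two settings mentioned in the reply above, here is a minimal sketch of consumer properties. The class name and the numeric values are assumptions chosen for illustration, not values from the thread; the Kafka defaults are 500 for max.poll.records and 300000 for max.poll.interval.ms. The idea is that if processing one batch takes longer than max.poll.interval.ms, the consumer is removed from the group, its commit fails, and the same records are delivered again.

```java
import java.util.Properties;

public class ConsumerTuning {
    // Builds consumer settings; both values are illustrative assumptions.
    public static Properties build() {
        Properties props = new Properties();
        // Fewer records per poll() so each batch finishes sooner.
        props.setProperty("max.poll.records", "100");
        // Longest allowed gap between poll() calls before the consumer is
        // considered failed and its partitions are reassigned.
        props.setProperty("max.poll.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be merged into whatever configuration the consumer already uses; suitable values depend on how long a batch really takes to process.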

Re: Partitioning per team

Hello Jan, One alternative approach you can consider is to use the combination <team, user> as the key, which keeps each aggregation small, while customizing your partitioner for the repartition topic so that keys with the same <team> prefix always go to the same partition. Then, when cleaning up data, you can similarly do a range scan on the <team> prefix within the store and delete all <team, user> entries when the team is removed. Guozhang On Mon, Oct 26, 2020 at 1:39 PM Jan Bols < janbols@telenet.be > wrote: > For a kafka-streams application, we keep data per team. Data from 2 teams > never meet but within a team, data is highly integrated. A team has team > members but also has several types of equipment. > A team has a lifespan of about 1-3 days after which the team is removed and > all data relating to that team should be evicted. > > How would you partition the data? > - Using the team id as key for all s...
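The partitioning idea in the reply above can be sketched in plain Java. This is only an illustration under stated assumptions: TeamUserKey and partitionFor are hypothetical names, and String.hashCode stands in for whatever hash the real partitioner would use. In a Kafka Streams application the same logic would presumably live inside a StreamPartitioner implementation supplied for the repartition topic.

```java
import java.util.Objects;

public class TeamPrefixPartitioning {
    // Hypothetical composite key: team first, then user.
    public static final class TeamUserKey {
        public final String team;
        public final String user;

        public TeamUserKey(String team, String user) {
            this.team = Objects.requireNonNull(team);
            this.user = Objects.requireNonNull(user);
        }
    }

    // Picks the partition from the team part only, so every <team, user>
    // key of one team lands on the same partition.
    public static int partitionFor(TeamUserKey key, int numPartitions) {
        return Math.floorMod(key.team.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        int alice = partitionFor(new TeamUserKey("team-a", "alice"), numPartitions);
        int bob = partitionFor(new TeamUserKey("team-a", "bob"), numPartitions);
        System.out.println(alice == bob); // prints "true": same team, same partition
    }
}
```

Math.floorMod is used instead of % so that a negative hash code still maps to a valid partition number.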

Re: Isolation.level default behavior

Hi Team, Any update on the query below would be appreciated. Your answer will help us make the changes to our connectors. Thanks, Usha On Thu, 29 Oct 2020, 11:08 usha rani, < ushanarayana5@gmail.com > wrote: > Hi Team, > > Recently we ended up having huge duplicates sending out of the connector > due to the segment roll out ended in creating out of range issue. To avoid > the occurrence of the above issue we made couple of changes as part of it we > are planning to change 'isolation.level' to 'read_committed'. > > So before making that change I wanted to understand why the default of > 'isolation.level' is 'read_uncommitted' and not 'read_committed'. When I > see read_uncommitted end up in dirty read and producing duplicates. > > Thanks, > Usha >

Re: Topic partition reassignment fails because of non-monotonic offsets (Kafka 2.6)

We were ultimately able to solve this issue, mainly by sitting and waiting. The issue was indeed that at some point, somehow, the data on the leader of this __consumer_offsets-18 partition got corrupted. This probably happened during the upgrade from Kafka 2.2 -> 2.6. We were doing this in a rather dangerous way, as we know now: we simply stopped all brokers, updated the software on all of them, and restarted them. We thought that since we could afford this outage on the weekend, this would be a safe way. But we will certainly never do that again, at least not unless we know 100% that all producers and all consumers are really stopped. This was NOT the case during that upgrade: we overlooked one consumer and left it running, and that consumer (group) was storing its offsets in the __consumer_offsets-18 partition. So that action, taking all brokers down and upgrading them while consumers/producers were still running, probably caused the corruption. Lesson learnt: never do that again, alw...

kafka Brokers Leader Skewed

Hi, I have a 10-node Kafka cluster. Both a Kafka broker and ZooKeeper are running on each node. Recently 2 nodes (2 and 4) went down. I have a topic with 60 partitions and a replication factor of 3. In Kafka Manager, Brokers Skewed % is showing as 50 and Brokers Leader Skewed % as 70. I manually reassigned the partitions from the UI, and Brokers Skewed % is 0 now, but Brokers Leader Skewed % didn't change. I also ran the command: $ /kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file topic.json Warning: --zookeeper is deprecated and will be removed in a future version of Kafka. Use --bootstrap-server instead to specify a broker to connect to. Created preferred replica election path with ... Successfully started preferred replica election for partitions Set(.. but it didn't change anything. I can see in the UI that brokers 8 and 10 have no leader. How can I rebalance leaders evenly across all brokers? I read that cluster rolling restart of all broker can s...

Re: MirrorMaker 2 Reload Configuration

Hi Folks, I also ran a console consumer on the `mm2-configs` Kafka topic created by MirrorMaker and found that even after restarting MirrorMaker 2 with the new config, the config registered in the mm2-configs Kafka topic still points to a legacy MirrorMaker configuration. Thanks On Fri, Oct 30, 2020 at 7:18 AM Devaki, Srinivas < me@eightnoteight.space > wrote: > > Hi Folks, > > I'm running mirror maker as a dedicated cluster as given in the > mirrormaker 2 doc. but for some reason when I add new topics and > deploy the mirror maker it's not detecting the new topics at all, even > the config dumps in the mirror maker startup logs don't show the newly > added topics. > > I've attached the config that I'm using, initially I assumed that > there might be some refresh configuration option either in connect or > mirror maker, but the connect rest api doesn't seem to be working in...

MirrorMaker 2 Reload Configuration

Hi Folks, I'm running MirrorMaker as a dedicated cluster, as described in the MirrorMaker 2 doc, but for some reason when I add new topics and deploy MirrorMaker, it does not detect the new topics at all; even the config dumps in the MirrorMaker startup logs don't show the newly added topics. I've attached the config that I'm using. Initially I assumed that there might be some refresh configuration option in either Connect or MirrorMaker, but the Connect REST API doesn't seem to be working in this mode, and I also couldn't find any refresh configuration option. Any ideas on this? Thank you in advance

```
clusters = src-cluster, dst-cluster
# disable topic prefixes
src-cluster.replication.policy.separator =
dst-cluster.replication.policy.separator =
replication.policy.separator =
source.cluster.alias =
target.cluster.alias =
# enable idempotence
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable...
```

Isolation.level default behavior

Hi Team, Recently our connector ended up sending out a huge number of duplicates because a segment roll-out resulted in an out-of-range issue. To avoid a recurrence of that issue we made a couple of changes, and as part of that we are planning to change 'isolation.level' to 'read_committed'. So before making that change, I wanted to understand why the default of 'isolation.level' is 'read_uncommitted' and not 'read_committed'. As I see it, read_uncommitted ends up in dirty reads and produces duplicates. Thanks, Usha
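For reference, the planned change could look roughly like the sketch below. The class and method names are made up for illustration. One caveat worth hedging: as far as the consumer documentation describes it, read_committed only hides records from transactions that are aborted or still open, and non-transactional records are returned either way, so this setting alone may not remove duplicates that come from redelivery.

```java
import java.util.Properties;

public class IsolationLevelConfig {
    // Consumer setting discussed in the question; the default value
    // is "read_uncommitted".
    public static Properties readCommitted() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(readCommitted().getProperty("isolation.level")); // prints "read_committed"
    }
}
```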

Sending messages within ProducerInterceptor callback?

Hi, I could not find any documentation or posts about the behaviour of a ProducerInterceptor when the 'onSend' callback is used to send additional events to Kafka, so I was hoping somebody here could help me understand its behaviour. I would like to send a subset of outgoing messages (being sent across different topics) to another topic based on Kafka record headers (in an application using the streaming API). This would mean that the implementation of ProducerInterceptor would be sending an additional Kafka message. Is this a reasonable way to use the producer interceptor, or could this cause problems? Our streaming app is using transactional sends. How do messages sent from within the 'onSend' interceptor call interact with the current transaction (i.e. would they become part of the same transaction)? Thanks, Ross

Mechanism similar to __consumer_offsets but for consumer.endOffsets

Hey, Is there any way to continuously get end offsets for all partitions of a topic, or across the whole cluster, as with consumer offsets? Periodically polling consumer.endOffsets < https://github.com/apache/kafka/blob/dffc7f8c30824cb6bf38c05838a466488cbb1f81/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java#L239 > is one option, but maybe it can be done in a different way. -- BR, Michał Łowicki
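The periodic-polling option mentioned above might be structured like the following sketch. EndOffsetTracker and its fetcher are hypothetical: the Supplier stands in for a call to consumer.endOffsets, and plain strings such as "orders-0" stand in for TopicPartition keys so the example runs without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class EndOffsetTracker {
    // Stand-in for consumer.endOffsets(...); assumed to return non-null offsets.
    private final Supplier<Map<String, Long>> fetcher;
    private final Map<String, Long> latest = new HashMap<>();

    public EndOffsetTracker(Supplier<Map<String, Long>> fetcher) {
        this.fetcher = fetcher;
    }

    // One polling round: fetches the current end offsets and returns only
    // the partitions whose end offset changed since the previous round.
    public Map<String, Long> pollOnce() {
        Map<String, Long> changed = new HashMap<>();
        for (Map.Entry<String, Long> entry : fetcher.get().entrySet()) {
            Long previous = latest.put(entry.getKey(), entry.getValue());
            if (!entry.getValue().equals(previous)) {
                changed.put(entry.getKey(), entry.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        long[] end = {5L};
        EndOffsetTracker tracker = new EndOffsetTracker(() -> Map.of("orders-0", end[0]));
        System.out.println(tracker.pollOnce()); // first round reports every partition
        System.out.println(tracker.pollOnce()); // nothing changed, so the map is empty
    }
}
```

pollOnce could be invoked on a fixed schedule, for example from a ScheduledExecutorService, with the interval trading freshness against load on the brokers.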

Re: Kafka Streams application stuck rebalancing on startup

This block: @EmbeddedKafka( topics = { "WordCounts", "WordsForNumbers", "OutputTopic" } ) starts up an embedded Kafka in the test and creates the 3 topics (2 input and 1 output). By default it creates them with 2 partitions each, but changing to 1 partition didn't alter the endless-rebalancing behavior. We also see the endless rebalancing behavior in a real Kafka cluster, using input and output topics that have already been created (and are readily consumed from and written to). On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman < sophie@confluent.io > wrote: > Yeah there's definitely something weird going on (assuming this is the full > logs over that > time period). The last thing we see logged from the StreamThread is this > message from > around the start of the task assignment process: > > 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1] ...

Re: Kafka Streams application stuck rebalancing on startup

Yeah there's definitely something weird going on (assuming this is the full logs over that time period). The last thing we see logged from the StreamThread is this message from around the start of the task assignment process: 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer] Constructed client metadata {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null, consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member subscriptions. which is at 12:22:37. Then there's nothing else from Streams until at least 12:25:00, where the logs end....

Re: Kafka Streams application stuck rebalancing on startup

2020-10-28 12:22:37.681 DEBUG 27226 --- [ SyncThread:0] o.a.z.server.FinalRequestProcessor : sessionid:0x1001183e8060000 type:create cxid:0x66 zxid:0x36 txntype:1 reqpath:n/a 2020-10-28 12:22:37.682 DEBUG 27226 --- [ 27.0.0.1:50628 )] org.apache.zookeeper.ClientCnxn : Reading reply sessionid:0x1001183e8060000, packet:: clientPath:/config/topics/__consumer_offsets serverPath:/config/topics/__consumer_offsets finished:false header:: 102,1 replyHeader:: 102,54,0 request:: '/config/topics/__consumer_offsets,#7b2276657273696f6e223a312c22636f6e666967223a7b22636f6d7072657373696f6e2e74797065223a2270726f6475636572222c22636c65616e75702e706f6c696379223a22636f6d70616374222c227365676d656e742e6279746573223a22313034383537363030227d7d,v{s{31,s{'world,'anyone}}},0 response:: '/config/topics/__consumer_offsets 2020-10-28 12:22:37.683 DEBUG 27226 --- [0 cport:50628):] o.a.zookeeper.server.SessionTrackerImpl : Checking session 0x1001183e8060000 2020-10-28 12:22:37.683 ...