
Re: Rebalancing algorithm is extremely suboptimal for long processing

I'm looking forward to the incremental rebalancing protocol. In the
meantime, I've updated to Kafka 2.3.0 to take advantage of static
group membership, and this has already helped tremendously.
Unfortunately, although it worked initially, some streams are now
unable to start at all because of an error in the broker while
handling the consumer's JOIN_GROUP request:

[2019-07-25 08:14:11,978] ERROR [KafkaApi-1] Error when handling
request: clientId=x-stream-4a43d5d4-d38f-4cb0-8741-7a6c685abf15-StreamThread-1-consumer,
correlationId=6, api=JOIN_GROUP,
body={group_id=x-stream,session_timeout_ms=10000,rebalance_timeout_ms=300000,member_id=,group_instance_id=lcrzf-1,protocol_type=consumer,protocols=[{name=stream,metadata=java.nio.HeapByteBuffer[pos=0
lim=64 cap=64]}]} (kafka.server.KafkaApis)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:366)
at scala.None$.get(Option.scala:364)
at kafka.coordinator.group.GroupMetadata.generateMemberId(GroupMetadata.scala:368)
at kafka.coordinator.group.GroupCoordinator.$anonfun$doUnknownJoinGroup$1(GroupCoordinator.scala:178)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:209)
at kafka.coordinator.group.GroupCoordinator.doUnknownJoinGroup(GroupCoordinator.scala:169)
at kafka.coordinator.group.GroupCoordinator.$anonfun$handleJoinGroup$2(GroupCoordinator.scala:144)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:209)
at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:136)
at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1389)
at kafka.server.KafkaApis.handle(KafkaApis.scala:124)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.base/java.lang.Thread.run(Thread.java:834)


I've put all the details in
https://issues.apache.org/jira/browse/KAFKA-8715. I don't see any
workarounds for this, so hopefully this can get resolved sooner rather
than later.
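
For anyone following along, here is a minimal sketch of the client-side
settings involved in static membership, assuming the standard consumer
config keys `group.instance.id` and `session.timeout.ms`. The class name,
method name, and the instance id "pod-0" are hypothetical and only for
illustration; this is not the exact configuration that triggers the error
above. The returned properties would be merged into an existing consumer
or Streams configuration.

```java
import java.util.Properties;

public class StaticMembershipSketch {

    // Returns only the settings related to static membership.
    // instanceId (hypothetical value supplied by the caller) should be unique
    // per instance and stay the same across restarts of that instance.
    static Properties staticMemberProps(String instanceId, int sessionTimeoutMs) {
        Properties props = new Properties();
        // A non-empty group.instance.id marks the consumer as a static member.
        props.setProperty("group.instance.id", instanceId);
        // How long the broker waits for a missing member before rebalancing.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // 10000 ms matches the session_timeout_ms seen in the request above.
        Properties props = staticMemberProps("pod-0", 10000);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With static membership, a restart that completes within the session
timeout should not trigger a rebalance, so the timeout is usually chosen
to cover the expected restart time.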

Regards,
Raman



On Mon, Jul 22, 2019 at 9:25 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> Hello Raman, since you are using the Consumer and you are concerned about
> rebalances triggered by member failure, I think KIP-429 is most relevant to
> your scenario. As Matthias mentioned, we are working on getting it into the
> next release, 2.4.
>
>
> Guozhang
>
> On Sat, Jul 20, 2019 at 6:36 PM Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > Static-Group membership ships with AK 2.3 (the open tickets of the KIP
> > are minor):
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> >
> > There is also KIP-415 for Kafka Connect in AK 2.3:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
> >
> >
> >
> > Currently WIP is KIP-429 and KIP-441:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams
> >
> >
> >
> > On 7/19/19 12:31 PM, Jeff Widman wrote:
> > > I am also interested in learning how others are handling this.
> > >
> > > I also support several services where average message processing
> > > time is 20 seconds per message but p99 time is about 20 minutes, and
> > > the stop-the-world rebalancing is very painful.
> > >
> > > On Fri, Jul 19, 2019, 11:38 AM Raman Gupta <rocketraman@gmail.com> wrote:
> > >
> > >> I've found
> > >> https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing:+Support+and+Policies
> > >> and
> > >> https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing+for+Streams
> > >> .
> > >> This is *exactly* what I need, right down to the Kubernetes pod
> > >> restart case. The number of issues with the current approach to
> > >> rebalancing elucidated in these documents is downright scary, and now
> > >> I am not surprised I am having tonnes of issues.
> > >>
> > >> Are there any plans to start implementing delayed imbalance and
> > >> standby bootstrap?
> > >>
> > >> Are there any short-term best practices that can help alleviate these
> > >> issues? My main problem right now is the "Instance Bounce" and
> > >> "Instance Failover" scenarios, and according to this wiki page,
> > >> num.standby.replicas should help with at least the former. Can someone
> > >> explain what this does?
> > >>
> > >> Regards,
> > >> Raman
> > >>
> > >> On Fri, Jul 19, 2019 at 12:53 PM Raman Gupta <rocketraman@gmail.com>
> > >> wrote:
> > >>>
> > >>> I have a situation in which the current rebalancing algorithm seems to
> > >>> be extremely sub-optimal.
> > >>>
> > >>> I have a topic with 100 partitions, and up to 100 separate consumers.
> > >>> Processing each message on this topic takes between 1 and 20 minutes,
> > >>> depending on the message.
> > >>>
> > >>> If any of the 100 consumers dies or drops out of the group, there is a
> > >>> huge amount of idle time as many consumers (up to 99 of them) finish
> > >>> their work and sit around idle, just waiting for the rebalance to
> > >>> complete.
> > >>>
> > >>> In addition, with 100 consumers, it's not unusual for one to die for
> > >>> one reason or another, so these stop-the-world rebalances are
> > >>> happening all the time, making the entire system slow to a snail's
> > >>> pace.
> > >>>
> > >>> It surprises me that rebalance is so inefficient. I would have thought
> > >>> that partitions would just be assigned/unassigned to consumers in
> > >>> real-time without waiting for the entire consumer group to quiesce.
> > >>>
> > >>> Is there anything I can do to improve matters?
> > >>>
> > >>> Regards,
> > >>> Raman
> > >>
> > >
> >
> >
>
> --
> -- Guozhang
