Skip to main content

Kafka Streams REPLACE_THREAD recovery delayed by session.timeout.ms since 4.0 — dying consumer no longer sends LeaveGroup (intentional?)

Hi all, We're upgrading a Kafka Streams service from 3.9.0 to 4.x and ran into a behavioral change in REPLACE_THREAD recovery that we'd like to understand whether it's intended. Summary When the StreamsUncaughtExceptionHandler returns REPLACE_THREAD, recovery time jumps from sub-second (3.9.x) to ~`session.timeout.ms(45 s default) starting with4.0. We've traced this to the dying consumer no longer sending a LeaveGroup` request on shutdown — the broker therefore has to wait for the session to expire before it triggers a rebalance. This appears to be the combined effect of KIP-1092 (which added the GroupMembershipOperation filter in AbstractCoordinator) and Streams' replaceStreamThread() passing REMAIN_IN_GROUP to the consumer's close path. We understand the rationale (avoid partition bouncing for stateful apps where the new thread starts in the same JVM), but the 45 s floor undermines KIP-671's promise of "fast in-place recovery from transient errors". The behavior change does not appear to be called out in the 4.0 upgrade notes. Reproduction - Kafka Streams 4.1.2 (also reproduces on 4.2.0; 4.3.0 release notes show no relevant fix). - Dynamic membership (no group.instance.id), >= 2 stream threads. - Uncaught-exception handler returns REPLACE_THREAD. - Throw any RuntimeException from inside a processor or punctuator. Log timeline on 4.1.2: T+0.000 ERROR Replacing thread in the streams uncaught exception handler T+0.001 INFO Adding StreamThread-3 T+0.112 INFO StreamThread-3 polled 1 times ── 42-second silence ── T+42.314 INFO StreamsPartitionAssignor: No followup rebalance requested T+42.315 INFO State transition from RUNNING to REBALANCING T+42.696 INFO State transition from REBALANCING to RUNNING Total recovery: 42,696 ms Same test on 3.9.0: T+0.000 ERROR Replacing thread in the streams uncaught exception handler T+0.001 INFO Adding StreamThread-3 T+0.029 INFO Member ... sending LeaveGroup request to coordinator due to "the consumer unsubscribed from all topics" T+0.116 INFO StreamsPartitionAssignor: 1 client, 2 consumers participating T+0.128 INFO State transition from RUNNING to REBALANCING T+0.466 INFO State transition from REBALANCING to RUNNING Total recovery: 466 ms Same broker (cp-kafka:6.2.1) for both runs; we also re-ran with cp-kafka:8.2.0 (KRaft) and got the same 42 s gap on 4.1.2, so the broker is not the variable. Where the change lives 1. kafka-clients AbstractCoordinator. 3.9.0 had: public synchronized RequestFuture<Void> maybeLeaveGroup(String reason) { // sends LeaveGroup if isDynamicMember() && coordinator known && state != UNJOINED // && generation has memberId } 4.1.2 introduced a GroupMembershipOperation parameter and an additional gate: private boolean shouldSendLeaveGroupRequest(GroupMembershipOperation op) { return !coordinatorUnknown() && state != UNJOINED && generation.hasMemberId() && (op == LEAVE_GROUP || (op == DEFAULT && isDynamicMember())); }     So unless the caller passes LEAVE_GROUP (or DEFAULT on a dynamic member, which is what most user-facing close paths get), no LeaveGroup is sent. 2. kafka-streams KafkaStreams#replaceStreamThread. In 4.2.0 bytecode it explicitly passes REMAIN_IN_GROUP into the shutdown chain (verified via javap -p -c): 60: getstatic CloseOptions$GroupMembershipOperation.REMAIN_IN_GROUP 64: invokevirtual StreamThread.shutdown(GroupMembershipOperation) 67: invokevirtual addStreamThread() In 4.1.2 the call site is the no-arg shutdown() and downstream the consumer close still ends up not sending LeaveGroup (confirmed by DEBUG-level ConsumerCoordinator capture — the Sending LeaveGroup request log line that appears in 3.9.0 is absent). What we measured  Setup Recovery time Kafka 3.9.0 client + cp-kafka:6.2.1 (local test) ~466 ms Kafka 3.9.0 client (production, 2× ECS tasks × 9 retry threads each) ~7 s Kafka 4.1.2 client + cp-kafka:6.2.1 (local test) ~42 s Kafka 4.1.2 client + cp-kafka:8.2.0 KRaft (local test) ~42 s Kafka 4.1.2 client + consumer.session.timeout.ms=10000 ~7 s The session.timeout.ms override confirms the broker-side wait is the bottleneck. Our questions 1. Is the REMAIN_IN_GROUP choice in replaceStreamThread() intentional? The design rationale (avoid partition bouncing to other instances and back during in-JVM thread replacement) makes sense in theory, but it now requires session.timeout.ms time to take effect, which seems to nullify the optimisation in practice. 2. Is this expected to be documented as a behavioural change in the 4.0 upgrade notes? We didn't find a mention; the closest reference is KAFKA-16514, but that's about the user-facing KafkaStreams.close(CloseOptions) API rather than the internal REPLACE_THREAD path. 3. Would the project consider exposing this as a public Streams config? Something like streams.replace-thread.leave-group=true|false (default false = current behaviour) would let users on stateful, multi-thread apps opt into the 3.9 behaviour without the cross-cutting side effects of lowering session.timeout.ms on every consumer in the JVM. Related references: - KIP-1092: Extend Consumer#close with an option to leave the group or not - KIP-1153: Refactor Kafka Streams CloseOptions to Fluent API Style - KIP-671: Streams-specific UncaughtExceptionHandler — defines REPLACE_THREAD. - KAFKA-16514 — closest existing ticket, resolved as duplicate. - KAFKA-18185 — removed internal.leave.group.on.close, but the explicit REMAIN_IN_GROUP in replaceStreamThread() overrides the new generic default. Thanks, Giorgos A.

Comments