Re: Kafka Streams 2.7.1 to 3.3.1 rolling upgrade

Hmmm... that's interesting...

It seems that Kafka Streams "version probing" does not play well with
static group membership...

Sounds like a "bug" to me -- well, more like a missing integration. Not
sure right now if/how we could fix it.

Can you file a ticket?

For now, I don't think you can do anything about it. Sorry. :(


-Matthias



On 2/27/23 6:50 AM, Vinoth Rengarajan wrote:
> Hi Team,
>
> I am trying to upgrade my Kafka Streams application from 2.7.1 to 3.3.1.
> Brokers are running on Kafka 2.7.1. The plan is to upgrade the clients
> first and then the brokers.
>
> I have already enabled static membership in our application, so I am not
> expecting a rebalance. Below are the configs *(Stream Config &
> Consumer Config)*.
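>
> For reference, a minimal sketch (not our actual code) of how the static
> membership piece is wired up, assuming the usual consumer-prefix approach;
> the broker address is a placeholder, and the instance id matches instance 1
> in the full config dumps further below:
>
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka_Upgrade_Test");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
> // Static membership: each instance gets a stable, unique group.instance.id,
> // passed to the embedded consumers via the consumer prefix.
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
>           "Kafka_Upgrade_Test-1");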
>
> As mentioned earlier, the application is running on Kafka 2.7.1. I deployed
> the latest version of the app with the 3.3.1 Streams libraries and configured
> the *'upgrade.from'* property to 2.7 (based on the upgrade documentation
> available here:
> https://kafka.apache.org/33/documentation/streams/upgrade-guide). When I
> do a rolling bounce with the latest changes, I can see a rebalance being
> triggered on the other instances in the cluster.
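>
> Concretely, the config change that goes with the library bump is just the
> 'upgrade.from' line (a sketch, reusing the placeholder Properties from the
> snippet above):
>
> // Per the 3.3 upgrade guide: tell the new 3.3.1 instances that the rest
> // of the group is still on 2.7 during the first rolling bounce.
> props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.7");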
>
> I can see the logs below on the instance that is being bounced, which
> forces a rebalance on the others. Am I missing something? How can I
> prevent the other instances in the cluster from rebalancing?
>
>
> *Logs:*
> INFO 2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client
> [kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to
> REBALANCING
> INFO 2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer
> instanceId=kafka_upgrade.Kafka_Upgrade_Test-4,
> clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer,
> groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new
> Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11,
> kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
> Sent a version 11 subscription and got version 8 assignment back
> (successful version probing). Downgrade subscription metadata to commonly
> supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
> Sent a version 11 subscription and got version 8 assignment back
> (successful version probing). Downgrade subscription metadata to commonly
> supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
> Sent a version 11 subscription and got version 8 assignment back
> (successful version probing). Downgrade subscription metadata to commonly
> supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
> Sent a version 11 subscription and got version 8 assignment back
> (successful version probing). Downgrade subscription metadata to commonly
> supported version 8 and trigger new rebalance.
> INFO 2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
> Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
> Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
> Requested to schedule immediate rebalance due to version probing.
> INFO 2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
> Requested to schedule immediate rebalance due to version probing.
>
> *Streams Config:*
>
> acceptable.recovery.lag = 10000
> application.id = Kafka_Upgrade_Test
> application.server =
> bootstrap.servers = [broker1, broker2, broker3]
> buffered.records.per.partition = 1000
> built.in.metrics.version = latest
> cache.max.bytes.buffering = 10485760
> client.id = kafka_upgrade.Kafka_Upgrade_Test
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.dsl.store = rocksDB
> default.key.serde = null
> default.list.key.serde.inner = null
> default.list.key.serde.type = null
> default.list.value.serde.inner = null
> default.list.value.serde.type = null
> default.production.exception.handler = class
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = null
> max.task.idle.ms = 0
> max.warmup.replicas = 2
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 4
> poll.ms = 100
> probing.rebalance.interval.ms = 600000
> processing.guarantee = at_least_once
> rack.aware.assignment.tags = []
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> repartition.purge.interval.ms = 30000
> replication.factor = 3
> request.timeout.ms = 120000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/yukon-apps
> task.timeout.ms = 300000
> topology.optimization = none
> *upgrade.from = 2.7*
> window.size.ms = null
> windowed.inner.class.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
>
> *Consumer Config:*
>
> allow.auto.create.topics = true
> auto.commit.interval.ms = 10000
> auto.offset.reset = none
> bootstrap.servers = [server1, server2, server3]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Kafka_Upgrade_Test
> group.instance.id = Kafka_Upgrade_Test-1
> heartbeat.interval.ms = 5000
> interceptor.classes = []
> internal.leave.group.on.close = false
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 1073741823
> max.poll.records = 2000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy =
> [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 120000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 300000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> Regards,
> Vinoth
>
