Hello Kafka Team,
i am using a three node kafka broker cluster but
recently compression issues occur on brocker side. Please provide guillines.
broker property:
process.roles=broker,controller
node.id=0
controller.quorum.voters=0@kafka-0-0.kafka.prod.svc.cluster.local
:9093,1@kafka-1-0.kafka.prod.svc.cluster.local
:9093,2@kafka-2-0.kafka.prod.svc.cluster.local:9093
listeners=PLAINTEXT://kafka-0-0.kafka.prod.svc.cluster.local:9092,CONTROLLER://kafka-0-0.kafka.prod.svc.cluster.local:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka-0-0.kafka.prod.svc.cluster.local:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/mnt/kafka/0
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
group.initial.rebalance.delay.ms=7000
enable.auto.extend=true
default.replication.factor=3
compression.type=snappy
request.timeout.ms=3000
auto.leader.rebalance.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.hours=168
log.cleaner.enable=true
log.cleaner.delete.retention.ms=86400
log.cleaner.threads=1
log.cleaner.backoff.ms=30000
log.cleanup.policy=delete
num.recovery.threads.per.data.dir=1
num.network.threads=9
message.max.bytes=40000000
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
replica.fetch.max.bytes=104857600
max.in.flight.requests.per.connection=1
controller.quorum.election.timeout.ms=1000
controller.quorum.election.backoff.max.ms=1000
controller.quorum.fetch.timeout.ms=2000
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
queued.max.requests=500
num.replica.fetchers=3
reserved.broker.max.id=999999999
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
initial.broker.registration.timeout.ms=120000
broker.rack=1
issue :
org.apache.kafka.common.KafkaException: Failed to decompress record stream
at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:642)
at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:603)
at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:572)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:424)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:409)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:114)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:844)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1167)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1155)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:957)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.mutable.HashMap.map(HashMap.scala:35)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:945)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:603)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:674)
at kafka.server.KafkaApis.handle(KafkaApis.scala:183)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:478)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:517)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:439)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:270)
at org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint(ByteUtils.java:170)
at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:205)
at org.apache.kafka.common.record.DefaultRecord.readPartiallyFrom(DefaultRecord.java:371)
at org.apache.kafka.common.record.DefaultRecordBatch$1.doReadRecord(DefaultRecordBatch.java:289)
at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:638)
... 21 more
i am using a three node kafka broker cluster but
recently compression issues occur on brocker side. Please provide guillines.
broker property:
process.roles=broker,controller
node.id=0
controller.quorum.voters=0@kafka-0-0.kafka.prod.svc.cluster.local
:9093,1@kafka-1-0.kafka.prod.svc.cluster.local
:9093,2@kafka-2-0.kafka.prod.svc.cluster.local:9093
listeners=PLAINTEXT://kafka-0-0.kafka.prod.svc.cluster.local:9092,CONTROLLER://kafka-0-0.kafka.prod.svc.cluster.local:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka-0-0.kafka.prod.svc.cluster.local:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/mnt/kafka/0
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
group.initial.rebalance.delay.ms=7000
enable.auto.extend=true
default.replication.factor=3
compression.type=snappy
request.timeout.ms=3000
auto.leader.rebalance.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.retention.hours=168
log.cleaner.enable=true
log.cleaner.delete.retention.ms=86400
log.cleaner.threads=1
log.cleaner.backoff.ms=30000
log.cleanup.policy=delete
num.recovery.threads.per.data.dir=1
num.network.threads=9
message.max.bytes=40000000
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
replica.fetch.max.bytes=104857600
max.in.flight.requests.per.connection=1
controller.quorum.election.timeout.ms=1000
controller.quorum.election.backoff.max.ms=1000
controller.quorum.fetch.timeout.ms=2000
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
queued.max.requests=500
num.replica.fetchers=3
reserved.broker.max.id=999999999
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
initial.broker.registration.timeout.ms=120000
broker.rack=1
issue :
org.apache.kafka.common.KafkaException: Failed to decompress record stream
at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:642)
at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:603)
at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:572)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:424)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:409)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:114)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:844)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1167)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1155)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:957)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.mutable.HashMap.map(HashMap.scala:35)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:945)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:603)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:674)
at kafka.server.KafkaApis.handle(KafkaApis.scala:183)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:478)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:517)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:439)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:270)
at org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint(ByteUtils.java:170)
at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:205)
at org.apache.kafka.common.record.DefaultRecord.readPartiallyFrom(DefaultRecord.java:371)
at org.apache.kafka.common.record.DefaultRecordBatch$1.doReadRecord(DefaultRecordBatch.java:289)
at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:638)
... 21 more
Comments
Post a Comment