Hey guys,
I have problem in the below scenario.
I hope to run multiple producer instances that send message concurrently in the same transaction, and the transaction is committed when all the producer send message successfully. Otherwise, if one producer failed, the transaction is aborted and no message will be consumed.
However, when multiple producer share the same txn id, throw the following exception:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Please help us how to solve this, Thanks.
I have problem in the below scenario.
I hope to run multiple producer instances that send message concurrently in the same transaction, and the transaction is committed when all the producer send message successfully. Otherwise, if one producer failed, the transaction is aborted and no message will be consumed.
However, when multiple producer share the same txn id, throw the following exception:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Please help us how to solve this, Thanks.
Comments
Post a Comment