Skip to main content

Re: Transaction support multiple producer instance

Hello Wenxuan,

One KIP that we are considering so far is KIP-447:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics

It does not directly address your scenarios, but I'm wondering if you can
adjust your code to group the producers if they are in the consumer ->
producer pattern, in which case KIP-447 would help you making sure this
group of producers, although each would still has its own txn, can be
blocked on completing together.

Guozhang


On Fri, May 31, 2019 at 4:10 AM wenxuan <choose_home@126.com> wrote:

> Hi Sandeep,
>
> Thanks for your replay.
>
> I have split the large message, but the problem is the split message can't
> handled in one JVM or physical machine, even by multi-thread producer,
> cause CPU or other resource bottleneck.
>
> So I need make multiple producer instances in different physical machine
> as a distributed system. However sharing the same txn id is not supported.
> Is there any way to solve this.
>
> Thanks,
> Wenxuan
>
> On 2019/05/31 10:28:53, Sandeep Nemuri <n...@gmail.com> wrote:
> > How about splitting the large message and then produce?>
> >
> > On Fri, May 31, 2019 at 3:39 PM wenxuan <ch...@126.com> wrote:>
> >
> > > Hi Jonathan,>
> > >>
> > > Thanks for your reply.>
> > >>
> > > I get mass message to send beyond the limit of one JVM or physical>
> > > machine, so I need make more than one producer in the same
> transaction.>
> > >>
> > > Since multiple producer can't share the same transaction id, Is there
> way>
> > > to achieve multiple producer transaction described above.>
> > >>
> > > Thanks,>
> > > Wenxuan>
> > >>
> > > On 2019/05/31 08:34:14, Jonathan Santilli <j....@gmail.com> wrote:>
> > > > Hello Wenxuan, there reason of the Exception, by design the
> transaction>
> > > Id>>
> > > > must be unique per producer instance, this is from the Java docs:>>
> > > >>
> > > >>
> > >
> https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>>
>
> > >>
> > > >>
> > > > "The purpose of the transactional.id is to enable transaction
> recovery>>
> > > > across multiple sessions of a single producer instance. It would>
> > > typically>>
> > > > be derived from the shard identifier in a partitioned, stateful,>>
> > > > application. As such, it should be unique to each producer instance>
> > > running>>
> > > > within a partitioned application.">>
> > > >>
> > > > You must have a reason to instantiate multiple producers, however,
> have>
> > > you>>
> > > > try just to instantiate one producer?>>
> > > >>
> > > > "The producer is *thread safe* and sharing a single producer
> instance>>
> > > > across threads will generally be faster than having multiple>
> > > instances.">>
> > > >>
> > > > Hope that helps.>>
> > > >>
> > > >>
> > > > Cheers,>>
> > > > -->>
> > > > Jonathan>>
> > > >>
> > > >>
> > > >>
> > > > On Fri, May 31, 2019 at 5:47 AM wenxuan <ch...@126.com> wrote:>>
> > > >>
> > > > > Hey guys,>>
> > > > >>>
> > > > > I have problem in the below scenario.>>
> > > > >>>
> > > > > I hope to run multiple producer instances that send message>
> > > concurrently>>
> > > > > in the same transaction, and the transaction is committed when
> all>
> > > the>>
> > > > > producer send message successfully. Otherwise, if one producer>
> > > failed, the>>
> > > > > transaction is aborted and no message will be consumed.>>
> > > > >>>
> > > > > However, when multiple producer share the same txn id, throw the>
> > > following>>
> > > > > exception:>>
> > > > >>>
> > > > > org.apache.kafka.common.KafkaException: Cannot execute
> transactional>>
> > > > > method because we are in an error state>>
> > > > >>>
> > > > > at>>
> > > > >>
> > >
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)>>
>
> > >>
> > > > >>>
> > > > > at>>
> > > > >>
> > >
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)>>
>
> > >>
> > > > >>>
> > > > > at>>
> > > > >>
> > >
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)>>
>
> > >>
> > > > >>>
> > > > > at>>
> > > > >>
> > >
> com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)>>
>
> > >>
> > > > >>>
> > > > > Caused by:
> org.apache.kafka.common.errors.ProducerFencedException:>>
> > > > > Producer attempted an operation with an old epoch. Either there is
> a>
> > > newer>>
> > > > > producer with the same transactionalId, or the producer's
> transaction>
> > > has>>
> > > > > been expired by the broker.>>
> > > > >>>
> > > > > Please help us how to solve this, Thanks.>>
> > > > >>>
> > > >>
> > > >>
> > > > -- >>
> > > > Santilli Jonathan>>
> > > >>
> > >>
> >
> >
> > -- >
> > * Regards*>
> > * Sandeep Nemuri*>
> >
>


--
-- Guozhang

Comments