Thanks a lot for all the suggestions!
The Streams API is not really what I need, but I looked into the Streams
sources and found that it initializes transactional producers in
ConsumerRebalanceListener.onPartitionsAssigned, which is called from within
KafkaConsumer.poll before any records are fetched.
It looks like this approach solves the race I was talking about.
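
As a rough illustration of that approach (a sketch, not the Streams
implementation itself): create and fence a transactional producer per assigned
partition inside onPartitionsAssigned, which the consumer invokes during
poll() before it returns any records. The broker address, topic name, group
id, and the "my-tx-prefix-" naming are assumptions made for the example.

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;

public class PerPartitionTxProducers {
    public static void main(String[] args) {
        Map<TopicPartition, KafkaProducer<String, String>> producers = new HashMap<>();

        Properties cprops = new Properties();
        cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cprops.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        cprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
        consumer.subscribe(Collections.singletonList("topic-a"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                for (TopicPartition part : parts) {
                    Properties pprops = new Properties();
                    pprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                    pprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    pprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    // One transactional.id per input partition: re-creating the producer
                    // for a re-assigned partition fences its previous owner.
                    pprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-prefix-" + part);
                    KafkaProducer<String, String> producer = new KafkaProducer<>(pprops);
                    producer.initTransactions(); // runs inside poll(), before records are returned
                    producers.put(part, producer);
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                for (TopicPartition part : parts) {
                    KafkaProducer<String, String> producer = producers.remove(part);
                    if (producer != null)
                        producer.close();
                }
            }
        });
        // ... poll/transform/produce loop as in the pseudo-code further down the thread ...
    }
}
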
Sergi
On Thu, Oct 31, 2019 at 12:10, Matthias J. Sax <matthias@confluent.io> wrote:
> I would recommend to read the Kafka Streams KIP about EOS:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
>
> Fencing is the most critical part of the implementation. Kafka Streams
> basically uses a dedicated `transactional.id` per input topic-partition.
> Hence, if a rebalance happens and a partition is re-assigned, it is
> ensured that only one "instance" of a consumer-producer pair can commit
> transactions successfully, because the "new producer" uses the same
> `transactional.id` as the "original producer" and thereby fences it.
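
A minimal sketch of the fencing behaviour described above (illustrative only,
not Kafka Streams' actual code): two producers share one transactional.id, the
later initTransactions() call fences the earlier producer, and the fenced one
fails on its next transactional call. The broker address, topic names, and the
transactional.id value are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class FencingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // the same transactional.id a "new" producer would reuse after a re-assignment
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-prefix-topic-a-0");

        KafkaProducer<String, String> original = new KafkaProducer<>(props);
        original.initTransactions();
        original.beginTransaction();
        original.send(new ProducerRecord<>("topic-b", "key", "value"));

        // Elsewhere, a rebalance creates a replacement with the same transactional.id.
        KafkaProducer<String, String> replacement = new KafkaProducer<>(props);
        replacement.initTransactions();    // bumps the epoch and fences 'original'

        try {
            original.commitTransaction();  // the fenced producer can no longer commit
        } catch (ProducerFencedException e) {
            original.close();              // only 'replacement' may commit from now on
        }
        replacement.close();
    }
}
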
>
> There is actually a KIP in progress that will make using transactions
> simpler, as it basically improves fencing:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
>
>
> But I agree with Alex: if you can, it's recommended to use the Streams API,
> which solves those problems for you. There is no need to re-implement it
> from scratch.
>
>
>
> Hope this helps.
>
>
> -Matthias
>
> On 10/30/19 8:30 PM, Alex Brekken wrote:
> > Sergi, have you looked at using the Kafka Streams API? If you're just
> > consuming from a topic, transforming the data, and then writing it out to
> > a sink topic, you can probably do that relatively easily using the Streams
> > DSL. There is no need to manually subscribe, poll, or commit offsets.
> > Then, if you want exactly-once guarantees, you just set the
> > processing.guarantee property to "exactly_once" and you're all set.
> > However, since it sounds like your application isn't stateful, I think
> > your only concern is producing duplicate messages to the sink topic,
> > right? (You don't have any internal state that could get messed up in the
> > event of crashes, network failures, etc.) Do you have control over who or
> > what consumes the sink topic? If so, can you make that consumer tolerant
> > of duplicate messages? Exactly-once works well in my experience, but there
> > is overhead involved, so only use it if you need it. :)
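
For reference, a minimal Streams sketch of such a consume-transform-produce
pipeline with exactly-once enabled via processing.guarantee. The application
id, broker address, topic names, and the toUpperCase() transform are
placeholder assumptions.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceStreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-transform-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the single property mentioned above; everything else is plain Streams DSL
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("topic-a")
               .mapValues(value -> value.toUpperCase())   // placeholder "transform" step
               .to("topic-b");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
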
> >
> > Alex
> >
> > On Wed, Oct 30, 2019 at 10:04 PM Kidong Lee <mykidong@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> It may not be for your case, but I have implemented an example of Kafka
> >> transactions:
> >>
> >> https://medium.com/@mykidong/kafka-transaction-56f022af1b0c
> >>
> >> In this example, offsets are saved to an external DB.
> >>
> >> - Kidong
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Oct 31, 2019 at 11:39 AM, Sergi Vladykin <sergi.vladykin@gmail.com>
> >> wrote:
> >>
> >>> OK, so what is the advice? Never to use Kafka transactions because they
> >>> are unusable in real life?
> >>> Can you please provide a recipe for how to make it work in a simple
> >>> scenario: no databases, just two topics, no admin actions.
> >>>
> >>> Sergi
> >>>
> >>> On Wed, Oct 30, 2019 at 22:39, Jörn Franke <jornfranke@gmail.com> wrote:
> >>>
> >>>> Please note that for exactly-once it is not sufficient to simply set an
> >>>> option. The producer and consumer must individually make sure that they
> >>>> only process each message once. For instance, the consumer can crash and
> >>>> then re-process messages that were already handled, or the producer
> >>>> might crash and write the same message twice to a database, etc.
> >>>> All of these things might also happen due to a backup and restore or a
> >>>> manual admin action.
> >>>> Those are not "edge" scenarios. In operations they can happen quite
> >>>> often, especially in a containerized infrastructure.
> >>>> You have to consider this for all messaging solutions (not only Kafka)
> >>>> in your technical design.
> >>>>
> >>>>> On Oct 30, 2019 at 20:30, Sergi Vladykin <sergi.vladykin@gmail.com> wrote:
> >>>>>
> >>>>> Hi!
> >>>>>
> >>>>> I am investigating the possibilities of "exactly once" Kafka
> >>>>> transactions for the consume-transform-produce pattern. As far as I
> >>>>> understand, the logic must be the following (in pseudo-code):
> >>>>>
> >>>>> var cons = createKafkaConsumer(MY_CONSUMER_GROUP_ID);
> >>>>> cons.subscribe(TOPIC_A);
> >>>>> for (;;) {
> >>>>>     var recs = cons.poll();
> >>>>>     for (var part : recs.partitions()) {
> >>>>>         var partRecs = recs.records(part);
> >>>>>         var prod = getOrCreateProducerForPart(MY_TX_ID_PREFIX + part);
> >>>>>         prod.beginTransaction();
> >>>>>         sendAllRecs(prod, TOPIC_B, partRecs);
> >>>>>         prod.sendOffsetsToTransaction(
> >>>>>             singletonMap(part, lastRecOffset(partRecs) + 1),
> >>>>>             MY_CONSUMER_GROUP_ID);
> >>>>>         prod.commitTransaction();
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> Is this the right approach?
> >>>>>
> >>>>> It looks to me like there is a possible race here, and the same record
> >>>>> from topic A can be committed to topic B more than once: if rebalancing
> >>>>> happens after our thread has polled a record but before it creates a
> >>>>> producer, then another thread will read and commit the same record;
> >>>>> after that our thread will wake up, create a producer (and fence the
> >>>>> other one), and successfully commit the same record a second time.
> >>>>>
> >>>>> Can anyone please explain how to do "exactly once" in Kafka correctly?
> >>>>> Examples would be very helpful.
> >>>>>
> >>>>> Sergi
> >>>>
> >>>
> >>
> >
>
>