Re: Exactly once transactions

I would recommend reading the Kafka Streams KIP about EOS:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

Fencing is the most critical part of the implementation. Kafka Streams
basically uses a dedicated `transactional.id` per input topic-partition.
Hence, if a rebalance happens and a partition is re-assigned, the "new
producer" uses the same `transactional.id` as the "original producer",
which ensures that only one "instance" of a consumer-producer pair can
commit transactions successfully.
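
A rough sketch of that per-partition fencing in plain Java (the cache,
the id scheme, and the broker address are my own illustration, not
Streams internals):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

class PerPartitionProducers {
    // One transactional producer per input partition. After a rebalance,
    // the new owner of a partition creates a producer with the *same*
    // transactional.id, and its initTransactions() call fences the old one.
    private final Map<TopicPartition, KafkaProducer<byte[], byte[]>> producers =
        new HashMap<>();

    KafkaProducer<byte[], byte[]> producerFor(TopicPartition part) {
        return producers.computeIfAbsent(part, p -> {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            // deterministic id per input partition, e.g. "my-app-topicA-0"
            props.put("transactional.id",
                "my-app-" + p.topic() + "-" + p.partition());
            KafkaProducer<byte[], byte[]> prod = new KafkaProducer<>(props);
            prod.initTransactions(); // fences any zombie with the same id
            return prod;
        });
    }
}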

There is actually a KIP in progress that will make using transactions
simpler, as it basically improves fencing:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
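
Under that proposal, fencing piggybacks on the consumer group, so a
single producer per process is enough. A sketch of the shape the KIP
proposes (the exact API may still change while the KIP is in progress;
this just copies records from topic-A's group to "topic-B"):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

void copyLoop(KafkaConsumer<String, String> consumer,
              KafkaProducer<String, String> producer) {
    producer.initTransactions();
    while (true) {
        ConsumerRecords<String, String> recs = consumer.poll(Duration.ofSeconds(1));
        if (recs.isEmpty())
            continue;
        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> rec : recs) {
            producer.send(new ProducerRecord<>("topic-B", rec.key(), rec.value()));
            offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                        new OffsetAndMetadata(rec.offset() + 1));
        }
        // the group metadata is what lets the broker fence commits
        // from a previous consumer-group generation
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    }
}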


But I agree with Alex: if you can, it's recommended to use the Streams
API, which solves those problems for you. There is no need to
re-implement it from scratch.
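
For the plain consume-transform-produce case from this thread, the whole
Streams program is roughly the following (topic names, app id, and
transform() are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class CopyTopicApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-ctp-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        // the one switch that turns on transactions, fencing, and
        // transactional offset commits
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                  StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("topic-A")   // placeholder topic
               .mapValues(CopyTopicApp::transform)
               .to("topic-B");                      // placeholder topic

        new KafkaStreams(builder.build(), props).start();
    }

    static String transform(String v) { return v; } // placeholder
}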



Hope this helps.


-Matthias

On 10/30/19 8:30 PM, Alex Brekken wrote:
> Sergi, have you looked at using the Kafka Streams API? If you're just
> consuming from a topic, transforming the data, then writing it out to a
> sink topic, you can probably do that relatively easily using the Streams
> DSL. No need to manually subscribe, poll, or commit offsets. Then if you
> want exactly-once guarantees, you just set the processing.guarantee
> property to "exactly_once" and you're all set. However, since it sounds
> like your application isn't stateful, I think your only concern is
> producing duplicate messages to the sink topic, right? (You don't have
> any internal state that could get messed up in the event of crashes,
> network failures, etc.) Do you have control over who/what consumes the
> sink topic? If so, can you make that consumer tolerant of duplicate
> messages? Exactly-once works well in my experience, but there is
> overhead involved, so only use it if you need it. :)
>
> Alex
>
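
On Alex's point about tolerating duplicates downstream: if end-to-end
transactions are not an option, the usual fallback is a dedup step in
the consumer. A naive sketch (it assumes each logical message carries a
unique key; a real implementation would persist the seen-set instead of
holding it in memory):

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class DedupHandler {
    // naive in-memory dedup; a real implementation would persist this set
    private final Set<String> seen = new HashSet<>();

    void handle(ConsumerRecord<String, String> rec) {
        // assumes the producer puts a unique logical-message id in the key
        if (!seen.add(rec.key()))
            return; // duplicate delivery: skip
        process(rec.value());
    }

    void process(String value) { /* placeholder for real handling */ }
}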
> On Wed, Oct 30, 2019 at 10:04 PM Kidong Lee <mykidong@gmail.com> wrote:
>
>> Hi,
>>
>> It may not be for your case, but I have implemented an example of a
>> Kafka transaction:
>>
>> https://medium.com/@mykidong/kafka-transaction-56f022af1b0c
>>
>> In this example, offsets are saved to an external DB.
>>
>> - Kidong
>>
>> On Thu, Oct 31, 2019 at 11:39 AM, Sergi Vladykin
>> <sergi.vladykin@gmail.com> wrote:
>>
>>> Ok, so what is the advice? Not to use Kafka transactions ever, because
>>> they are unusable in real life?
>>> Can you please provide a recipe for how to make it work in the simple
>>> scenario: no databases, just two topics, no admin actions.
>>>
>>> Sergi
>>>
>>>> On Wed, Oct 30, 2019 at 10:39 PM, Jörn Franke <jornfranke@gmail.com> wrote:
>>>
>>>> Please note that for exactly-once it is not sufficient simply to set an
>>>> option. The producer and consumer must individually make sure that they
>>>> only process each message once. For instance, the consumer can crash and
>>>> may then reprocess messages it has already handled, or the producer
>>>> might crash and write the same message twice to a database, etc.
>>>> All of these things might also happen due to a backup and restore, or
>>>> through a manual admin action.
>>>> Those are not "edge" scenarios. In operations they can happen quite
>>>> often, especially in a containerized infrastructure.
>>>> You have to consider this for all messaging solutions (not only Kafka)
>>>> in your technical design.
>>>>
>>>>> On 30.10.2019 at 20:30, Sergi Vladykin <sergi.vladykin@gmail.com> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> I am investigating the possibilities of "exactly once" Kafka
>>>>> transactions for the consume-transform-produce pattern. As far as I
>>>>> understand, the logic must be the following (in pseudo-code):
>>>>>
>>>>> var cons = createKafkaConsumer(MY_CONSUMER_GROUP_ID);
>>>>> cons.subscribe(TOPIC_A);
>>>>> for (;;) {
>>>>>     var recs = cons.poll();
>>>>>     for (var part : recs.partitions()) {
>>>>>         var partRecs = recs.records(part);
>>>>>         // one producer per input partition, transactional.id derived from it
>>>>>         var prod = getOrCreateProducerForPart(MY_TX_ID_PREFIX + part);
>>>>>         prod.beginTransaction();
>>>>>         sendAllRecs(prod, TOPIC_B, partRecs);
>>>>>         // commit the consumed offset atomically with the produced records
>>>>>         prod.sendOffsetsToTransaction(
>>>>>             singletonMap(part, new OffsetAndMetadata(lastRecOffset(partRecs) + 1)),
>>>>>             MY_CONSUMER_GROUP_ID);
>>>>>         prod.commitTransaction();
>>>>>     }
>>>>> }
>>>>>
>>>>> Is this the right approach?
>>>>>
>>>>> Because it looks to me like there is a possible race here, and the same
>>>>> record from topic A can be committed to topic B more than once: if
>>>>> rebalancing happens after our thread has polled a record but before it
>>>>> creates a producer, then another thread will read and commit the same
>>>>> record; after that, our thread will wake up, create a producer (and
>>>>> fence the other one), and successfully commit the same record a second
>>>>> time.
>>>>>
>>>>> Can anyone please explain how to do "exactly once" in Kafka right?
>>>>> Examples would be very helpful.
>>>>>
>>>>> Sergi
>>>>
>>>
>>
>
