Sounds like a bug in the aiokafka library to me.
If the last message in a topic partition is a tx-marker, the consumer
should step over it and report the correct position after the marker.
The official KafkaConsumer (i.e., the Java one) does exactly that.
-Matthias
On 5/30/23 8:41 AM, Vincent Maurin wrote:
> Hello !
>
> I am working on an exactly-once stream processor in Python, using the
> aiokafka client library. My program stores state in memory, which is
> recovered from a changelog topic, like in Kafka Streams.
>
> On each processing loop, I consume messages and produce messages
> to an output topic and to my changelog topic, within a transaction.
>
> When I need to restart a runner, to restore the state in memory, I
> have a routine consuming the changelog topic from the beginning to the
> "end" with the read_committed isolation level. Here I am struggling to
> define when to stop my recovery:
> * my current (maybe) working solution is to loop over "poll" until
> poll no longer returns any messages
> * I tried to do something more, based on the end offsets and checking
> the consumer position, but with control messages at the end of the
> partition, I am running into an issue where the position is one below
> the end offset and doesn't go any further
>
> I had a quick look at
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> but it is a bit hard to figure out what is going on there
>
> Best regards,
> Vincent
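
For illustration, the two stopping rules discussed in this thread can be combined. The following is only a sketch, not aiokafka's or Kafka Streams' actual restore logic: it snapshots the end offsets once, replays until the consumer position reaches them, and falls back to "several empty polls in a row" if the position stays stuck just below the end offset, as described above. The names `restore_state`, `caught_up`, `apply` and `max_idle_polls` are hypothetical; `consumer` is assumed to behave like an `AIOKafkaConsumer` that is already started, assigned to the changelog partitions, and configured with `isolation_level="read_committed"`. Only its `end_offsets()`, `position()` and `getmany()` coroutines are used.

```python
from typing import Any, Callable, Dict, Iterable


def caught_up(positions: Dict[Any, int], end_offsets: Dict[Any, int]) -> bool:
    """True once every partition's position has reached its captured end offset."""
    return all(positions[tp] >= end for tp, end in end_offsets.items())


async def restore_state(consumer: Any,
                        partitions: Iterable[Any],
                        apply: Callable[[Any], None],
                        max_idle_polls: int = 3,
                        timeout_ms: int = 1000) -> bool:
    """Replay the changelog into memory.

    Returns True if the end offsets were reached, or False if we gave up
    after `max_idle_polls` consecutive empty polls (the fallback for a
    position that never advances past a trailing transaction marker).
    """
    partitions = list(partitions)
    # Snapshot the target once so later writes cannot move the goalposts.
    end_offsets = await consumer.end_offsets(partitions)
    idle = 0
    while True:
        positions = {tp: await consumer.position(tp) for tp in partitions}
        if caught_up(positions, end_offsets):
            return True
        if idle >= max_idle_polls:
            return False
        batches = await consumer.getmany(*partitions, timeout_ms=timeout_ms)
        records = [r for batch in batches.values() for r in batch]
        for record in records:
            apply(record)  # hypothetical callback that updates the in-memory state
        # Count consecutive empty polls; any data resets the counter.
        idle = 0 if records else idle + 1
```

A False return means the position check never succeeded, which is the symptom reported above. With a consumer that steps over the trailing marker, the function should return True without needing the idle-poll fallback.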