Re: Consuming an entire partition with control messages

Thank you, Matthias, for your answer. I opened an issue on the aiokafka
project as a follow-up; let's see how we can resolve it there:
https://github.com/aio-libs/aiokafka/issues/911

As mentioned in the issue, some tools such as kafka-consumer-groups.sh also
display a lag of "1" in this kind of situation.

Best regards,

Vincent

On 13/06/2023 17:27, Matthias J. Sax wrote:
> Sounds like a bug in the aiokafka library to me.
>
> If the last message in a topic partition is a tx-marker, the consumer
> should step over it, and report the correct position after the marker.
>
> The official KafkaConsumer (i.e., the Java one) does the exact same thing.
>
>
> -Matthias
>
> On 5/30/23 8:41 AM, Vincent Maurin wrote:
>> Hello !
>>
>> I am working on an exactly-once stream processor in Python, using the
>> aiokafka client library. My program stores state in memory, which is
>> recovered from a changelog topic, as in Kafka Streams.
>>
>> On each processing loop, I am consuming messages and producing messages
>> to output topics and to my changelog topic, within a transaction.
>>
>> When I need to restart a runner, to restore the state in memory, I
>> have a routine consuming the changelog topic from the beginning to the
>> "end" with the read_committed isolation level. Here I am struggling to
>> define when to stop my recovery:
>> * my current (maybe) working solution is to loop over "poll" until
>> poll no longer returns any messages
>> * I tried something based on the end offsets, then checking
>> the consumer position, but with control messages at the end of the
>> partition, I am running into an issue where the position is one below
>> the end offset and doesn't go any further
>>
>> I had a quick look at
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
>>
>> but it is a bit hard to figure out what is going on there
>>
>> Best regards,
>> Vincent
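
To make the offset arithmetic discussed in this thread concrete, here is a toy model in plain Python. It does not use aiokafka or the Java client; `Record`, `poll`, and `restore` are hypothetical names invented for this sketch, and the log is just an in-memory list. It assumes a partition whose last entry is a transaction marker, and compares a consumer that steps over the marker with one that only advances its position past records handed to the application.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Record:
    offset: int
    is_control: bool  # True for a transaction commit/abort marker
    value: str = ""


def poll(log: List[Record], position: int,
         step_over_markers: bool) -> Tuple[List[Record], int]:
    """Toy fetch: return the data records at or after `position`
    together with the consumer position after the fetch."""
    data = []
    for rec in log:
        if rec.offset < position:
            continue
        if rec.is_control:
            # Markers are never handed to the application. Only a consumer
            # that steps over them advances its position past the marker.
            if step_over_markers:
                position = rec.offset + 1
            continue
        data.append(rec)
        position = rec.offset + 1
    return data, position


def restore(log: List[Record],
            step_over_markers: bool) -> Tuple[List[str], int, int]:
    """Read from offset 0 until the position reaches the end offset,
    or until a fetch makes no progress (the fallback stop condition)."""
    end_offset = log[-1].offset + 1 if log else 0
    position = 0
    state: List[str] = []
    while position < end_offset:
        data, new_position = poll(log, position, step_over_markers)
        if new_position == position:
            break  # stuck below the end offset
        state.extend(rec.value for rec in data)
        position = new_position
    return state, position, end_offset


# Two data records followed by a commit marker at offset 2.
changelog = [Record(0, False, "a"), Record(1, False, "b"), Record(2, True)]

good = restore(changelog, step_over_markers=True)
stuck = restore(changelog, step_over_markers=False)
```

In this model `good` ends with a position equal to the end offset, while `stuck` ends one below it, which corresponds to the lag of "1" mentioned above. Both runs recover the same state, so the difference only matters for a stop condition that compares position against end offset.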