Hi,
Thanks for your question.
It appears you're using the legacy consumer API, which was removed in 2.0.0
and is no longer supported.
I would strongly suggest building on top of the modern Java Consumer API at
this time.
The modern API exposes the deserialized headers via the
ConsumerRecord#headers method:
https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
Hope this helps,
Greg
On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainhead@gmail.com> wrote:
> Hi,
> I am struggling to get the key-value pair from the Produce Request API. I
> want to write it to a Buffer for further processing. I can't seem to get
> the `k` and `v` values whereas the `.keySize` and `.valueSize` are reported
> correctly. Please advise how to extract the key value pairs from the
> Produce request API payload.
>
> For better format, see https://pastebin.com/ZKad1ET6
>
> MemoryRecords partitionRecords = (MemoryRecords)
> partitionData.records();
> for (RecordBatch batch : partitionRecords.batches()) {
> // Iterate through reords of a batch
> Buffer batchBuffer = Buffer.buffer();
> Iterator<org.apache.kafka.common.record.Record> it =
> batch.iterator();
> while (it.hasNext()) {
> org.apache.kafka.common.record.Record record = it.next();
>
> String k = "";
> String v = "";
>
> for (Header header : record.headers()) {
> v = new String(header.value());
> // Some logic with k and v to write to a Buffer
> }
>
> if (record.hasKey()) {
> ByteBuffer keyBuffer = record.key();
> ByteBuffer valueBuffer = record.value();
>
> if (record.hasValue()) {
> k = new String(keyBuffer.array(), keyBuffer.position(),
> record.keySize());
> v = new String(valueBuffer.array(), valueBuffer.position(),
> record.valueSize());
> // Some logic with k and v to write to a Buffer
> } else {
> k = new String(keyBuffer.array(), keyBuffer.position(),
> record.keySize());
> // Some logic with k and v to write to a Buffer
> }
> } else {
> // Empty buffer
> }
> }
> }
>
Thanks for your question.
It appears you're using the legacy consumer API, which was removed in 2.0.0
and is no longer supported.
I would strongly suggest building on top of the modern Java Consumer API at
this time.
The modern API exposes the deserialized headers via the
ConsumerRecord#headers method:
https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
Hope this helps,
Greg
On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainhead@gmail.com> wrote:
> Hi,
> I am struggling to get the key-value pair from the Produce Request API. I
> want to write it to a Buffer for further processing. I can't seem to get
> the `k` and `v` values whereas the `.keySize` and `.valueSize` are reported
> correctly. Please advise how to extract the key value pairs from the
> Produce request API payload.
>
> For better format, see https://pastebin.com/ZKad1ET6
>
> MemoryRecords partitionRecords = (MemoryRecords)
> partitionData.records();
> for (RecordBatch batch : partitionRecords.batches()) {
> // Iterate through reords of a batch
> Buffer batchBuffer = Buffer.buffer();
> Iterator<org.apache.kafka.common.record.Record> it =
> batch.iterator();
> while (it.hasNext()) {
> org.apache.kafka.common.record.Record record = it.next();
>
> String k = "";
> String v = "";
>
> for (Header header : record.headers()) {
> v = new String(header.value());
> // Some logic with k and v to write to a Buffer
> }
>
> if (record.hasKey()) {
> ByteBuffer keyBuffer = record.key();
> ByteBuffer valueBuffer = record.value();
>
> if (record.hasValue()) {
> k = new String(keyBuffer.array(), keyBuffer.position(),
> record.keySize());
> v = new String(valueBuffer.array(), valueBuffer.position(),
> record.valueSize());
> // Some logic with k and v to write to a Buffer
> } else {
> k = new String(keyBuffer.array(), keyBuffer.position(),
> record.keySize());
> // Some logic with k and v to write to a Buffer
> }
> } else {
> // Empty buffer
> }
> }
> }
>
Comments
Post a Comment