Hi,
I am parsing the Produce request on the broker side in order to simulate a
broker. No consumer is involved. I am using Kafka 3.8.0.

On Sat, 28 Dec, 2024, 04:47 Greg Harris, <greg.harris@aiven.io.invalid>
wrote:
> Hi,
>
> Thanks for your question.
>
> It appears you're using the legacy consumer API, which was removed in 2.0.0
> and is no longer supported.
> I would strongly suggest building on top of the modern Java Consumer API at
> this time.
>
> The modern API exposes the deserialized headers via the
> ConsumerRecord#headers method:
>
> https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
>
> Hope this helps,
> Greg
>
> On Fri, Dec 27, 2024, 10:19 AM Chain Head <mrchainhead@gmail.com> wrote:
>
> > Hi,
> > I am struggling to get the key-value pair from the Produce Request API. I
> > want to write it to a Buffer for further processing. I can't seem to get
> > the `k` and `v` values whereas the `.keySize` and `.valueSize` are reported
> > correctly. Please advise how to extract the key-value pairs from the
> > Produce request API payload.
> >
> > For better format, see https://pastebin.com/ZKad1ET6
> >
> > MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
> > for (RecordBatch batch : partitionRecords.batches()) {
> >     // Iterate through records of a batch
> >     Buffer batchBuffer = Buffer.buffer();
> >     Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
> >     while (it.hasNext()) {
> >         org.apache.kafka.common.record.Record record = it.next();
> >
> >         String k = "";
> >         String v = "";
> >
> >         for (Header header : record.headers()) {
> >             v = new String(header.value());
> >             // Some logic with k and v to write to a Buffer
> >         }
> >
> >         if (record.hasKey()) {
> >             ByteBuffer keyBuffer = record.key();
> >             ByteBuffer valueBuffer = record.value();
> >
> >             if (record.hasValue()) {
> >                 k = new String(keyBuffer.array(), keyBuffer.position(),
> >                         record.keySize());
> >                 v = new String(valueBuffer.array(), valueBuffer.position(),
> >                         record.valueSize());
> >                 // Some logic with k and v to write to a Buffer
> >             } else {
> >                 k = new String(keyBuffer.array(), keyBuffer.position(),
> >                         record.keySize());
> >                 // Some logic with k and v to write to a Buffer
> >             }
> >         } else {
> >             // Empty buffer
> >         }
> >     }
> > }
> >
>
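
A possible explanation for the symptom in the original question, offered as
an assumption rather than a confirmed diagnosis: the ByteBuffer returned by
record.key() or record.value() may be a slice of the larger batch buffer. For
a slice, array() returns the whole backing array while position() is relative
to the slice, so new String(buf.array(), buf.position(), size) can read the
wrong bytes unless arrayOffset() is added. array() also throws for read-only
or direct buffers. The sketch below reproduces the effect with plain java.nio
and no Kafka classes. BufferText, utf8 and naive are hypothetical names, and
UTF-8 payloads are assumed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka. Assumes keys and values are UTF-8 text.
final class BufferText {

    /** Decodes the remaining bytes of buf as UTF-8 without moving its position. */
    static String utf8(ByteBuffer buf) {
        if (buf == null) {
            return null;
        }
        // duplicate() shares the bytes but has its own position and limit,
        // so the caller's buffer is left untouched. Works for sliced,
        // read-only and direct buffers alike.
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    /** The pattern from the question: it ignores arrayOffset(). */
    static String naive(ByteBuffer buf, int size) {
        return new String(buf.array(), buf.position(), size, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate a batch buffer in which the key starts 3 bytes in.
        byte[] batch = "hdrKEYvalue".getBytes(StandardCharsets.UTF_8);
        ByteBuffer whole = ByteBuffer.wrap(batch);
        whole.position(3);
        ByteBuffer key = whole.slice(); // position 0, arrayOffset 3
        key.limit(3);

        System.out.println(naive(key, 3)); // reads from the start of the backing array
        System.out.println(utf8(key));     // reads the slice's own bytes
    }
}
```

In the quoted loop, k = BufferText.utf8(record.key()) and
v = BufferText.utf8(record.value()) would stand in for the array()-based
calls. The key and value can also be checked independently, since a record
may carry a value without a key.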