Hi,
I am struggling to get the key-value pairs out of a Produce request. I
want to write them to a Buffer for further processing. I can't seem to get
the `k` and `v` values, even though `keySize()` and `valueSize()` are
reported correctly. Please advise how to extract the key-value pairs from
the Produce request payload.
For a better-formatted version, see https://pastebin.com/ZKad1ET6

MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
for (RecordBatch batch : partitionRecords.batches()) {
    // Iterate through records of a batch
    Buffer batchBuffer = Buffer.buffer();
    Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
    while (it.hasNext()) {
        org.apache.kafka.common.record.Record record = it.next();
        String k = "";
        String v = "";
        for (Header header : record.headers()) {
            v = new String(header.value());
            // Some logic with k and v to write to a Buffer
        }
        if (record.hasKey()) {
            ByteBuffer keyBuffer = record.key();
            ByteBuffer valueBuffer = record.value();
            if (record.hasValue()) {
                k = new String(keyBuffer.array(), keyBuffer.position(),
                        record.keySize());
                v = new String(valueBuffer.array(), valueBuffer.position(),
                        record.valueSize());
                // Some logic with k and v to write to a Buffer
            } else {
                k = new String(keyBuffer.array(), keyBuffer.position(),
                        record.keySize());
                // Some logic with k and v to write to a Buffer
            }
        } else {
            // Empty buffer
        }
    }
}
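
One possible cause, offered as a guess rather than a confirmed diagnosis: the snippet above reads from `keyBuffer.array()` starting at `keyBuffer.position()`. If the buffers returned by `record.key()` and `record.value()` are slices or views over a larger batch buffer, then `array()` returns the whole backing array and `position()` alone does not point at the key or value bytes (the offset would also need `arrayOffset()`). The sizes would still be correct in that case, which matches the symptom described. The sketch below uses only `java.nio` and no Kafka classes; the class name `ByteBufferText` and the helper `utf8` are hypothetical names made up for illustration. It decodes whatever bytes remain in a buffer without calling `array()` at all, and it assumes the key and value are UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ByteBufferText {

    /**
     * Decodes the remaining bytes of a buffer as UTF-8.
     * Works on a duplicate, so the caller's position and limit are untouched,
     * and never calls array(), so sliced, direct, or read-only buffers are fine.
     */
    public static String utf8(ByteBuffer buffer) {
        if (buffer == null) {
            return null; // e.g. a record without a key or without a value
        }
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        // Simulate a key stored in the middle of a larger backing array.
        ByteBuffer whole = ByteBuffer.wrap("xxkeyvalue".getBytes(StandardCharsets.UTF_8));
        whole.position(2);
        whole.limit(5);
        ByteBuffer key = whole.slice(); // position 0, but arrayOffset 2

        // Reading via array() + position() starts at the wrong byte.
        String naive = new String(key.array(), key.position(), key.remaining(),
                StandardCharsets.UTF_8);
        System.out.println(naive);     // prints "xxk"

        // Decoding the buffer's own view gives the intended bytes.
        System.out.println(utf8(key)); // prints "key"
    }
}
```

In the loop above, this would amount to replacing the `new String(keyBuffer.array(), ...)` calls with something like `k = utf8(record.key())` and `v = utf8(record.value())`, assuming those methods return a `ByteBuffer` positioned at the start of the data with the limit at its end.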