
Re: Kafka Consumer Consumption based on TimeStamp-based position

Hi,

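Yes, you can use the KafkaConsumer.offsetsForTimes() method. For each
partition you pass in, it returns the earliest offset whose timestamp is
greater than or equal to the given timestamp (or null if there is no such
record). You then seek() each partition to that offset and poll() as usual.
See below for a simple example that should get you started...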

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

public class SearchByTime {

    public static void main(String[] args) {
        // args[0]: the timestamp to start from, in epoch milliseconds
        long timestamp = Long.parseLong(args[0]);
        // The topic is kept out of the consumer Properties, since "topic"
        // is not a consumer config (the client would warn about it).
        String topic = "yourtopicname";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("max.poll.records", 1);
        props.put("group.id", UUID.randomUUID().toString());
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer when we are done
        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
            // Assign all partitions directly. With subscribe(), partitions are
            // only assigned during a later poll(), and seek() on a partition
            // that is not yet assigned throws IllegalStateException.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : c.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, p.partition()));
            }
            c.assign(partitions);
            System.out.println("assigned partitions " + partitions);

            // Ask for the same timestamp on every partition.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition partition : partitions) {
                query.put(partition, timestamp);
            }

            // Per partition: earliest offset with timestamp >= the requested
            // one, or null if no such record exists.
            Map<TopicPartition, OffsetAndTimestamp> result = c.offsetsForTimes(query);
            System.out.println(result);

            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
                if (e.getValue() != null) {
                    c.seek(e.getKey(), e.getValue().offset());
                } else {
                    // Nothing at or after the timestamp: wait for new messages.
                    c.seekToEnd(Collections.singletonList(e.getKey()));
                }
            }

            DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
            formatter.setTimeZone(TimeZone.getTimeZone("UTC"));

            System.out.println("trying to get records...");
            ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                String dateFormatted = formatter.format(new Date(record.timestamp()));
                System.out.println("Received message: (" + record.key() + ", "
                    + record.value() + ") at offset " + record.offset()
                    + " at time " + dateFormatted);
            }
        }
    }
}
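If it helps to see the lookup rule on its own, here is a stand-alone sketch (no broker needed) that models what offsetsForTimes() gives you for a single partition: the earliest offset whose timestamp is >= the target, with "no such record" modelled as -1 where Kafka returns null. It also shows one way to produce the epoch-millisecond argument the program above expects, assuming you start from an ISO-8601 UTC time. The class and method names are hypothetical, and the partition's timestamps are supplied as a plain array purely for illustration; this is not how the broker actually stores or searches them.

```java
import java.time.Instant;

// Hypothetical helper class, for illustration only.
public class OffsetsForTimesSketch {

    // Converts an ISO-8601 UTC instant such as "2021-01-23T06:14:00Z" into
    // epoch milliseconds, the form expected as args[0] above.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Models the offsetsForTimes() rule for one partition: timestamps[i] is
    // the timestamp of the record at offset baseOffset + i. Returns the
    // earliest offset whose timestamp is >= target, or -1 if there is none.
    // A linear scan is used because record timestamps need not be monotonic.
    static long earliestOffsetAtOrAfter(long baseOffset, long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return baseOffset + i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long target = toEpochMillis("2021-01-23T06:14:00Z");
        System.out.println(target); // prints 1611382440000

        long[] ts = {target - 2000, target - 1000, target + 500, target + 1500};
        System.out.println(earliestOffsetAtOrAfter(100, ts, target));          // prints 102
        System.out.println(earliestOffsetAtOrAfter(100, ts, target + 10_000)); // prints -1
    }
}
```

So to replay from 06:14 UTC on 23 January 2021 you would pass 1611382440000 as the program's argument.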

Thanks,

Steve


On Sat, Jan 23, 2021 at 6:14 AM M. Manna <manmedia@gmail.com> wrote:

> Hello,
>
> We know that, using the KafkaConsumer API, we can replay messages from
> certain offsets. However, we are not sure whether we can specify a
> timestamp from which to replay messages.
>
> Does anyone know if this is possible?
>
> Regards,
>
