
Re: Kafka Consumer Consumption based on TimeStamp-based position

Thanks. I just realised that it has been in the API since 0.10.1.0. Thanks, Steve.
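
For reference, offsetsForTimes() returns, for each requested partition, the earliest offset whose timestamp is greater than or equal to the given timestamp, or null when the partition has no such record. Below is a hypothetical in-memory model of that lookup rule only. The class OffsetForTimeModel and method offsetForTime are illustrative names, not part of the Kafka API, and the broker answers the real query from its time index rather than by scanning:

```java
import java.util.Arrays;
import java.util.List;

public class OffsetForTimeModel {

    // Hypothetical model of one partition's log: element i is the
    // timestamp (epoch ms) of the record at offset i.
    // Returns the earliest offset whose timestamp is >= target, or null
    // when no such record exists (mirroring the null value that
    // offsetsForTimes() puts in its result map).
    static Long offsetForTime(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return (long) offset;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Long> log = Arrays.asList(1000L, 2000L, 2000L, 3000L);
        System.out.println(offsetForTime(log, 1500L)); // 1
        System.out.println(offsetForTime(log, 2000L)); // 1
        System.out.println(offsetForTime(log, 3500L)); // null
    }
}
```

Because the real call can return null for a partition, code that unboxes the result (for example `long offset = result.get(part)`) should check for null before calling seek(). The lookup uses the record timestamps stored in the log, which are producer-assigned (CreateTime) unless the topic is configured with message.timestamp.type=LogAppendTime.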

On Sat, 23 Jan 2021 at 12:42, Steve Howard <steve.howard@confluent.io> wrote:

> Hi,
>
> Yes, you can use KafkaConsumer.offsetsForTimes() to look up the offset for a
> timestamp and then seek() to it. See below for a simple example that should
> get you started.
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.TopicPartition;
> import java.time.Duration;
> import java.time.Instant;
> import java.time.ZoneOffset;
> import java.time.format.DateTimeFormatter;
> import java.util.*;
>
> public class SearchByTime {
>
>     public static void main(String[] args) {
>         String topic = "yourtopicname";
>         // Target time in epoch milliseconds, e.g. 1611360000000
>         long timestamp = Long.parseLong(args[0]);
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("max.poll.records", 1);
>         props.put("group.id", UUID.randomUUID().toString());
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         DateTimeFormatter formatter =
>                 DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC);
>
>         try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
>             // Assign every partition directly. With subscribe(), partitions
>             // are only assigned during a later poll(), and seek() on an
>             // unassigned partition throws IllegalStateException.
>             List<TopicPartition> partitions = new ArrayList<>();
>             for (PartitionInfo p : c.partitionsFor(topic)) {
>                 partitions.add(new TopicPartition(topic, p.partition()));
>             }
>             c.assign(partitions);
>             System.out.println("assigned " + partitions);
>
>             // Look up, per partition, the earliest offset whose timestamp
>             // is >= the target time.
>             Map<TopicPartition, Long> query = new HashMap<>();
>             for (TopicPartition tp : partitions) {
>                 query.put(tp, timestamp);
>             }
>             Map<TopicPartition, OffsetAndTimestamp> found = c.offsetsForTimes(query);
>             System.out.println(found);
>
>             for (TopicPartition tp : partitions) {
>                 OffsetAndTimestamp oat = found.get(tp);
>                 if (oat != null) {
>                     c.seek(tp, oat.offset());
>                 } else {
>                     // null means no record at or after the target time,
>                     // so start from the end of that partition instead.
>                     c.seekToEnd(Collections.singletonList(tp));
>                 }
>             }
>
>             System.out.println("trying to get records...");
>             ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1));
>             for (ConsumerRecord<String, String> record : records) {
>                 String time = formatter.format(Instant.ofEpochMilli(record.timestamp()));
>                 System.out.println("Received message: (" + record.key() + ", "
>                         + record.value() + ") at offset " + record.offset()
>                         + " at time " + time);
>             }
>         }
>     }
> }
>
> Thanks,
>
> Steve
>
>
> On Sat, Jan 23, 2021 at 6:14 AM M. Manna <manmedia@gmail.com> wrote:
>
> > Hello,
> >
> > We know that, using the KafkaConsumer API, we can replay messages from
> > specific offsets. However, we are not sure whether we can instead specify
> > a timestamp from which to replay messages.
> >
> > Does anyone know if this is possible?
> >
> > Regards,
> >
>
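
The example above reads its target time from args[0] as epoch milliseconds. If you start from a human-readable UTC time, a minimal java.time sketch for computing that value might look like this (the class TimestampArg and method utcToEpochMillis are hypothetical names used only for illustration):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampArg {

    // Interprets an ISO local date-time such as "2021-01-23T00:00:00"
    // as UTC and returns it as epoch milliseconds, the unit that
    // offsetsForTimes() expects for its timestamp values.
    static long utcToEpochMillis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(utcToEpochMillis("2021-01-23T00:00:00")); // 1611360000000
    }
}
```

Treating the input as UTC keeps the value independent of the machine's default time zone; if your times are local, substitute the appropriate ZoneOffset or use a ZonedDateTime instead.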
