Consumer not receiving messages when subscribing to a topic, but can receive messages when assigning a partition

Hello,
I have been struggling to receive messages when I subscribe to a topic or use a consumer group. However, when I assign a partition explicitly, I am able to receive messages.
What am I doing wrong?

=======================
Consumer config:
=======================
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=console
socket.connection.setup.timeout.max.ms=5000
retry.backoff.max.ms=10000
max.poll.records=20
reconnect.backoff.max.ms=10000
socket.connection.setup.timeout.ms=2000
request.timeout.ms=5000
reconnect.backoff.ms=2000
read_uncommitted=read_committed
bootstrap.servers=localhost:9092
retry.backoff.ms=2000
enable.auto.commit=false
allow.auto.create.topics=true
fetch.max.wait.ms=5000
connections.max.idle.ms=600000
session.timeout.ms=1800000
max.poll.interval.ms=2000
auto.offset.reset=earliest
default.api.timeout.ms=5000

=====================================================================
Non-working Java code:
=====================================================================
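// Group-managed subscription: partitions are assigned through the consumer group (group.id=console)
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
    .
    .
}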
======================================================================
Working code:

// Manual assignment: the partition is set explicitly, without consumer-group partition assignment
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
TopicPartition topicPartition = new TopicPartition(topic, partition);
kafkaConsumer.assign(Collections.singletonList(topicPartition));
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
    .
    .
}
======================================================================

The behavior is the same when using the CLI binaries.

Non-working CLI command:
=================================
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning --group console
Processed a total of 0 messages

> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning
Processed a total of 0 messages
===================================
Working CLI command:
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --partition 0 --offset earliest
test1
value-6
value-10
.
.
===================================

Regards,
Ranganath Samudrala
