Consumer not receiving messages when subscribing to a topic but can receive message when assigning a partition
Hello
I have been struggling to receive messages when I subscribe to a topic or when I use a consumer group. However, when I assign a partition I am able to receive messages.
what am I doing wrong.
=======================
Consumer config:
=======================
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=console
socket.connection.setup.timeout.max.ms=5000
retry.backoff.max.ms=10000
max.poll.records=20
reconnect.backoff.max.ms=10000
socket.connection.setup.timeout.ms=2000
request.timeout.ms=5000
reconnect.backoff.ms=2000
read_uncommitted=read_committed
bootstrap.servers=localhost:9092
retry.backoff.ms=2000
enable.auto.commit=false
allow.auto.create.topics=true
fetch.max.wait.ms=5000
connections.max.idle.ms=600000
session.timeout.ms=1800000
max.poll.interval.ms=2000
auto.offset.reset=earliest
default.api.timeout.ms=5000
====================
Non-working Java code:
=====================================================================
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
while(true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
.
.
}
======================================================================
Working code:
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties);
TopicPartition topicPartition = new TopicPartition(topic, partition);
kafkaConsumer.assign(Collections.singletonList(topicPartition));
while(true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
.
.
}
======================================================================
The behavior is the same while using CLI binaries.
Non-working CLI command:
=================================
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning --group console
Processed a total of 0 messages
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning
Processed a total of 0 messages
===================================
Working CLI command:
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --partition 0 --offset earliest
test1
value-6
value-10
.
.
===================================
regards
Ranganath Samudrala
I have been struggling to receive messages when I subscribe to a topic or when I use a consumer group. However, when I assign a partition I am able to receive messages.
what am I doing wrong.
=======================
Consumer config:
=======================
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=console
socket.connection.setup.timeout.max.ms=5000
retry.backoff.max.ms=10000
max.poll.records=20
reconnect.backoff.max.ms=10000
socket.connection.setup.timeout.ms=2000
request.timeout.ms=5000
reconnect.backoff.ms=2000
read_uncommitted=read_committed
bootstrap.servers=localhost:9092
retry.backoff.ms=2000
enable.auto.commit=false
allow.auto.create.topics=true
fetch.max.wait.ms=5000
connections.max.idle.ms=600000
session.timeout.ms=1800000
max.poll.interval.ms=2000
auto.offset.reset=earliest
default.api.timeout.ms=5000
====================
Non-working Java code:
=====================================================================
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
while(true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
.
.
}
======================================================================
Working code:
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties);
TopicPartition topicPartition = new TopicPartition(topic, partition);
kafkaConsumer.assign(Collections.singletonList(topicPartition));
while(true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
.
.
}
======================================================================
The behavior is the same while using CLI binaries.
Non-working CLI command:
=================================
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning --group console
Processed a total of 0 messages
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning
Processed a total of 0 messages
===================================
Working CLI command:
> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --partition 0 --offset earliest
test1
value-6
value-10
.
.
===================================
regards
Ranganath Samudrala
Comments
Post a Comment