Re: [External] Re: Consumer not receiving messages when subscribing to a topic but can receive message when assigning a partition
Hello
Thanks for the response. Your suggestion of setting enable.auto.commit=true and calling commitSync() while using the subscribe() method did not work; poll() does not return any records.
[L4J2-test] 12:21:38 INFO : test.kafka.TestKafkaCluster - Kafka properties:
{node.id=1, retry.backoff.max.ms=5000, log.flush.interval.messages=1, reconnect.backoff.max.ms=5000, listeners=CLIENT://127.0.0.1:56431,INTERNAL://127.0.0.1:56432,CONTROLLER://127.0.0.1:56434,EXTERNAL://127.0.0.1:56433, log.flush.interval.ms=1, inter.broker.listener.name=INTERNAL, reconnect.backoff.ms=2000, retry.backoff.ms=2000, bootstrap-server=127.0.0.1:56431, retries=10, controller.quorum.voters=1@127.0.0.1:56434, zookeeper.connect=127.0.0.1:56427, process.roles=broker,controller, controller.listener.names=CONTROLLER, broker.session.timeout.ms=600000, listeners.external.bootstrap.servers=PLAINTEXT://127.0.0.1:56433, connections.max.idle.ms=600000, max.connections=10, advertised.listeners=CLIENT://127.0.0.1:56431,INTERNAL://127.0.0.1:56432,CONTROLLER://127.0.0.1:56434,EXTERNAL://127.0.0.1:56433, log.dirs=/var/folders/vq/wxftld0j1k55ggcj01cm5xqr0000gn/T/kafka-tmp-dir10111617547501751573/kafka-logs, listener.security.protocol.map=CLIENT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT}
[L4J2-test] 12:21:38 INFO : test.kafka.TestKafkaCluster - Kafka starting...
[L4J2-test] 12:21:38 INFO : test.kafka.TestKafkaCluster - Zookeeper starting...
[L4J2-test] 12:21:41 INFO : test.base.BaseTest - Using groupId: KafkaUtilsITTest
[L4J2-test] 12:21:41 INFO : jobs.kafka.common.KafkaUtilsITTest - Created consumer properties: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, socket.connection.setup.timeout.max.ms=5000, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, retry.backoff.max.ms=10000, max.poll.records=20, reconnect.backoff.max.ms=10000, socket.connection.setup.timeout.ms=2000, request.timeout.ms=5000, group.id=KafkaUtilsITTest, reconnect.backoff.ms=2000, read_uncommitted=read_committed, bootstrap.servers=127.0.0.1:56431, retry.backoff.ms=2000, enable.auto.commit=true, allow.auto.create.topics=true, fetch.max.wait.ms=5000, connections.max.idle.ms=600000, session.timeout.ms=1800000, max.poll.interval.ms=2000, auto.offset.reset=earliest, client.id=KafkaUtilsITTest, default.api.timeout.ms=5000}
[L4J2-test] 12:21:41 INFO : jobs.kafka.common.KafkaUtilsITTest - Created producer properties: {retries=5, value.serializer=org.apache.kafka.common.serialization.StringSerializer, retry.backoff.max.ms=10000, reconnect.backoff.max.ms=10000, request.timeout.ms=10000, reconnect.backoff.ms=2000, bootstrap.servers=127.0.0.1:56431, delivery.timeout.ms=20000, connections.max.idle.ms=600000, retry.backoff.ms=2000, key.serializer=org.apache.kafka.common.serialization.StringSerializer}
[L4J2-test] 12:21:42 INFO : core.kafka.KafkaUtils - Successfully created topic: input. topic UUID: SoTqJq1zRE-elNzGGVCnsg
[L4J2-test] 12:21:42 INFO : jobs.kafka.common.KafkaUtilsITTest - Created topic: input
[L4J2-test] 12:21:42 INFO : jobs.kafka.common.KafkaUtilsITTest - Sending record: ProducerRecord(topic=input, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=test-key, value=test-value, timestamp=null)
[L4J2-test] 12:21:43 INFO : jobs.kafka.common.KafkaUtilsITTest - Sent record to topic: input, partition: 1, offset: 0
kafkaConsumer.subscribe(Arrays.asList(KAFKA_INPUT_TOPIC));
String key = "test-key";
String value = "test-value";
final ProducerRecord<String, String> expected1 = newProducerRecord(KAFKA_INPUT_TOPIC, key, value);
LOG.info("Sending record: {}", expected1);
Future<RecordMetadata> sentRecordMetadata = kafkaProducer.send(expected1);
RecordMetadata recordMetadata = sentRecordMetadata.get();
LOG.info("Sent record to topic: {}, partition: {}, offset: {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
int count = records.count();
if (count > 0) {
    kafkaConsumer.commitSync();
}
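One thing worth checking: with subscribe(), the first poll() also has to complete the consumer-group join and rebalance, so a single short poll right after the send may legitimately return nothing. A hedged sketch of the receive side, polling in a loop until a deadline (this assumes the same kafkaConsumer, KAFKA_INPUT_TOPIC, and LOG as in the test above, and a live broker):

```java
// Poll repeatedly so the group join/rebalance can complete;
// a single short poll() immediately after subscribe() often
// returns no records even when messages are on the topic.
long deadline = System.currentTimeMillis() + 30_000L;
ConsumerRecords<String, String> polled = ConsumerRecords.empty();
while (polled.isEmpty() && System.currentTimeMillis() < deadline) {
    polled = kafkaConsumer.poll(Duration.ofSeconds(1));
}
if (!polled.isEmpty()) {
    polled.forEach(r -> LOG.info("Received key={}, value={}", r.key(), r.value()));
    kafkaConsumer.commitSync(); // commit only after records were processed
}
```

This is only a sketch of the polling pattern, not a claim about the root cause; it needs a running broker, so no standalone output is shown.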
regards
Ranga
________________________________
From: Manabolu Sisindri <manabolusisi@gmail.com>
Sent: Thursday, June 26, 2025 12:55 PM
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: [External] Re: Consumer not receiving messages when subscribing to a topic but can receive message when assigning a partition
Hi Ranganath,
If messages are received only when a specific partition is assigned, but
not when subscribing via a consumer group, this is likely because:
--> The consumer config has enable.auto.commit=false, but no manual offset
commits are being made (commitSync() is missing). As a result, Kafka
thinks there are no new messages to consume for the group.
--> Also, if offsets were already committed earlier, --from-beginning has
no effect unless the offsets are reset.
*Recommended fixes:*
1. Add kafkaConsumer.commitSync() after polling records in the Java code.
2. Temporarily set enable.auto.commit=true to allow auto commits.
3. For the CLI, reset the group offset using:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group
console --topic input --reset-offsets --to-earliest --execute
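The CLI reset in step 3 also has a programmatic counterpart, if resetting from the Java test is more convenient. A minimal sketch, assuming a consumer already subscribed as in your code (names taken from your snippet, broker required):

```java
// Programmatic equivalent of --reset-offsets --to-earliest:
// poll once so the group assignment is established, then rewind.
kafkaConsumer.poll(Duration.ofSeconds(1));                 // triggers the group join
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment()); // rewind all assigned partitions
ConsumerRecords<String, String> fromStart = kafkaConsumer.poll(Duration.ofSeconds(2));
```

seekToBeginning() is lazy and takes effect on the next poll(), which is why a second poll follows it.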
Regards,
Sisindri M.
On Thu, Jun 26, 2025 at 1:03 AM Samudrala, Ranganath [USA]
<Samudrala_Ranganath@bah.com.invalid> wrote:
> Hello
> I have been struggling to receive messages when I subscribe to a topic or
> when I use a consumer group. However, when I assign a partition I am able
> to receive messages.
> What am I doing wrong?
>
> =======================
> Consumer config:
> =======================
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> group.id=console
> socket.connection.setup.timeout.max.ms=5000
> retry.backoff.max.ms=10000
> max.poll.records=20
> reconnect.backoff.max.ms=10000
> socket.connection.setup.timeout.ms=2000
> request.timeout.ms=5000
> reconnect.backoff.ms=2000
> read_uncommitted=read_committed
> bootstrap.servers=localhost:9092
> retry.backoff.ms=2000
> enable.auto.commit=false
> allow.auto.create.topics=true
> fetch.max.wait.ms=5000
> connections.max.idle.ms=600000
> session.timeout.ms=1800000
> max.poll.interval.ms=2000
> auto.offset.reset=earliest
> default.api.timeout.ms=5000
>
> ====================
> Non-working Java code:
>
> =====================================================================
> KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties);
> kafkaConsumer.subscribe(Collections.singletonList(topic));
> while(true) {
> ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
> .
> .
> }
> ======================================================================
> Working code:
>
> KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties);
> TopicPartition topicPartition = new TopicPartition(topic, partition);
> kafkaConsumer.assign(Collections.singletonList(topicPartition));
> while(true) {
> ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
>
> .
> .
> }
> ======================================================================
>
> The behavior is the same while using CLI binaries.
>
> Non-working CLI command:
> =================================
> > kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> input --from-beginning --group console
> Processed a total of 0 messages
>
> > kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> input --from-beginning
> Processed a total of 0 messages
> ===================================
> Working CLI command:
> > kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> input --partition 0 --offset earliest
> test1
> value-6
> value-10
> .
> .
> ===================================
>
> regards
> Ranganath Samudrala
>