Hi,
I'm trying to write a very basic Kafka Streams consumer in Java.
Once I add a KTable, I see a message in the server log saying that the consumer has unsubscribed from all topics.
Doing the same with a KStream instead of a KTable works fine for me.
I'm using Kafka version 3.2.0 (kafka_2.13-3.2.0), running on Raspbian OS.
I tried modifying group.initial.rebalance.delay.ms in the server properties, but this did not help.
The message I get in the server log is:
[2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streams-wiki-created-table in Empty state. Created a new member id streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state PreparingRebalance with old generation 2 (__consumer_offsets-16) (reason: Adding new member streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received from leader streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e for group streams-wiki-created-table for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state PreparingRebalance with old generation 3 (__consumer_offsets-16) (reason: Removing member streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e on LeaveGroup; client reason: the consumer unsubscribed from all topics) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group streams-wiki-created-table with generation 4 is now empty (__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e, groupInstanceId=None, clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(stream)) has left group streams-wiki-created-table through explicit `LeaveGroup`; client reason: the consumer unsubscribed from all topics (kafka.coordinator.group.GroupCoordinator)
My code is as follows:
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wiki-created-table");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
    TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
    TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
    TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));

    StreamsBuilder builder = new StreamsBuilder();
    // re-key each event by its user type, then count events per user type
    KTable<String, Long> createdPagesUserTypeTable = builder
            .stream("temp-create-stream", Consumed.with(Serdes.String(), WikiEventSerdes.WikiEvent()))
            .selectKey((ignored, value) -> value.getUserType())
            .groupByKey()
            .count();

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
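To make the intent concrete, here is a minimal plain-Java sketch (no Kafka involved) of the per-user-type count that selectKey(...).groupByKey().count() is meant to maintain. The class name UserTypeCountSketch and the nested Event class are hypothetical stand-ins for my WikiEvent type, and the sample events are made up purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UserTypeCountSketch {

    /** Hypothetical stand-in for WikiEvent; only the user type matters here. */
    public static final class Event {
        private final String pageTitle;
        private final String userType;

        public Event(String pageTitle, String userType) {
            this.pageTitle = pageTitle;
            this.userType = userType;
        }

        public String getPageTitle() {
            return pageTitle;
        }

        public String getUserType() {
            return userType;
        }
    }

    /** Re-keys each event by user type and keeps a running count per key. */
    public static Map<String, Long> countByUserType(List<Event> events) {
        // LinkedHashMap keeps keys in first-seen order, so the output is stable
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Event event : events) {
            counts.merge(event.getUserType(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // made-up sample data, not taken from the real topic
        List<Event> events = List.of(
                new Event("Page A", "bot"),
                new Event("Page B", "human"),
                new Event("Page C", "bot"));
        System.out.println(countByUserType(events)); // prints {bot=2, human=1}
    }
}
```

This is the result I expect createdPagesUserTypeTable to hold for the same input: one row per user type, with the number of events seen so far.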
Can someone please help me figure out what's wrong here?
Thanks,
Meir