
GlobalKTable with RocksDB - queries before state RUNNING?

Hi,

we have the following problem: a Kafka topic of about 20 MB is made available as a GlobalKTable for queries. With RocksDB, the GlobalKTable is available for queries immediately, even before the data has been read completely, and all get() requests return null. After a few seconds the data can be queried correctly, but that is too late for our application. Once we switch to IN_MEMORY we get the expected behavior: the store only becomes ready after all data has been read from the topic.

How can we achieve the same behavior with the RocksDB setup?

Snippet to build the Kafka Streams topology

builder.globalTable(
    "topic-name",
    Consumed.with(Serdes.String(), Serdes.String()),
    Materialized.as(STORE_NAME).withStoreType(Materialized.StoreType.ROCKS_DB)
);

Query the Table

while (true) {
    try {
        return streams.store(
            StoreQueryParameters.fromNameAndType(FileCrawlerKafkaTopologyProducer.STORE_NAME, QueryableStoreTypes.keyValueStore()));
    } catch (InvalidStateStoreException e) {
        logger.warn(e.getMessage());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException ignored) {
        }
    }
}

The store is queried with getStore().get(key); this is where we get the null values.
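One idea would be to hold back all queries until the streams client itself has reported RUNNING, instead of relying on streams.store() throwing InvalidStateStoreException. The following is only a minimal sketch of that idea, not a confirmed fix for the RocksDB behavior. ReadinessGate and its methods are hypothetical names introduced for illustration. The wiring in the trailing comments assumes KafkaStreams#setStateListener, which has to be registered before start() is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: blocks callers until the streams client has reported RUNNING. */
final class ReadinessGate {
    private final CountDownLatch running = new CountDownLatch(1);

    /** Feed every state name reported by the state listener into this method. */
    void onStateChange(String newState) {
        if ("RUNNING".equals(newState)) {
            running.countDown(); // opens the gate; further calls have no effect
        }
    }

    /** True once RUNNING has been seen at least once. */
    boolean isReady() {
        return running.getCount() == 0;
    }

    /** Waits up to timeoutMillis; returns false on timeout or interruption. */
    boolean awaitReady(long timeoutMillis) {
        try {
            return running.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}

// Assumed wiring (needs kafka-streams on the classpath, so shown as comments only):
//   ReadinessGate gate = new ReadinessGate();
//   streams.setStateListener((newState, oldState) -> gate.onStateChange(newState.name()));
//   streams.start();
//   if (gate.awaitReady(60_000)) {
//       // only now call streams.store(...) and get(key)
//   }
```

Note that this gate stays open once RUNNING has been seen; it does not close again if the client later goes back to REBALANCING.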

This is the log output with RocksDB; the first query happens before the state is RUNNING

...
2023-11-21 15:15:40,629 INFO [com.osr.serKafkaStreamsService] (Quarkus Main Thread) wait for kafka streams store to get ready: KafkaStreams has not been started, you can retry after calling start()
2023-11-21 15:15:41,781 INFO [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from CREATED to REBALANCING
2023-11-21 15:15:41,819 INFO [org.apa.kaf.str.sta.int.RocksDBTimestampedStore] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) Opening store store-name in regular mode
2023-11-21 15:15:41,825 INFO [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] Restoring state for global store store-name
2023-11-21 15:15:43,753 INFO [io.quarkus] (Quarkus Main Thread) demo 1.0-SNAPSHOT on JVM (powered by Quarkus 3.2.8.Final) started in 5.874s.
2023-11-21 15:15:43,754 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2023-11-21 15:15:43,756 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [apicurio-registry-avro, cdi, config-yaml, kafka-client, kafka-streams, logging-gelf, smallrye-context-propagation, smallrye-fault-tolerance, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2023-11-21 15:15:44,195 INFO [com.osr.ser.KafkaStreamsService] (vert.x-worker-thread-1) new file processed
2023-11-21 15:15:44,629 INFO [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] State transition from CREATED to RUNNING
2023-11-21 15:15:44,631 INFO [org.apa.kaf.str.KafkaStreams] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from REBALANCING to RUNNING
2023-11-21 15:15:44,631 INFO [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] Started 0 stream threads
...

Once I configure StoreType.IN_MEMORY, no queries are executed before the state is RUNNING

2023-11-21 15:28:25,511 WARN [com.osr.serKafkaStreamsService] (Quarkus Main Thread) KafkaStreams has not been started, you can retry after calling start()
2023-11-21 15:28:26,730 INFO [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from CREATED to REBALANCING
2023-11-21 15:28:26,752 INFO [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] Restoring state for global store store-name
2023-11-21 15:28:29,834 WARN [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
2023-11-21 15:28:33,670 WARN [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
2023-11-21 15:28:33,763 INFO [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] State transition from CREATED to RUNNING
2023-11-21 15:28:33,765 INFO [org.apa.kaf.str.KafkaStreams] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from REBALANCING to RUNNING
2023-11-21 15:28:33,765 INFO [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 stream threads
2023-11-21 15:28:36,774 INFO [com.osr.serKafkaStreamsService] (vert.x-worker-thread-1) new file processed


Thanks for any input!
Christian
