After some more testing and debugging, it seems that it is caused by the
compaction option I've configured for RocksDB. When removed everything is
fine...
The option is as follow:
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
Le mar. 31 mars 2020 à 16:27, Nicolas Carlot <nicolas.carlot@chronopost.fr>
a écrit :
> Hello everyone,
>
> I'm currently facing an issue with RocksDb internal compaction process,
> which occurs when the local state store of several of my KafkaStream
> applications are being restored. This is sadly a huge concern as it
> completely discard resiliency over node failure as those often lead to a
> state store restoration. The only workaround I currently have is to delete
> the local store to restore it from scratch. I'm using version 2.4.1 of the
> Java libraries.
>
> The exception thrown by the KStream process is:
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range
> compacting during restoring store merge_store
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of
> levels
> at org.rocksdb.RocksDB.compactRange(Native Method)
> ~[kafka-stream-router.jar:?]
> at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
> ~[kafka-stream-router.jar:?]
> ... 11 more
>
> Here is the state of partition 0_0 of one of those stores:
> -rw-r--r-- 1 java j2ee 148210568 31 mars 10:49 000139.sst
> -rw-r--r-- 1 java j2ee 21620385 31 mars 11:06 000184.sst
> -rw-r--r-- 1 java j2ee 0 31 mars 11:11 000198.log
> -rw-r--r-- 1 java j2ee 31602468 31 mars 11:31 000251.sst
> -rw-r--r-- 1 java j2ee 37856549 31 mars 12:00 000324.sst
> -rw-r--r-- 1 java j2ee 33498822 31 mars 12:26 000393.sst
> -rw-r--r-- 1 java j2ee 34368461 31 mars 12:49 000450.sst
> -rw-r--r-- 1 java j2ee 11371247 31 mars 12:55 000467.sst
> -rw-r--r-- 1 java j2ee 14356435 31 mars 13:04 000489.sst
> -rw-r--r-- 1 java j2ee 5858737 31 mars 13:05 000494.sst
> -rw-r--r-- 1 java j2ee 2545952 31 mars 14:08 000659.sst
> -rw-r--r-- 1 java j2ee 3187275 31 mars 15:27 000868.sst
> -rw-r--r-- 1 java j2ee 407017 31 mars 15:34 000885.sst
> -rw-r--r-- 1 java j2ee 590190 31 mars 15:45 000914.sst
> -rw-r--r-- 1 java j2ee 154471 31 mars 15:47 000919.sst
> -rw-r--r-- 1 java j2ee 139838 31 mars 15:49 000924.sst
> -rw-r--r-- 1 java j2ee 35058 31 mars 15:49 000925.sst
> -rw-r--r-- 1 java j2ee 33987 31 mars 15:50 000926.sst
> -rw-r--r-- 1 java j2ee 16 31 mars 11:11 CURRENT
> -rw-r--r-- 1 java j2ee 37 31 mars 10:33 IDENTITY
> -rw-r--r-- 1 java j2ee 0 31 mars 10:33 LOCK
> -rw-r--r-- 1 java j2ee 15340 31 mars 11:11 LOG
> -rw-r--r-- 1 java j2ee 15046 31 mars 10:33 LOG.old.1585643630145007
> -rw-r--r-- 1 java j2ee 15290 31 mars 10:33 LOG.old.1585643826265995
> -rw-r--r-- 1 java j2ee 15384 31 mars 10:37 LOG.old.1585645861692248
> -rw-r--r-- 1 java j2ee 60767 31 mars 15:55 MANIFEST-000197
> -rw-r--r-- 1 java j2ee 4857 31 mars 10:37 OPTIONS-000107
> -rw-r--r-- 1 java j2ee 4857 31 mars 11:11 OPTIONS-000200
>
> I can see that Kafka is running RocksDB's compaction with:
> public void toggleDbForBulkLoading() {
> try {
> db.compactRange(columnFamily, true, 1, 0);
> } catch (final RocksDBException e) {
> throw new ProcessorStateException("Error while range
> compacting during restoring store " + name, e);
> }
> }
>
> Seems related to an issue with RocksDB itself:
> https://github.com/facebook/rocksdb/issues/2734
> But i'm not sure of this.
> Any help would be greatly appreciated here :)
>
--
*Nicolas Carlot*
Lead dev
| | nicolas.carlot@chronopost.fr
*Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
[image: Logo Chronopost]
| chronopost.fr <http://www.chronopost.fr/>
Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et Twitter
<https://twitter.com/chronopost>.
[image: DPD Group]
compaction option I've configured for RocksDB. When removed everything is
fine...
The option is as follow:
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
Le mar. 31 mars 2020 à 16:27, Nicolas Carlot <nicolas.carlot@chronopost.fr>
a écrit :
> Hello everyone,
>
> I'm currently facing an issue with RocksDb internal compaction process,
> which occurs when the local state store of several of my KafkaStream
> applications are being restored. This is sadly a huge concern as it
> completely discard resiliency over node failure as those often lead to a
> state store restoration. The only workaround I currently have is to delete
> the local store to restore it from scratch. I'm using version 2.4.1 of the
> Java libraries.
>
> The exception thrown by the KStream process is:
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range
> compacting during restoring store merge_store
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of
> levels
> at org.rocksdb.RocksDB.compactRange(Native Method)
> ~[kafka-stream-router.jar:?]
> at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636)
> ~[kafka-stream-router.jar:?]
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
> ~[kafka-stream-router.jar:?]
> ... 11 more
>
> Here is the state of partition 0_0 of one of those stores:
> -rw-r--r-- 1 java j2ee 148210568 31 mars 10:49 000139.sst
> -rw-r--r-- 1 java j2ee 21620385 31 mars 11:06 000184.sst
> -rw-r--r-- 1 java j2ee 0 31 mars 11:11 000198.log
> -rw-r--r-- 1 java j2ee 31602468 31 mars 11:31 000251.sst
> -rw-r--r-- 1 java j2ee 37856549 31 mars 12:00 000324.sst
> -rw-r--r-- 1 java j2ee 33498822 31 mars 12:26 000393.sst
> -rw-r--r-- 1 java j2ee 34368461 31 mars 12:49 000450.sst
> -rw-r--r-- 1 java j2ee 11371247 31 mars 12:55 000467.sst
> -rw-r--r-- 1 java j2ee 14356435 31 mars 13:04 000489.sst
> -rw-r--r-- 1 java j2ee 5858737 31 mars 13:05 000494.sst
> -rw-r--r-- 1 java j2ee 2545952 31 mars 14:08 000659.sst
> -rw-r--r-- 1 java j2ee 3187275 31 mars 15:27 000868.sst
> -rw-r--r-- 1 java j2ee 407017 31 mars 15:34 000885.sst
> -rw-r--r-- 1 java j2ee 590190 31 mars 15:45 000914.sst
> -rw-r--r-- 1 java j2ee 154471 31 mars 15:47 000919.sst
> -rw-r--r-- 1 java j2ee 139838 31 mars 15:49 000924.sst
> -rw-r--r-- 1 java j2ee 35058 31 mars 15:49 000925.sst
> -rw-r--r-- 1 java j2ee 33987 31 mars 15:50 000926.sst
> -rw-r--r-- 1 java j2ee 16 31 mars 11:11 CURRENT
> -rw-r--r-- 1 java j2ee 37 31 mars 10:33 IDENTITY
> -rw-r--r-- 1 java j2ee 0 31 mars 10:33 LOCK
> -rw-r--r-- 1 java j2ee 15340 31 mars 11:11 LOG
> -rw-r--r-- 1 java j2ee 15046 31 mars 10:33 LOG.old.1585643630145007
> -rw-r--r-- 1 java j2ee 15290 31 mars 10:33 LOG.old.1585643826265995
> -rw-r--r-- 1 java j2ee 15384 31 mars 10:37 LOG.old.1585645861692248
> -rw-r--r-- 1 java j2ee 60767 31 mars 15:55 MANIFEST-000197
> -rw-r--r-- 1 java j2ee 4857 31 mars 10:37 OPTIONS-000107
> -rw-r--r-- 1 java j2ee 4857 31 mars 11:11 OPTIONS-000200
>
> I can see that Kafka is running RocksDB's compaction with:
> public void toggleDbForBulkLoading() {
> try {
> db.compactRange(columnFamily, true, 1, 0);
> } catch (final RocksDBException e) {
> throw new ProcessorStateException("Error while range
> compacting during restoring store " + name, e);
> }
> }
>
> Seems related to an issue with RocksDB itself:
> https://github.com/facebook/rocksdb/issues/2734
> But i'm not sure of this.
> Any help would be greatly appreciated here :)
>
--
*Nicolas Carlot*
Lead dev
| | nicolas.carlot@chronopost.fr
*Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
[image: Logo Chronopost]
| chronopost.fr <http://www.chronopost.fr/>
Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et Twitter
<https://twitter.com/chronopost>.
[image: DPD Group]
Comments
Post a Comment