diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 03ac27d0e46..02ea036c8ea 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -141,6 +141,10 @@ void StorageMergeTree::startup() void StorageMergeTree::flush() { + if (flush_called) + return; + + flush_called = true; flushAllInMemoryPartsIfNeeded(); } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index ee99b412f59..4f600404c72 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -140,6 +140,7 @@ private: std::map, UInt64> updated_version_by_block_range; std::atomic shutdown_called {false}; + std::atomic flush_called {false}; private: void loadMutations(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a1f82e14868..a46a7c2f66f 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4120,9 +4120,22 @@ void StorageReplicatedMergeTree::startup() } } +void StorageReplicatedMergeTree::flush() +{ + if (flush_called) + return; + + flush_called = true; + flushAllInMemoryPartsIfNeeded(); +} void StorageReplicatedMergeTree::shutdown() { + if (shutdown_called) + return; + + shutdown_called = true; + /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 6861d89f070..cdee22899c4 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -86,6 +86,7 @@ class StorageReplicatedMergeTree final : public shared_ptr_helper shutdown_called {false}; + std::atomic flush_called {false}; + int metadata_version = 0; /// Threads.