diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 90440ed084f..49040ecbeb3 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -142,14 +142,16 @@ void StorageMergeTree::startup() void StorageMergeTree::flush() { + if (flush_called.exchange(true)) + return; + flushAllInMemoryPartsIfNeeded(); } void StorageMergeTree::shutdown() { - if (shutdown_called) + if (shutdown_called.exchange(true)) return; - shutdown_called = true; /// Unlock all waiting mutations { diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index af7e112462f..d3970449ceb 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -140,6 +140,7 @@ private: std::map, UInt64> updated_version_by_block_range; std::atomic shutdown_called {false}; + std::atomic flush_called {false}; void loadMutations(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a23859e7b5e..7743736724f 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4070,9 +4070,19 @@ void StorageReplicatedMergeTree::startup() } } +void StorageReplicatedMergeTree::flush() +{ + if (flush_called.exchange(true)) + return; + + flushAllInMemoryPartsIfNeeded(); +} void StorageReplicatedMergeTree::shutdown() { + if (shutdown_called.exchange(true)) + return; + /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c91152ca0f3..4fe2c080333 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -87,6 +87,7 @@ class StorageReplicatedMergeTree final : public shared_ptr_helper shutdown_called {false}; + std::atomic flush_called {false}; + int metadata_version = 0; /// Threads.