From a83832c03e64b4abdc72fab07e322c7ba87b6015 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 27 Dec 2021 18:54:28 +0300 Subject: [PATCH 1/2] fix flushing of in-memory parts --- src/Storages/StorageMergeTree.cpp | 4 ++++ src/Storages/StorageMergeTree.h | 1 + src/Storages/StorageReplicatedMergeTree.cpp | 13 +++++++++++++ src/Storages/StorageReplicatedMergeTree.h | 4 ++++ 4 files changed, 22 insertions(+) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 03ac27d0e46..02ea036c8ea 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -141,6 +141,10 @@ void StorageMergeTree::startup() void StorageMergeTree::flush() { + if (flush_called) + return; + + flush_called = true; flushAllInMemoryPartsIfNeeded(); } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index ee99b412f59..4f600404c72 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -140,6 +140,7 @@ private: std::map, UInt64> updated_version_by_block_range; std::atomic shutdown_called {false}; + std::atomic flush_called {false}; private: void loadMutations(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a1f82e14868..a46a7c2f66f 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4120,9 +4120,22 @@ void StorageReplicatedMergeTree::startup() } } +void StorageReplicatedMergeTree::flush() +{ + if (flush_called) + return; + + flush_called = true; + flushAllInMemoryPartsIfNeeded(); +} void StorageReplicatedMergeTree::shutdown() { + if (shutdown_called) + return; + + shutdown_called = true; + /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 6861d89f070..cdee22899c4 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -86,6 +86,7 @@ class StorageReplicatedMergeTree final : public shared_ptr_helper shutdown_called {false}; + std::atomic flush_called {false}; + int metadata_version = 0; /// Threads. From 4ebf61b809707d776a85d9b7e6206769d5deb7f8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 29 Dec 2021 01:03:55 +0300 Subject: [PATCH 2/2] proper checking of atomic flags --- src/Storages/StorageMergeTree.cpp | 6 ++---- src/Storages/StorageReplicatedMergeTree.cpp | 7 ++----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 02ea036c8ea..d14ee672727 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -141,18 +141,16 @@ void StorageMergeTree::startup() void StorageMergeTree::flush() { - if (flush_called) + if (flush_called.exchange(true)) return; - flush_called = true; flushAllInMemoryPartsIfNeeded(); } void StorageMergeTree::shutdown() { - if (shutdown_called) + if (shutdown_called.exchange(true)) return; - shutdown_called = true; /// Unlock all waiting mutations { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a46a7c2f66f..5f829db72f8 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4122,20 +4122,17 @@ void StorageReplicatedMergeTree::startup() void StorageReplicatedMergeTree::flush() { - if (flush_called) + if (flush_called.exchange(true)) return; - flush_called = true; flushAllInMemoryPartsIfNeeded(); } void StorageReplicatedMergeTree::shutdown() { - if (shutdown_called) + if (shutdown_called.exchange(true)) return; - shutdown_called = true; - /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever();