fix flushing of in-memory parts

This commit is contained in:
Anton Popov 2021-12-27 18:54:28 +03:00
parent 454207a9a2
commit a83832c03e
4 changed files with 22 additions and 0 deletions

View File

@ -141,6 +141,10 @@ void StorageMergeTree::startup()
void StorageMergeTree::flush() void StorageMergeTree::flush()
{ {
if (flush_called)
return;
flush_called = true;
flushAllInMemoryPartsIfNeeded(); flushAllInMemoryPartsIfNeeded();
} }

View File

@ -140,6 +140,7 @@ private:
std::map<std::pair<Int64, Int64>, UInt64> updated_version_by_block_range; std::map<std::pair<Int64, Int64>, UInt64> updated_version_by_block_range;
std::atomic<bool> shutdown_called {false}; std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
private: private:
void loadMutations(); void loadMutations();

View File

@ -4120,9 +4120,22 @@ void StorageReplicatedMergeTree::startup()
} }
} }
void StorageReplicatedMergeTree::flush()
{
if (flush_called)
return;
flush_called = true;
flushAllInMemoryPartsIfNeeded();
}
void StorageReplicatedMergeTree::shutdown() void StorageReplicatedMergeTree::shutdown()
{ {
if (shutdown_called)
return;
shutdown_called = true;
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever(); fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever();

View File

@ -86,6 +86,7 @@ class StorageReplicatedMergeTree final : public shared_ptr_helper<StorageReplica
public: public:
void startup() override; void startup() override;
void shutdown() override; void shutdown() override;
void flush() override;
~StorageReplicatedMergeTree() override; ~StorageReplicatedMergeTree() override;
std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; } std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
@ -350,6 +351,9 @@ private:
/// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires. /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
int metadata_version = 0; int metadata_version = 0;
/// Threads. /// Threads.