From 797e21761ab3e2eb1c019da71e44400898125f68 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 24 Aug 2022 17:44:14 +0000 Subject: [PATCH] Address PR comments --- src/Interpreters/Context.h | 1 - .../ReplicatedMergeTreeAttachThread.cpp | 27 +++--------- .../ReplicatedMergeTreeRestartingThread.cpp | 3 +- src/Storages/StorageReplicatedMergeTree.cpp | 43 ++++++++----------- src/Storages/StorageReplicatedMergeTree.h | 7 ++- 5 files changed, 29 insertions(+), 52 deletions(-) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index f044df02d61..ea03b8e6586 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -739,7 +739,6 @@ public: ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; - /// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call. /// If no ZooKeeper configured, throws an exception. std::shared_ptr getZooKeeper() const; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index fd8ad2767b7..566d5dc3fa3 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -1,6 +1,6 @@ #include #include -#include "Common/ZooKeeper/IKeeper.h" +#include namespace DB { @@ -48,22 +48,16 @@ void ReplicatedMergeTreeAttachThread::run() catch (const Exception & e) { if (const auto * coordination_exception = dynamic_cast(&e)) - { - std::array retriable_errors{ - Coordination::Error::ZCONNECTIONLOSS, Coordination::Error::ZSESSIONEXPIRED, Coordination::Error::ZOPERATIONTIMEOUT}; - needs_retry = std::any_of( - retriable_errors.begin(), retriable_errors.end(), [&](const auto error) { return error == coordination_exception->code; }); - } + needs_retry = Coordination::isHardwareError(coordination_exception->code); - if (!needs_retry) + if (needs_retry) { - LOG_ERROR(log, "Initialization failed, table will remain readonly. Error: {}", e.message()); - std::lock_guard lock(storage.initialization_mutex); - storage.initialization_done = true; + LOG_ERROR(log, "Initialization failed. Error: {}", e.message()); } else { - LOG_ERROR(log, "Initialization failed. Error: {}", e.message()); + LOG_ERROR(log, "Initialization failed, table will remain readonly. Error: {}", e.message()); + storage.initialization_done = true; } } @@ -160,14 +154,7 @@ void ReplicatedMergeTreeAttachThread::runImpl() void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS { - std::unique_lock lock(storage.initialization_mutex); - if (storage.startup_called) - { - lock.unlock(); - storage.startupImpl(); - lock.lock(); - } - + storage.startupImpl(); storage.initialization_done = true; LOG_INFO(log, "Table is initialized"); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index dc960484f92..9d95189b611 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -104,8 +104,7 @@ void ReplicatedMergeTreeRestartingThread::run() bool ReplicatedMergeTreeRestartingThread::runImpl() { - auto zookeeper = storage.tryGetZooKeeper(); - if (!storage.is_readonly && zookeeper && !zookeeper->expired()) + if (!storage.is_readonly && !storage.getZooKeeper()->expired()) return true; if (first_time) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9682a25ca18..453288361ba 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -277,7 +277,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , cleanup_thread(*this) , part_check_thread(*this) , restarting_thread(*this) - , attach_thread(*this) , part_moves_between_shards_orchestrator(*this) , renaming_restrictions(renaming_restrictions_) , replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size) @@ -382,10 +381,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( if (attach) { LOG_INFO(log, "Table will be in readonly mode until initialization is finished"); - attach_thread.setSkipSanityChecks(skip_sanity_checks); - attach_thread.start(); - attach_thread.waitFirstTry(); - + attach_thread.emplace(*this); + attach_thread->setSkipSanityChecks(skip_sanity_checks); return; } @@ -572,6 +569,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( void StorageReplicatedMergeTree::createNewZooKeeperNodes() { auto zookeeper = getZooKeeper(); + std::vector futures; /// These 4 nodes used to be created in createNewZookeeperNodes() and they were moved to createTable() @@ -819,16 +817,13 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada void StorageReplicatedMergeTree::drop() { + assert(shutdown_called); + /// There is also the case when user has configured ClickHouse to wrong ZooKeeper cluster /// or metadata of staled replica were removed manually, /// in this case, has_metadata_in_zookeeper = false, and we also permit to drop the table. - bool maybe_has_metadata_in_zookeeper = false; - { - std::lock_guard lock{initialization_mutex}; - maybe_has_metadata_in_zookeeper = !initialization_done || !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper; - } - + bool maybe_has_metadata_in_zookeeper = !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper; if (maybe_has_metadata_in_zookeeper) { /// Table can be shut down, restarting thread is not active @@ -843,7 +838,6 @@ void StorageReplicatedMergeTree::drop() if (!zookeeper) throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY); - shutdown(); dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings()); } @@ -1057,6 +1051,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper void StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot) { auto zookeeper = getZooKeeper(); + ReplicatedMergeTreeTableMetadata old_metadata(*this, metadata_snapshot); Coordination::Stat metadata_stat; @@ -1113,6 +1108,7 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) { auto zookeeper = getZooKeeper(); + Strings expected_parts_vec = zookeeper->getChildren(fs::path(replica_path) / "parts"); /// Parts in ZK. @@ -1252,6 +1248,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) void StorageReplicatedMergeTree::syncPinnedPartUUIDs() { auto zookeeper = getZooKeeper(); + Coordination::Stat stat; String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &stat); @@ -3448,7 +3445,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n void StorageReplicatedMergeTree::startBeingLeader() { auto zookeeper = getZooKeeper(); - assert(zookeeper); + if (!getSettings()->replicated_can_become_leader) { LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0"); @@ -4156,13 +4153,11 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart( void StorageReplicatedMergeTree::startup() { + if (attach_thread) { - std::lock_guard lock(initialization_mutex); - if (!initialization_done) - { - startup_called = true; - return; - } + attach_thread->start(); + attach_thread->waitFirstTry(); + return; } startupImpl(); @@ -4236,7 +4231,8 @@ void StorageReplicatedMergeTree::shutdown() mutations_finalizing_task->deactivate(); stopBeingLeader(); - attach_thread.shutdown(); + if (attach_thread) + attach_thread->shutdown(); restarting_thread.shutdown(); background_operations_assignee.finish(); part_moves_between_shards_orchestrator.shutdown(); @@ -4977,11 +4973,8 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper() { LOG_INFO(log, "Restoring replica metadata"); - { - std::lock_guard lock(initialization_mutex); - if (!initialization_done) - throw Exception(ErrorCodes::NOT_INITIALIZED, "Table is not initialized yet"); - } + if (!initialization_done) + throw Exception(ErrorCodes::NOT_INITIALIZED, "Table is not initialized yet"); if (!is_readonly) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica must be readonly"); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index d203f2bd2c9..24b4a4d5634 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -447,13 +447,12 @@ private: /// A thread that processes reconnection to ZooKeeper when the session expires. ReplicatedMergeTreeRestartingThread restarting_thread; - ReplicatedMergeTreeAttachThread attach_thread; + /// A thread that attaches the table using ZooKeeper + std::optional attach_thread; PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator; - mutable std::mutex initialization_mutex; - TSA_GUARDED_BY(initialization_mutex) bool initialization_done{false}; - TSA_GUARDED_BY(initialization_mutex) bool startup_called{false}; + std::atomic initialization_done{false}; /// True if replica was created for existing table with fixed granularity bool other_replicas_fixed_granularity = false;