From 78aaa42dddc3bb05a1179546912f3f77e01b4609 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 23 Nov 2017 16:12:22 +0300 Subject: [PATCH] add idempotent retries of updating the replica queue in ZooKeeper [#CLICKHOUSE-3405] --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 54 +++++++++++++++---- .../MergeTree/ReplicatedMergeTreeQueue.h | 11 ++-- .../Storages/StorageReplicatedMergeTree.cpp | 2 +- .../src/Storages/StorageReplicatedMergeTree.h | 1 + 4 files changed, 50 insertions(+), 18 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index fe5065da61f..65bb198484c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1,7 +1,8 @@ +#include #include #include -#include #include +#include #include @@ -15,6 +16,13 @@ namespace ErrorCodes } +ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(const StorageReplicatedMergeTree & storage_) + : storage(storage_) + , virtual_parts(storage.data.format_version) +{ +} + + void ReplicatedMergeTreeQueue::initVirtualParts(const MergeTreeData::DataParts & parts) { std::lock_guard lock(mutex); @@ -233,7 +241,8 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B { std::lock_guard lock(pull_logs_to_queue_mutex); - String index_str = zookeeper->get(replica_path + "/log_pointer"); + zkutil::Stat log_pointer_stat; + String index_str = zookeeper->get(replica_path + "/log_pointer", &log_pointer_stat); UInt64 index; Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log"); @@ -243,7 +252,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B /// If we do not already have a pointer to the log, put a pointer to the first entry in it. index = log_entries.empty() ? 0 : parse(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-"))); - zookeeper->set(replica_path + "/log_pointer", toString(index)); + zookeeper->set(replica_path + "/log_pointer", toString(index), -1, &log_pointer_stat); } else { @@ -320,13 +329,36 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B } ops.emplace_back(std::make_unique( - replica_path + "/log_pointer", toString(last_entry_index + 1), -1)); + replica_path + "/log_pointer", toString(last_entry_index + 1), log_pointer_stat.version)); if (min_unprocessed_insert_time_changed) ops.emplace_back(std::make_unique( replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1)); - auto results = zookeeper->multi(ops); + for (size_t num_tries = 0; ; ++num_tries) + { + if (storage.shutdown_called) + return false; + + try + { + zookeeper->multi(ops); + break; + } + catch (const zkutil::KeeperException & ex) + { + if (num_tries > 0 && ex.code == ZBADVERSION) + { + LOG_TRACE(log, "Version mismatch while updating the queue. Assuming we have already updated it."); + break; + } + + if (!ex.isTemporaryError()) + throw; + + tryLogCurrentException(log, "Temporary error while updating the queue (will retry)"); + } + } /// Now we have successfully updated the queue in ZooKeeper. Update it in RAM. @@ -422,7 +454,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z for (Queue::iterator it = queue.begin(); it != queue.end();) { if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) && - MergeTreePartInfo::contains(part_name, (*it)->new_part_name, format_version)) + MergeTreePartInfo::contains(part_name, (*it)->new_part_name, storage.data.format_version)) { if ((*it)->currently_executing) to_wait.push_back(*it); @@ -461,14 +493,14 @@ ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearCo { if (elem->type == LogEntry::MERGE_PARTS || elem->type == LogEntry::GET_PART || elem->type == LogEntry::ATTACH_PART) { - if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, format_version)) + if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, storage.data.format_version)) conflicts.emplace_back(elem); } if (elem->type == LogEntry::CLEAR_COLUMN) { - auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, format_version); - auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version); + auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, storage.data.format_version); + auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.data.format_version); if (part.partition_id == cur_part.partition_id) conflicts.emplace_back(elem); @@ -524,12 +556,12 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_ /// A more complex check is whether another part is currently created by other action that will cover this part. /// NOTE The above is redundant, but left for a more convenient message in the log. - auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version); + auto result_part = MergeTreePartInfo::fromPartName(new_part_name, storage.data.format_version); /// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited. for (const auto & future_part_name : future_parts) { - auto future_part = MergeTreePartInfo::fromPartName(future_part_name, format_version); + auto future_part = MergeTreePartInfo::fromPartName(future_part_name, storage.data.format_version); if (future_part.contains(result_part)) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 46c018431d4..4a1677630f1 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -1,3 +1,5 @@ +#pragma once + #include #include #include @@ -11,6 +13,7 @@ namespace DB class MergeTreeDataMerger; +class StorageReplicatedMergeTree; class ReplicatedMergeTreeQueue @@ -38,7 +41,7 @@ private: using InsertsByTime = std::set; - MergeTreeDataFormatVersion format_version; + const StorageReplicatedMergeTree & storage; String zookeeper_path; String replica_path; @@ -124,11 +127,7 @@ private: }; public: - ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_) - : format_version(format_version_) - , virtual_parts(format_version) - { - } + ReplicatedMergeTreeQueue(const StorageReplicatedMergeTree & storage_); void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 85d4b6661ec..4828892da69 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -188,7 +188,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( settings_, database_name_ + "." + table_name, true, attach, [this] (const std::string & name) { enqueuePartForCheck(name); }, [this] () { clearOldPartsAndRemoveFromZK(); }), - reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(data.format_version), + reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(*this), fetcher(data), shutdown_event(false), part_check_thread(*this), log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 108baf4028d..ebcc6b5bd4a 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -205,6 +205,7 @@ private: friend class ReplicatedMergeTreeRestartingThread; friend struct ReplicatedMergeTreeLogEntry; friend class ScopedPartitionMergeLock; + friend class ReplicatedMergeTreeQueue; using LogEntry = ReplicatedMergeTreeLogEntry; using LogEntryPtr = LogEntry::Ptr;