diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
index 7482737bff4..4b805884a54 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
@@ -48,9 +48,11 @@ public:
 
 	~ReshardingWorker();
 
-	/// Запустить поток выполняющий задачи перешардирования.
+	/// Start the thread which performs resharding jobs.
 	void start();
 
+	/// Stop the thread which performs resharding jobs.
+	/// If any job is in progress, put it on hold for further execution.
 	void shutdown();
 
 	/// Прислать запрос на перешардирование.
@@ -61,7 +63,7 @@ public:
 
 	/// Создать новый координатор распределённой задачи. Вызывается с инициатора.
	std::string createCoordinator(const Cluster & cluster);
-	///
+	/// Register a query into a coordinator.
 	void registerQuery(const std::string & coordinator_id, const std::string & query);
 	/// Удалить координатор.
 	void deleteCoordinator(const std::string & coordinator_id);
@@ -72,8 +74,10 @@ public:
 	void unsubscribe(const std::string & coordinator_id);
 	/// Увеличить количество партиций входящих в одну распределённую задачу. Вызывается с исполнителя.
 	void addPartitions(const std::string & coordinator_id, const PartitionList & partition_list);
-	///
-	ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id, ReshardingWorker::PartitionList & partition_list);
+	/// Rearrange partitions into two categories: coordinated job, uncoordinated job.
+	/// Returns an iterator to the beginning of the list of uncoordinated jobs.
+	ReshardingWorker::PartitionList::iterator categorizePartitions(const std::string & coordinator_id,
+		ReshardingWorker::PartitionList & partition_list);
 	/// Получить количество партиций входящих в одну распределённую задачу. Вызывается с исполнителя.
 	size_t getPartitionCount(const std::string & coordinator_id);
 	/// Получить количество учавствующих узлов.
@@ -83,12 +87,10 @@ public:
 	/// Ждать завершение всех необходмых отмен подписей.
 	void waitForOptOutCompletion(const std::string & coordinator_id, size_t count);
 
-	/// Установить статус заданной распределённой задачи.
+	/// Set the shard-independent status of a given coordinator.
 	void setStatus(const std::string & coordinator_id, Status status);
-	///
+	/// Set the status of a shard under a given coordinator.
 	void setStatus(const std::string & coordinator_id, const std::string & hostname, Status status);
 
-	/// Получить статус заданной распределённой задачи.
-	Status getStatus();
 
 	zkutil::RWLock createDeletionLock(const std::string & coordinator_id);
@@ -125,15 +127,17 @@ private:
 	/// Удалить временные данные с локального узла и ZooKeeper'а.
 	void softCleanup();
 	void hardCleanup();
-	void cleanupCommon();
 
-	/// Принудительно завершить поток.
+	/// Принудительно завершить поток, если выполнено условие.
 	void abortPollingIfRequested();
 	void abortCoordinatorIfRequested(const std::string & coordinator_id);
 	void abortRecoveryIfRequested();
 	void abortJobIfRequested();
 
+	/// Get the current job-independent status of the coordinator.
 	Status getCoordinatorStatus(const std::string & coordinator_id);
+	/// Get the status of the current distributed job.
+	Status getStatus();
 
 	/// Зарегистрировать задачу в соответствующий координатор.
 	void attachJob();
@@ -144,27 +148,35 @@ private:
 
 	size_t getPartitionCountUnlocked(const std::string & coordinator_id);
 
-	bool updateOfflineNodes(const std::string & coordinator_id);
-	bool updateOfflineNodes();
-	bool updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
-
-	Status getStatusCommon(const std::string & path, const std::string & coordinator_id);
+	/// Detect offline nodes under a given coordinator.
+	bool detectOfflineNodes(const std::string & coordinator_id);
+	/// Detect offline nodes under the current job.
+	bool detectOfflineNodes();
 
 	/// Функции, которые создают необходимые объекты для синхронизации
 	/// распределённых задач.
 	zkutil::RWLock createLock();
 	zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
 
-	zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
 	zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
 	zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
 	zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);
 
-	std::string computeHash(const std::string & in);
-
+	/// Get the ZooKeeper path of a given coordinator.
 	std::string getCoordinatorPath(const std::string & coordinator_id) const;
+	/// Get the ZooKeeper path of a given job partition.
 	std::string getPartitionPath(const ReshardingJob & job) const;
 
+	/// Common code for softCleanup() and hardCleanup().
+	void cleanupCommon();
+	/// Common code for detectOfflineNodes().
+	bool detectOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
+	/// Common code for getStatus() and getCoordinatorStatus().
+	Status getStatusCommon(const std::string & path, const std::string & coordinator_id);
+
+	/// Compute the hash value of a given string.
+	std::string computeHash(const std::string & in);
+
 private:
 	ReshardingJob current_job;
 	std::thread polling_thread;
diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
index d9247738351..58ff90f0016 100644
--- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
@@ -30,8 +30,11 @@
 
 #include
 #include
+
 #include
 #include
+#include
+#include
 
 namespace DB
 {
@@ -97,7 +100,7 @@ private:
 }
 
 
-/// Rationale for distributed jobs.
+/// Rationale for distributed jobs:
 ///
 /// A distributed job is initiated in a query ALTER TABLE RESHARD inside which
 /// we specify a distributed table. Then ClickHouse creates a job coordinating
@@ -107,6 +110,23 @@
 /// receives one query ALTER TABLE RESHARD with the keyword COORDINATE WITH
 /// indicating the aforementioned coordinator id.
 ///
+/// Locking notes:
+///
+/// In order to avoid any deadlock situation, two approaches are implemented:
+///
+/// 1. As long as a cluster is busy with a distributed job, we forbid clients
+/// to submit any new distributed job on this cluster. For this purpose, clusters
+/// are identified by the hash value of the ordered list of their hostnames and
+/// ports. In the event that an initiator should identify a cluster node by means
+/// of a local address, some additional checks are performed on shards themselves.
+///
+/// 2. Also, the jobs that constitute a distributed job are submitted in identical
+/// order on all the shards of a cluster. If one or more shards fail while performing
+/// a distributed job, the latter is put on hold. Then no new jobs are performed
+/// until the failing shards have come back online.
+///
+/// ZooKeeper coordinator structure description:
+///
 /// At the highest level we have under the /resharding_distributed znode:
 ///
 /// /lock: global distributed read/write lock;
@@ -582,6 +602,12 @@ void ReshardingWorker::publishShardedPartitions()
 		{
 		}
 
+		TaskInfo(const TaskInfo &) = delete;
+		TaskInfo & operator=(const TaskInfo &) = delete;
+
+		TaskInfo(TaskInfo &&) = default;
+		TaskInfo & operator=(TaskInfo &&) = default;
+
 		std::string replica_path;
 		ReplicatedMergeTreeAddress dest;
 		std::string part;
@@ -704,17 +730,24 @@ void ReshardingWorker::publishShardedPartitions()
 
 void ReshardingWorker::applyChanges()
 {
+	/// Note: since this method actually performs a distributed commit (i.e. it
+	/// attaches partitions on various shards), we should implement a two-phase
+	/// commit protocol in a future release in order to get even more safety
+	/// guarantees.
+
 	LOG_DEBUG(log, "Attaching new partitions.");
 
 	auto & storage = *(current_job.storage);
 	auto zookeeper = context.getZooKeeper();
 
-	/// На локальном узле удалить первоначальную партицию.
+	/// Locally drop the initial partition.
 	std::string query_str = "ALTER TABLE " + current_job.database_name + "."
 		+ current_job.table_name + " DROP PARTITION " + current_job.partition;
 	(void) executeQuery(query_str, context, true);
 
-	/// На всех участвующих репликах добавить соответствующие шардированные партиции в таблицу.
+	/// On each participating shard, attach the corresponding sharded partition to the table.
+
+	/// Description of a task on a replica.
 	struct TaskInfo
 	{
 		TaskInfo(const std::string & replica_path_, const ReplicatedMergeTreeAddress & dest_)
@@ -722,13 +755,52 @@ void ReshardingWorker::applyChanges()
 		{
 		}
 
+		TaskInfo(const TaskInfo &) = delete;
+		TaskInfo & operator=(const TaskInfo &) = delete;
+
+		TaskInfo(TaskInfo &&) = default;
+		TaskInfo & operator=(TaskInfo &&) = default;
+
 		std::string replica_path;
 		ReplicatedMergeTreeAddress dest;
 	};
 
-	using TaskInfoList = std::vector;
+	/// Description of tasks for each replica of a shard.
+	/// For fault tolerance purposes, some fields are provided
+	/// to perform attempts on more than one replica if needed.
+	struct ShardTaskInfo
+	{
+		ShardTaskInfo()
+		{
+			struct timespec times;
+			if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times))
+				throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME);
+
+			(void) srand48_r(reinterpret_cast(this) ^ times.tv_nsec, &rand_state);
+		}
+
+		ShardTaskInfo(const ShardTaskInfo &) = delete;
+		ShardTaskInfo & operator=(const ShardTaskInfo &) = delete;
+
+		ShardTaskInfo(ShardTaskInfo &&) = default;
+		ShardTaskInfo & operator=(ShardTaskInfo &&) = default;
+
+		/// one task for each replica
+		std::vector shard_tasks;
+		/// index to the replica to be used
+		size_t next = 0;
+		/// result of the operation on the current replica
+		bool is_success = false;
+		/// index to the corresponding thread pool entry
+		size_t pool_index;
+		drand48_data rand_state;
+	};
+
+	using TaskInfoList = std::vector;
+
 	TaskInfoList task_info_list;
 
+	/// Initialize all the possible tasks for each replica of each shard.
 	for (const auto & entry : storage.data.per_shard_data_parts)
 	{
 		size_t shard_no = entry.first;
@@ -739,55 +811,95 @@ void ReshardingWorker::applyChanges()
 		const WeightedZooKeeperPath & weighted_path = current_job.paths[shard_no];
 		const std::string & zookeeper_path = weighted_path.first;
 
+		task_info_list.emplace_back();
+		ShardTaskInfo & shard_task_info = task_info_list.back();
+
 		auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
 		for (const auto & child : children)
 		{
 			const std::string replica_path = zookeeper_path + "/replicas/" + child;
 			auto host = zookeeper->get(replica_path + "/host");
 			ReplicatedMergeTreeAddress host_desc(host);
-			task_info_list.emplace_back(replica_path, host_desc);
+
+			shard_task_info.shard_tasks.emplace_back(replica_path, host_desc);
 		}
 	}
 
-	boost::threadpool::pool pool(task_info_list.size());
-
-	using Tasks = std::vector >;
-	Tasks tasks(task_info_list.size());
-
-	try
+	/// Loop as long as there are ATTACH operations that need to be performed
+	/// on some shards and there remains at least one valid replica on each of
+	/// these shards.
+	size_t remaining_task_count = task_info_list.size();
+	while (remaining_task_count > 0)
 	{
-		for (size_t i = 0; i < task_info_list.size(); ++i)
+		boost::threadpool::pool pool(remaining_task_count);
+
+		using Tasks = std::vector >;
+		Tasks tasks(remaining_task_count);
+
+		try
 		{
-			const auto & entry = task_info_list[i];
-			const auto & replica_path = entry.replica_path;
-			const auto & dest = entry.dest;
+			size_t pool_index = 0;
+			for (auto & info : task_info_list)
+			{
+				if (info.is_success)
+				{
+					/// We have already successfully performed the operation on this shard.
+					continue;
+				}
 
-			InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);
+				/// Randomly choose a replica on which to perform the operation.
+				long int rand_res;
+				(void) lrand48_r(&info.rand_state, &rand_res);
+				size_t current = info.next + rand_res % (info.shard_tasks.size() - info.next);
+				std::swap(info.shard_tasks[info.next], info.shard_tasks[current]);
+				++info.next;
 
-			std::string query_str = "ALTER TABLE " + dest.database + "."
-				+ dest.table + " ATTACH PARTITION " + current_job.partition;
+				info.pool_index = pool_index;
 
-			tasks[i] = Tasks::value_type(std::bind(&RemoteQueryExecutor::Client::executeQuery,
-				&storage.remote_query_executor_client, location, query_str));
+				TaskInfo & cur_task_info = info.shard_tasks[info.next - 1];
 
-			pool.schedule([i, &tasks]{ tasks[i](); });
+				const auto & replica_path = cur_task_info.replica_path;
+				const auto & dest = cur_task_info.dest;
+
+				/// Create and register the task.
+
+				InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);
+
+				std::string query_str = "ALTER TABLE " + dest.database + "."
+					+ dest.table + " ATTACH PARTITION " + current_job.partition;
+
+				tasks[pool_index] = Tasks::value_type(std::bind(&RemoteQueryExecutor::Client::executeQuery,
+					&storage.remote_query_executor_client, location, query_str));
+
+				pool.schedule([pool_index, &tasks]{ tasks[pool_index](); });
+
+				/// Allocate an entry for the next task.
+				++pool_index;
+			}
 		}
-	}
-	catch (...)
-	{
-		tryLogCurrentException(__PRETTY_FUNCTION__);
+		catch (...)
+		{
+			pool.wait();
+			throw;
+		}
+
 		pool.wait();
-		throw;
-	}
-	pool.wait();
+		for (auto & info : task_info_list)
+		{
+			if (info.is_success)
+				continue;
 
-	for (auto & task : tasks)
-	{
-		bool res = task.get_future().get();
-		if (!res)
-			throw Exception("Failed to attach partition on replica",
-				ErrorCodes::PARTITION_ATTACH_FAILED);
+			info.is_success = tasks[info.pool_index].get_future().get();
+			if (info.is_success)
+				--remaining_task_count;
+			else if (info.next == info.shard_tasks.size())
+			{
+				/// No more attempts are possible.
+				throw Exception("Failed to attach partition on shard",
+					ErrorCodes::PARTITION_ATTACH_FAILED);
+			}
+		}
 	}
 }
@@ -863,8 +975,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
 		"", zkutil::CreateMode::Persistent);
 
 	(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/status",
-		"", zkutil::CreateMode::Persistent);
-	setStatus(coordinator_id, STATUS_OK);
+		toString(static_cast(STATUS_OK)), zkutil::CreateMode::Persistent);
 
 	(void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/partitions",
 		"", zkutil::CreateMode::Persistent);
@@ -995,9 +1106,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
 		throw zkutil::KeeperException(code);
 
 	zookeeper->create(getCoordinatorPath(coordinator_id) + "/status/"
-		+ current_host, "", zkutil::CreateMode::Persistent);
-
-	setStatus(coordinator_id, current_host, STATUS_OK);
+		+ current_host, toString(static_cast(STATUS_OK)), zkutil::CreateMode::Persistent);
 
 	/// Assign a unique block number to the current node. We will use it in order
 	/// to avoid any possible conflict when uploading resharded partitions.
@@ -1159,17 +1268,17 @@ void ReshardingWorker::setStatus(const std::string & coordinator_id, const std::
 		toString(static_cast(status)));
 }
 
-bool ReshardingWorker::updateOfflineNodes(const std::string & coordinator_id)
+bool ReshardingWorker::detectOfflineNodes(const std::string & coordinator_id)
 {
-	return updateOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
+	return detectOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
 }
 
-bool ReshardingWorker::updateOfflineNodes()
+bool ReshardingWorker::detectOfflineNodes()
 {
-	return updateOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
+	return detectOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
 }
 
-bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
+bool ReshardingWorker::detectOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
 {
 	auto zookeeper = context.getZooKeeper();
@@ -1215,7 +1324,7 @@ ReshardingWorker::Status ReshardingWorker::getStatusCommon(const std::string & p
 	if (coordinator_status != STATUS_OK)
 		return static_cast(coordinator_status);
 
-	(void) updateOfflineNodesCommon(path, coordinator_id);
+	(void) detectOfflineNodesCommon(path, coordinator_id);
 
 	auto nodes = zookeeper->getChildren(path);
 
@@ -1341,7 +1450,7 @@ void ReshardingWorker::abortRecoveryIfRequested()
 
 	try
 	{
-		has_offline_nodes = updateOfflineNodes();
+		has_offline_nodes = detectOfflineNodes();
 		must_abort = must_stop || has_offline_nodes;
 	}
 	catch (...)
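Note on the retry strategy introduced in applyChanges() above: for every shard the code now keeps a list of replicas, picks an untried replica at random, runs the ATTACH there, and falls back to another replica until one succeeds or none remain. The following is a minimal standalone sketch of that per-shard fallback logic only; std::mt19937 stands in for the per-task drand48 state, runOnReplica() is a hypothetical placeholder for the remote ALTER TABLE ... ATTACH PARTITION call, and the boost thread pool that runs the shards in parallel is deliberately left out.

// Sketch only: random replica selection with per-shard fallback,
// loosely mirroring ReshardingWorker::applyChanges().
#include <iostream>
#include <random>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

bool runOnReplica(const std::string & replica)   // hypothetical stand-in for the remote ATTACH call
{
    std::cout << "ALTER TABLE ... ATTACH PARTITION on " << replica << "\n";
    return replica != "replica_1";               // pretend replica_1 is unreachable
}

bool attachWithFallback(std::vector<std::string> replicas, std::mt19937 & gen)
{
    // 'next' splits the list into already-tried (front) and untried (back) replicas,
    // like the 'next' index of ShardTaskInfo.
    for (size_t next = 0; next < replicas.size(); ++next)
    {
        std::uniform_int_distribution<size_t> dist(next, replicas.size() - 1);
        std::swap(replicas[next], replicas[dist(gen)]);  // pick a random untried replica

        if (runOnReplica(replicas[next]))
            return true;                                 // this shard is done
    }
    return false;                                        // every replica failed
}

int main()
{
    std::mt19937 gen{std::random_device{}()};

    std::vector<std::vector<std::string>> shards = {
        {"replica_1", "replica_2"},
        {"replica_3"},
    };

    for (const auto & shard : shards)
        if (!attachWithFallback(shard, gen))
            throw std::runtime_error("Failed to attach partition on shard");
}

The random pick plus swap mirrors the 'next' bookkeeping of ShardTaskInfo: everything before 'next' has already been tried, everything at or after it is still available; the real code additionally repeats whole rounds over all shards while any of them still lacks a successful replica.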
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 6a55c70a90d..8fa0219d2b2 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -3761,6 +3761,12 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
 		{
 		}
 
+		TaskInfo(const TaskInfo &) = delete;
+		TaskInfo & operator=(const TaskInfo &) = delete;
+
+		TaskInfo(TaskInfo &&) = default;
+		TaskInfo & operator=(TaskInfo &&) = default;
+
 		std::string replica_path;
 		ReplicatedMergeTreeAddress address;
 	};
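A small pattern worth calling out across the three hunks that touch a TaskInfo struct: copying is deleted and moving is defaulted, so the descriptors can still be stored in a std::vector (which moves elements on emplace_back and reallocation) while accidental copies are rejected at compile time. A minimal sketch of the idea, with a plain host string standing in for ReplicatedMergeTreeAddress and purely illustrative values:

#include <string>
#include <utility>
#include <vector>

// Move-only task descriptor, mirroring the TaskInfo changes in this diff.
struct TaskInfo
{
    TaskInfo(const std::string & replica_path_, const std::string & host_)
        : replica_path(replica_path_), host(host_)
    {
    }

    TaskInfo(const TaskInfo &) = delete;
    TaskInfo & operator=(const TaskInfo &) = delete;

    TaskInfo(TaskInfo &&) = default;
    TaskInfo & operator=(TaskInfo &&) = default;

    std::string replica_path;
    std::string host;   // illustrative stand-in for ReplicatedMergeTreeAddress
};

int main()
{
    std::vector<TaskInfo> tasks;
    tasks.emplace_back("/clickhouse/tables/01/hits/replicas/r1", "host1:9009");  // hypothetical values

    TaskInfo moved = std::move(tasks.back());   // fine: moving is allowed
    // TaskInfo copy = moved;                   // would not compile: copying is deleted
    (void) moved;
}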