diff --git a/dbms/include/DB/Interpreters/Cluster.h b/dbms/include/DB/Interpreters/Cluster.h
index feff6801689..f36db1d4162 100644
--- a/dbms/include/DB/Interpreters/Cluster.h
+++ b/dbms/include/DB/Interpreters/Cluster.h
@@ -16,7 +16,7 @@ namespace DB
 class Cluster
 {
 public:
-    Cluster(const Settings & settings, const String & cluster_short_name, const String & cluster_name);
+    Cluster(const Settings & settings, const String & cluster_name);
 
     /// Construct a cluster from the names of shards and replicas. Local shards are treated the same way as remote ones.
     Cluster(const Settings & settings, std::vector<std::vector<String>> names,
@@ -100,8 +100,13 @@ public:
 private:
     void initMisc();
 
+    /// Create a unique name based on the list of addresses and ports.
+    /// We need it in order to be able to perform resharding requests
+    /// on tables that have the distributed engine.
+    void assignName();
+
 private:
-    /// Name of the cluster, if any.
+    /// Name of the cluster.
     String name;
     /// Description of the cluster's shards.
     ShardsInfo shards_info;
diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
index 46c18290022..2bc8ee86b3c 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
@@ -129,11 +129,11 @@ private:
 
     /// Forcibly terminate the thread.
     void abortPollingIfRequested();
-    void abortCoordinatorIfRequested();
+    void abortCoordinatorIfRequested(const std::string & coordinator_id);
     void abortRecoveryIfRequested();
     void abortJobIfRequested();
 
-    Status getCoordinatorStatus();
+    Status getCoordinatorStatus(const std::string & coordinator_id);
 
     /// Register the job with the corresponding coordinator.
     void attachJob();
@@ -148,7 +148,7 @@ private:
     /// Functions that create the objects needed to synchronize
     /// distributed jobs.
     zkutil::RWLock createLock();
-    zkutil::RWLock createCoordinatorLock();
+    zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
     zkutil::Barrier createCheckBarrier(const std::string & coordinator_id);
     zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
     zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
@@ -168,7 +168,6 @@ private:
     std::string distributed_online_path;
     std::string distributed_lock_path;
     std::string coordination_path;
-    std::string current_coordinator_id;
 
     Context & context;
     Logger * log;
diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp
index 996131ba5fa..49dd24dc2f3 100644
--- a/dbms/src/Core/ErrorCodes.cpp
+++ b/dbms/src/Core/ErrorCodes.cpp
@@ -326,17 +326,18 @@ namespace ErrorCodes
     extern const int DUPLICATE_SHARD_PATHS = 320;
     extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE = 321;
     extern const int RESHARDING_BUSY_CLUSTER = 322;
-    extern const int RESHARDING_NO_SUCH_COORDINATOR = 323;
-    extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP = 324;
-    extern const int RESHARDING_ALREADY_SUBSCRIBED = 325;
-    extern const int RESHARDING_REMOTE_NODE_UNAVAILABLE = 326;
-    extern const int RESHARDING_REMOTE_NODE_ERROR = 327;
-    extern const int RESHARDING_COORDINATOR_DELETED = 328;
-    extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD = 329;
-    extern const int RESHARDING_INVALID_QUERY = 330;
-    extern const int RESHARDING_INITIATOR_CHECK_FAILED = 331;
-    extern const int RWLOCK_ALREADY_HELD = 332;
-    extern const int BARRIER_TIMEOUT = 333;
+    extern const int RESHARDING_BUSY_SHARD = 323;
+    extern const int RESHARDING_NO_SUCH_COORDINATOR = 324;
+    extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP = 325;
+    extern const int RESHARDING_ALREADY_SUBSCRIBED = 326;
+    extern const int RESHARDING_REMOTE_NODE_UNAVAILABLE = 327;
+    extern const int RESHARDING_REMOTE_NODE_ERROR = 328;
+    extern const int RESHARDING_COORDINATOR_DELETED = 329;
+    extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD = 330;
+    extern const int RESHARDING_INVALID_QUERY = 331;
+    extern const int RESHARDING_INITIATOR_CHECK_FAILED = 332;
+    extern const int RWLOCK_ALREADY_HELD = 333;
+    extern const int BARRIER_TIMEOUT = 334;
 
     extern const int KEEPER_EXCEPTION = 999;
     extern const int POCO_EXCEPTION = 1000;
diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp
index 4cda790c777..6fdfaf90ceb 100644
--- a/dbms/src/Interpreters/Cluster.cpp
+++ b/dbms/src/Interpreters/Cluster.cpp
@@ -119,13 +119,12 @@ Clusters::Clusters(const Settings & settings, const String & config_name)
     for (const auto & key : config_keys)
         impl.emplace(std::piecewise_construct,
             std::forward_as_tuple(key),
-            std::forward_as_tuple(settings, key, config_name + "." + key));
+            std::forward_as_tuple(settings, config_name + "." + key));
 }
 
 /// Implementation of the Cluster class
 
-Cluster::Cluster(const Settings & settings, const String & cluster_short_name, const String & cluster_name)
-    : name(cluster_short_name)
+Cluster::Cluster(const Settings & settings, const String & cluster_name)
 {
     Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
     Poco::Util::AbstractConfiguration::Keys config_keys;
@@ -304,43 +303,6 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> nam
         ++current_shard_num;
     }
 
-    /// Create a unique name based on the list of addresses.
-    /// We need it in order to be able to perform resharding requests
-    /// with the remote table function.
-    std::vector<std::string> elements;
-    for (const auto & address : addresses)
-    {
-        elements.push_back(address.host_name);
-        elements.push_back(address.resolved_address.host().toString());
-    }
-
-    for (const auto & addresses : addresses_with_failover)
-    {
-        for (const auto & address : addresses)
-        {
-            elements.push_back(address.host_name);
-            elements.push_back(address.resolved_address.host().toString());
-        }
-    }
-
-    std::sort(elements.begin(), elements.end());
-
-    unsigned char hash[SHA512_DIGEST_LENGTH];
-
-    SHA512_CTX ctx;
-    SHA512_Init(&ctx);
-
-    for (const auto & host : elements)
-        SHA512_Update(&ctx, reinterpret_cast<const void *>(host.data()), host.size());
-
-    SHA512_Final(hash, &ctx);
-
-    {
-        WriteBufferFromString buf(name);
-        HexWriteBuffer hex_buf(buf);
-        hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
-    }
-
     initMisc();
 }
 
@@ -379,6 +341,48 @@ void Cluster::initMisc()
             break;
         }
     }
+
+    assignName();
 }
+
+
+void Cluster::assignName()
+{
+    std::vector<std::string> elements;
+
+    if (!addresses.empty())
+    {
+        for (const auto & address : addresses)
+            elements.push_back(address.host_name + ":" + toString(address.port));
+    }
+    else if (!addresses_with_failover.empty())
+    {
+        for (const auto & addresses : addresses_with_failover)
+        {
+            for (const auto & address : addresses)
+                elements.push_back(address.host_name + ":" + toString(address.port));
+        }
+    }
+    else
+        throw Exception("Cluster: ill-formed cluster", ErrorCodes::LOGICAL_ERROR);
+
+    std::sort(elements.begin(), elements.end());
+
+    unsigned char hash[SHA512_DIGEST_LENGTH];
+
+    SHA512_CTX ctx;
+    SHA512_Init(&ctx);
+
+    for (const auto & host : elements)
+        SHA512_Update(&ctx, reinterpret_cast<const void *>(host.data()), host.size());
+
+    SHA512_Final(hash, &ctx);
+
+    {
+        WriteBufferFromString buf(name);
+        HexWriteBuffer hex_buf(buf);
+        hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
+    }
+}
 
 }
diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
index 6545c760a57..66ab46c2f43 100644
--- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
@@ -45,6 +45,7 @@ namespace ErrorCodes
     extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
     extern const int RESHARDING_BUSY_CLUSTER;
+    extern const int RESHARDING_BUSY_SHARD;
     extern const int RESHARDING_NO_SUCH_COORDINATOR;
     extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
     extern const int RESHARDING_ALREADY_SUBSCRIBED;
@@ -133,6 +134,8 @@ private:
 /// representations, of all the nodes of the cluster; used to check if a given node
 /// is a member of the cluster;
 ///
+/// /shards: the list of shards that have subscribed;
+///
 /// /check_barrier: when all the participating nodes have checked
 /// that they can perform resharding operations, proceed further;
 ///
@@ -803,7 +806,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
     {
         auto effective_cluster_name = zookeeper->get(coordination_path + "/" + coordinator + "/cluster");
         if (effective_cluster_name == cluster_name)
-            throw Exception("The cluster " + cluster_name + " is currently busy with another "
+            throw Exception("The cluster specified for this table is currently busy with another "
Please try later", ErrorCodes::RESHARDING_BUSY_CLUSTER); } @@ -824,6 +827,9 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster) toString(cluster.getRemoteShardCount() + cluster.getLocalShardCount()), zkutil::CreateMode::Persistent); + (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/shards", + "", zkutil::CreateMode::Persistent); + (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/status", "", zkutil::CreateMode::Persistent); setStatus(coordinator_id, STATUS_OK); @@ -836,31 +842,39 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster) (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/cluster_addresses", "", zkutil::CreateMode::Persistent); - std::vector cluster_addresses; - const auto & addresses = cluster.getShardsAddresses(); - for (const auto & address : addresses) - { - cluster_addresses.push_back(address.host_name); - cluster_addresses.push_back(address.resolved_address.host().toString()); - } - - const auto & addresses_with_failover = cluster.getShardsWithFailoverAddresses(); - for (const auto & addresses : addresses_with_failover) - { - for (const auto & address : addresses) - { - cluster_addresses.push_back(address.host_name); - cluster_addresses.push_back(address.resolved_address.host().toString()); - } - } - - for (const auto & host : cluster_addresses) + auto publish_address = [&](const std::string & host, size_t shard_no) { int32_t code = zookeeper->tryCreate(getCoordinatorPath(coordinator_id) + "/cluster_addresses/" - + host, "", zkutil::CreateMode::Persistent); + + host, toString(shard_no), zkutil::CreateMode::Persistent); if ((code != ZOK) && (code != ZNODEEXISTS)) throw zkutil::KeeperException(code); + }; + + if (!cluster.getShardsAddresses().empty()) + { + size_t shard_no = 0; + for (const auto & address : cluster.getShardsAddresses()) + { + publish_address(address.host_name, shard_no); + publish_address(address.resolved_address.host().toString(), shard_no); + ++shard_no; + } } + else if (!cluster.getShardsWithFailoverAddresses().empty()) + { + size_t shard_no = 0; + for (const auto & addresses : cluster.getShardsWithFailoverAddresses()) + { + for (const auto & address : addresses) + { + publish_address(address.host_name, shard_no); + publish_address(address.resolved_address.host().toString(), shard_no); + } + ++shard_no; + } + } + else + throw Exception("ReshardingWorker: ill-formed cluster", ErrorCodes::LOGICAL_ERROR); return coordinator_id; } @@ -883,21 +897,39 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std { auto zookeeper = context.getZooKeeper(); - current_coordinator_id = coordinator_id; + if (!zookeeper->exists(getCoordinatorPath(coordinator_id))) + throw Exception("Coordinator " + coordinator_id + " does not exist", + ErrorCodes::RESHARDING_NO_SUCH_COORDINATOR); + + auto current_host = getFQDNOrHostName(); + + /// Make sure that this shard is not busy in another distributed job. 
+    {
+        auto lock = createLock();
+        zkutil::RWLock::Guard guard{lock};
+
+        auto coordinators = zookeeper->getChildren(coordination_path);
+        for (const auto & coordinator : coordinators)
+        {
+            if (coordinator == coordinator_id)
+                continue;
+
+            auto cluster_addresses = zookeeper->getChildren(coordination_path + "/" + coordinator
+                + "/cluster_addresses");
+            if (std::find(cluster_addresses.begin(), cluster_addresses.end(), current_host)
+                != cluster_addresses.end())
+                throw Exception("This shard is already busy with another distributed job",
+                    ErrorCodes::RESHARDING_BUSY_SHARD);
+        }
+    }
 
     {
-        auto lock = createCoordinatorLock();
-        zkutil::RWLock::Guard guard{lock};
+        auto lock = createCoordinatorLock(coordinator_id);
+        zkutil::RWLock::Guard guard{lock};
 
         /// Make sure that the query ALTER TABLE RESHARD with the "COORDINATE WITH" tag
         /// is not bogus.
-        if (!zookeeper->exists(getCoordinatorPath(coordinator_id)))
-            throw Exception("Coordinator " + coordinator_id + " does not exist",
-                ErrorCodes::RESHARDING_NO_SUCH_COORDINATOR);
-
-        auto current_host = getFQDNOrHostName();
-
         auto cluster_addresses = zookeeper->getChildren(getCoordinatorPath(coordinator_id)
             + "/cluster_addresses");
         if (std::find(cluster_addresses.begin(), cluster_addresses.end(), current_host)
@@ -906,19 +938,26 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
             + coordinator_id, ErrorCodes::RESHARDING_NO_COORDINATOR_MEMBERSHIP);
 
-        int32_t code = zookeeper->tryCreate(getCoordinatorPath(coordinator_id) + "/status/" + current_host,
-            "", zkutil::CreateMode::Persistent);
-        if (code == ZNODEEXISTS)
-            throw Exception("Already subscribed to coordinator " + coordinator_id,
-                ErrorCodes::RESHARDING_ALREADY_SUBSCRIBED);
-        else if (code != ZOK)
-            throw zkutil::KeeperException(code);
-
+        /// Check that the coordinator recognizes our query.
         auto query_hash = zookeeper->get(getCoordinatorPath(coordinator_id) + "/query_hash");
         if (computeHash(query) != query_hash)
             throw Exception("Coordinator " + coordinator_id + " does not handle this query",
                 ErrorCodes::RESHARDING_INVALID_QUERY);
 
+        /// Access granted. Now perform subscription.
+        auto my_shard_no = zookeeper->get(getCoordinatorPath(coordinator_id) + "/cluster_addresses/"
+            + current_host);
+        int32_t code = zookeeper->tryCreate(getCoordinatorPath(coordinator_id) + "/shards/"
+            + toString(my_shard_no), "", zkutil::CreateMode::Persistent);
+        if (code == ZNODEEXISTS)
+            throw Exception("This shard has already subscribed to coordinator " + coordinator_id,
+                ErrorCodes::RESHARDING_ALREADY_SUBSCRIBED);
+        else if (code != ZOK)
+            throw zkutil::KeeperException(code);
+
+        zookeeper->create(getCoordinatorPath(coordinator_id) + "/status/"
+            + current_host, "", zkutil::CreateMode::Persistent);
+
         setStatus(coordinator_id, current_host, STATUS_OK);
     }
 
@@ -926,7 +965,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
     /// to avoid any possible conflict when uploading resharded partitions.
     UInt64 block_number;
     {
-        auto lock = createCoordinatorLock();
+        auto lock = createCoordinatorLock(coordinator_id);
         zkutil::RWLock::Guard guard{lock};
 
         auto current_block_number = zookeeper->get(getCoordinatorPath(coordinator_id) + "/increment");
@@ -945,9 +984,13 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
 
 void ReshardingWorker::unsubscribe(const std::string & coordinator_id)
 {
+    /// Note: we don't remove this shard from the /shards znode because
+    /// it can subscribe to a distributed job only if its cluster is not
+    /// currently busy with any distributed job.
+
     auto zookeeper = context.getZooKeeper();
 
-    auto lock = createCoordinatorLock();
+    auto lock = createCoordinatorLock(coordinator_id);
     zkutil::RWLock::Guard guard{lock};
 
     auto current_host = getFQDNOrHostName();
@@ -965,7 +1008,7 @@ void ReshardingWorker::addPartitions(const std::string & coordinator_id,
 {
     auto zookeeper = context.getZooKeeper();
 
-    auto lock = createCoordinatorLock();
+    auto lock = createCoordinatorLock(coordinator_id);
     zkutil::RWLock::Guard guard{lock};
 
     auto current_host = getFQDNOrHostName();
@@ -987,7 +1030,7 @@ ReshardingWorker::PartitionList::iterator ReshardingWorker::categorizePartitions
     PartitionList uncoordinated_partition_list;
 
     {
-        auto lock = createCoordinatorLock();
+        auto lock = createCoordinatorLock(coordinator_id);
         zkutil::RWLock::Guard guard{lock};
 
         auto current_host = getFQDNOrHostName();
@@ -1023,7 +1066,7 @@ ReshardingWorker::PartitionList::iterator ReshardingWorker::categorizePartitions
 
 size_t ReshardingWorker::getPartitionCount(const std::string & coordinator_id)
 {
-    auto lock = createCoordinatorLock();
+    auto lock = createCoordinatorLock(coordinator_id);
     zkutil::RWLock::Guard guard{lock};
 
     return getPartitionCountUnlocked(coordinator_id);
@@ -1037,7 +1080,7 @@ size_t ReshardingWorker::getPartitionCountUnlocked(const std::string & coordinat
 
 size_t ReshardingWorker::getNodeCount(const std::string & coordinator_id)
 {
-    auto lock = createCoordinatorLock();
+    auto lock = createCoordinatorLock(coordinator_id);
     zkutil::RWLock::Guard guard{lock};
 
     auto zookeeper = context.getZooKeeper();
@@ -1069,11 +1112,11 @@ void ReshardingWorker::setStatus(const std::string & coordinator_id, const std::
         toString(static_cast<UInt64>(status)));
 }
 
-ReshardingWorker::Status ReshardingWorker::getCoordinatorStatus()
+ReshardingWorker::Status ReshardingWorker::getCoordinatorStatus(const std::string & coordinator_id)
 {
     auto zookeeper = context.getZooKeeper();
 
-    auto status_str = zookeeper->get(getCoordinatorPath(current_coordinator_id) + "/status");
+    auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status");
     return static_cast<Status>(std::stoull(status_str));
 }
 
@@ -1146,8 +1189,6 @@ void ReshardingWorker::attachJob()
     if (!current_job.isCoordinated())
         return;
 
-    current_coordinator_id = current_job.coordinator_id;
-
     auto zookeeper = context.getZooKeeper();
 
     /// Check if the corresponding coordinator exists. If it doesn't, throw an exception,
@@ -1202,7 +1243,7 @@ void ReshardingWorker::detachJob()
     bool delete_coordinator = false;
 
     {
-        auto lock = createCoordinatorLock();
+        auto lock = createCoordinatorLock(current_job.coordinator_id);
         zkutil::RWLock::Guard guard{lock};
 
         auto children = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
@@ -1242,13 +1283,13 @@ void ReshardingWorker::abortPollingIfRequested()
         throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
 }
 
-void ReshardingWorker::abortCoordinatorIfRequested()
+void ReshardingWorker::abortCoordinatorIfRequested(const std::string & coordinator_id)
 {
     bool must_abort;
 
     try
     {
-        must_abort = must_stop || (getCoordinatorStatus() != STATUS_OK);
+        must_abort = must_stop || (getCoordinatorStatus(coordinator_id) != STATUS_OK);
     }
     catch (...)
     {
@@ -1347,13 +1388,14 @@ zkutil::RWLock ReshardingWorker::createLock()
     return lock;
 }
 
-zkutil::RWLock ReshardingWorker::createCoordinatorLock()
+zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coordinator_id)
 {
     auto zookeeper = context.getZooKeeper();
 
-    zkutil::RWLock lock(zookeeper, getCoordinatorPath(current_coordinator_id) + "/lock");
+    zkutil::RWLock lock(zookeeper, getCoordinatorPath(coordinator_id) + "/lock");
 
-    zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this);
+    zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
+        coordinator_id);
     lock.setCancellationHook(hook);
 
     return lock;
@@ -1367,7 +1409,9 @@ zkutil::Barrier ReshardingWorker::createCheckBarrier(const std::string & coordin
     zkutil::Barrier check_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/check_barrier",
         std::stoull(node_count)};
 
-    zkutil::Barrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this);
+    zkutil::Barrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
+        coordinator_id
+    );
     check_barrier.setCancellationHook(hook);
 
     return check_barrier;
@@ -1381,7 +1425,8 @@ zkutil::SingleBarrier ReshardingWorker::createOptOutBarrier(const std::string &
     zkutil::SingleBarrier opt_out_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/opt_out_barrier",
         count};
 
-    zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this);
+    zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
+        coordinator_id);
     opt_out_barrier.setCancellationHook(hook);
 
     return opt_out_barrier;
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 9ec0fd8de0c..a22cdecdebd 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -3463,11 +3463,11 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
     if (!resharding_worker.isStarted())
         throw Exception("Resharding background thread is not running", ErrorCodes::RESHARDING_NO_WORKER);
 
-    bool is_coordinated = !coordinator.isNull();
+    bool has_coordinator = !coordinator.isNull();
     std::string coordinator_id;
     UInt64 block_number = 0;
 
-    if (is_coordinated)
+    if (has_coordinator)
     {
         coordinator_id = coordinator.get();
         block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
@@ -3533,7 +3533,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
 
     if (partition_list.empty())
     {
-        if (!is_coordinated)
+        if (!has_coordinator)
             throw Exception("No existing partition found", ErrorCodes::PARTITION_DOESNT_EXIST);
     }
     else
@@ -3552,7 +3552,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
         }
     }
 
-    if (is_coordinated)
+    if (has_coordinator)
     {
         size_t old_node_count = resharding_worker.getNodeCount(coordinator_id);
         resharding_worker.addPartitions(coordinator_id, partition_list);
@@ -3596,7 +3596,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
             if (uncoordinated_begin == partition_list.cbegin())
             {
                 coordinator_id.clear();
-                is_coordinated = false;
+                has_coordinator = false;
             }
         }
         else
@@ -3635,7 +3635,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
     }
     catch (const Exception & ex)
     {
-        if (is_coordinated)
+        if (has_coordinator)
         {
             if ((ex.code() == ErrorCodes::RESHARDING_NO_SUCH_COORDINATOR) ||
                 (ex.code() == ErrorCodes::RESHARDING_NO_COORDINATOR_MEMBERSHIP) ||
@@ -3665,7 +3665,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
     }
     catch (...)
     {
-        if (is_coordinated)
+        if (has_coordinator)
         {
             try
             {