From dd867b787f0de6d6d7dca46a6bcf451990ceed6d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 3 Sep 2020 01:35:47 +0300 Subject: [PATCH] Allow parallel execution of distributed DDL Add distributed_ddl.pool_size to control maximum parallel to handle distributed DDL. Also: - convert Exception constructors to fmt-like - use sleepFor* over std::this_thread::sleep_for() --- programs/server/Server.cpp | 5 +- programs/server/config.xml | 3 + src/Interpreters/DDLWorker.cpp | 259 +++++++++++++++++---------------- src/Interpreters/DDLWorker.h | 31 ++-- 4 files changed, 162 insertions(+), 136 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index f24ba444203..e4fd351f091 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -708,7 +708,10 @@ int Server::main(const std::vector & /*args*/) { /// DDL worker should be started after all tables were loaded String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/"); - global_context->setDDLWorker(std::make_unique(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl")); + int pool_size = config().getInt("distributed_ddl.pool_size", 1); + if (pool_size < 1) + throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + global_context->setDDLWorker(std::make_unique(pool_size, ddl_zookeeper_path, *global_context, &config(), "distributed_ddl")); } std::unique_ptr dns_cache_updater; diff --git a/programs/server/config.xml b/programs/server/config.xml index af01e880dc2..d13978f9ee8 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -615,6 +615,9 @@ + + + diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index b9b52e2f3fe..526f15d921f 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -38,10 +37,11 @@ #include #include #include +#include #include +#include #include #include -#include namespace DB @@ -144,7 +144,7 @@ struct DDLLogEntry rb >> "version: " >> version >> "\n"; if (version != CURRENT_VERSION) - throw Exception("Unknown DDLLogEntry format version: " + DB::toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION); + throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version); Strings host_id_strings; rb >> "query: " >> escape >> query >> "\n"; @@ -308,9 +308,14 @@ static bool isSupportedAlterType(int type) } -DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix) - : context(context_), log(&Poco::Logger::get("DDLWorker")) +DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix) + : context(context_) + , log(&Poco::Logger::get("DDLWorker")) + , pool_size(pool_size_) + , worker_pool(pool_size_) { + last_tasks.reserve(pool_size); + queue_dir = zk_root_dir; if (queue_dir.back() == '/') queue_dir.resize(queue_dir.size() - 1); @@ -343,6 +348,7 @@ DDLWorker::~DDLWorker() stop_flag = true; queue_updated_event->set(); cleanup_event->set(); + worker_pool.wait(); main_thread.join(); cleanup_thread.join(); } @@ -364,8 +370,27 @@ DDLWorker::ZooKeeperPtr DDLWorker::getAndSetZooKeeper() return current_zookeeper; } +void DDLWorker::recoverZooKeeper() +{ + LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false)); -bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) + while (!stop_flag) + { + try + { + getAndSetZooKeeper(); + break; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + sleepForSeconds(5); + } + } +} + + +DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) { String node_data; String entry_path = queue_dir + "/" + entry_name; @@ -374,7 +399,7 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, { /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list. out_reason = "The task was deleted"; - return false; + return {}; } auto task = std::make_unique(); @@ -405,7 +430,7 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, } out_reason = "Incorrect task format"; - return false; + return {}; } bool host_in_hostlist = false; @@ -433,12 +458,13 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, } } - if (host_in_hostlist) - current_task = std::move(task); - else + if (!host_in_hostlist) + { out_reason = "There is no a local address in host list"; + return {}; + } - return host_in_hostlist; + return task; } @@ -448,10 +474,9 @@ static void filterAndSortQueueNodes(Strings & all_nodes) std::sort(all_nodes.begin(), all_nodes.end()); } - -void DDLWorker::processTasks() +void DDLWorker::scheduleTasks() { - LOG_DEBUG(log, "Processing tasks"); + LOG_DEBUG(log, "Scheduling tasks"); auto zookeeper = tryGetZooKeeper(); Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event); @@ -459,86 +484,60 @@ void DDLWorker::processTasks() if (queue_nodes.empty()) return; - bool server_startup = last_processed_task_name.empty(); + bool server_startup = last_tasks.empty(); auto begin_node = server_startup ? queue_nodes.begin() - : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_task_name); + : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back()); for (auto it = begin_node; it != queue_nodes.end(); ++it) { String entry_name = *it; - if (current_task) + String reason; + auto task = initAndCheckTask(entry_name, reason, zookeeper); + if (!task) { - if (current_task->entry_name == entry_name) - { - LOG_INFO(log, "Trying to process task {} again", entry_name); - } - else - { - LOG_INFO(log, "Task {} was deleted from ZooKeeper before current host committed it", current_task->entry_name); - current_task = nullptr; - } + LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason); + saveTask(entry_name); + continue; } - if (!current_task) + bool already_processed = zookeeper->exists(task->entry_path + "/finished/" + task->host_id_str); + if (!server_startup && !task->was_executed && already_processed) { - String reason; - if (!initAndCheckTask(entry_name, reason, zookeeper)) - { - LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason); - last_processed_task_name = entry_name; - continue; - } - } - - DDLTask & task = *current_task; - - bool already_processed = zookeeper->exists(task.entry_path + "/finished/" + task.host_id_str); - if (!server_startup && !task.was_executed && already_processed) - { - throw Exception( - "Server expects that DDL task " + task.entry_name + " should be processed, but it was already processed according to ZK", - ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Server expects that DDL task {} should be processed, but it was already processed according to ZK", + entry_name); } if (!already_processed) { - try + worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]() { - processTask(task, zookeeper); - } - catch (const Coordination::Exception & e) - { - if (server_startup && e.code == Coordination::Error::ZNONODE) - { - LOG_WARNING(log, "ZooKeeper NONODE error during startup. Ignoring entry {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true)); - } - else - { - throw; - } - } - catch (...) - { - LOG_WARNING(log, "An error occurred while processing task {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true)); - throw; - } + enqueueTask(DDLTaskPtr(task_ptr)); + }); } else { - LOG_DEBUG(log, "Task {} ({}) has been already processed", task.entry_name, task.entry.query); + LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query); } - last_processed_task_name = task.entry_name; - current_task.reset(); + saveTask(entry_name); if (stop_flag) break; } } +void DDLWorker::saveTask(const String & entry_name) +{ + if (last_tasks.size() == pool_size) + { + last_tasks.erase(last_tasks.begin()); + } + last_tasks.emplace_back(entry_name); +} /// Parses query and resolves cluster and host in cluster void DDLWorker::parseQueryAndResolveHost(DDLTask & task) @@ -559,10 +558,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task) task.cluster_name = task.query_on_cluster->cluster; task.cluster = context.tryGetCluster(task.cluster_name); if (!task.cluster) - { - throw Exception("DDL task " + task.entry_name + " contains current host " + task.host_id.readableString() - + " in cluster " + task.cluster_name + ", but there are no such cluster here.", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); - } + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "DDL task {} contains current host {} in cluster {}, but there are no such cluster here.", + task.entry_name, task.host_id.readableString(), task.cluster_name); /// Try to find host from task host list in cluster /// At the first, try find exact match (host name and ports should be literally equal) @@ -583,10 +581,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task) { if (default_database == address.default_database) { - throw Exception( - "There are two exactly the same ClickHouse instances " + address.readableString() + " in cluster " - + task.cluster_name, - ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "There are two exactly the same ClickHouse instances {} in cluster {}", + address.readableString(), task.cluster_name); } else { @@ -600,9 +597,8 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task) auto * query_with_table = dynamic_cast(task.query.get()); if (!query_with_table || query_with_table->database.empty()) { - throw Exception( - "For a distributed DDL on circular replicated cluster its table name must be qualified by database name.", - ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "For a distributed DDL on circular replicated cluster its table name must be qualified by database name."); } if (default_database == query_with_table->database) return; @@ -635,8 +631,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task) { if (found_via_resolving) { - throw Exception("There are two the same ClickHouse instances in cluster " + task.cluster_name + " : " - + task.address_in_cluster.readableString() + " and " + address.readableString(), ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "There are two the same ClickHouse instances in cluster {} : {} and {}", + task.cluster_name, task.address_in_cluster.readableString(), address.readableString()); } else { @@ -651,8 +648,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task) if (!found_via_resolving) { - throw Exception("Not found host " + task.host_id.readableString() + " in definition of cluster " + task.cluster_name, - ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "Not found host {} in definition of cluster {}", + task.host_id.readableString(), task.cluster_name); } else { @@ -673,7 +671,7 @@ bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, Exec try { - current_context = std::make_unique(context); + auto current_context = std::make_unique(context); current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; current_context->setCurrentQueryId(""); // generate random query_id executeQuery(istr, ostr, false, *current_context, {}); @@ -707,8 +705,44 @@ void DDLWorker::attachToThreadGroup() } -void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper) +void DDLWorker::enqueueTask(DDLTaskPtr task_ptr) { + auto & task = *task_ptr; + + while (!stop_flag) + { + try + { + processTask(task); + return; + } + catch (const Coordination::Exception & e) + { + if (Coordination::isHardwareError(e.code)) + { + recoverZooKeeper(); + } + else if (e.code == Coordination::Error::ZNONODE) + { + LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true)); + // TODO: retry? + } + else + { + LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true)); + return; + } + } + catch (...) + { + LOG_WARNING(log, "An error occurred while processing task {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true)); + } + } +} +void DDLWorker::processTask(DDLTask & task) +{ + auto zookeeper = tryGetZooKeeper(); + LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query); String dummy; @@ -816,16 +850,17 @@ void DDLWorker::checkShardConfig(const String & table, const DDLTask & task, Sto if (storage->supportsReplication() && !config_is_replicated_shard) { - throw Exception("Table " + backQuote(table) + " is replicated, but shard #" + toString(task.host_shard_num + 1) + - " isn't replicated according to its cluster definition." - " Possibly true is forgotten in the cluster config.", - ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "Table {} is replicated, but shard #{} isn't replicated according to its cluster definition. " + "Possibly true is forgotten in the cluster config.", + backQuote(table), task.host_shard_num + 1); } if (!storage->supportsReplication() && config_is_replicated_shard) { - throw Exception("Table " + backQuote(table) + " isn't replicated, but shard #" + toString(task.host_shard_num + 1) + - " is replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION); + throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "Table {} isn't replicated, but shard #{} is replicated according to its cluster definition", + backQuote(table), task.host_shard_num + 1); } } @@ -841,7 +876,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( /// If we will develop new replicated storage if (!replicated_storage) - throw Exception("Storage type '" + storage->getName() + "' is not supported by distributed DDL", ErrorCodes::NOT_IMPLEMENTED); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Storage type '{}' is not supported by distributed DDL", storage->getName()); /// Generate unique name for shard node, it will be used to execute the query by only single host /// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN' @@ -1118,7 +1153,7 @@ void DDLWorker::runMainThread() attachToThreadGroup(); cleanup_event->set(); - processTasks(); + scheduleTasks(); LOG_DEBUG(log, "Waiting a watch"); queue_updated_event->wait(); @@ -1127,23 +1162,7 @@ void DDLWorker::runMainThread() { if (Coordination::isHardwareError(e.code)) { - LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false)); - - while (!stop_flag) - { - try - { - getAndSetZooKeeper(); - break; - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - - using namespace std::chrono_literals; - std::this_thread::sleep_for(5s); - } - } + recoverZooKeeper(); } else if (e.code == Coordination::Error::ZNONODE) { @@ -1260,28 +1279,24 @@ public: size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished; size_t num_active_hosts = current_active_hosts.size(); - std::stringstream msg; - msg << "Watching task " << node_path << " is executing longer than distributed_ddl_task_timeout" - << " (=" << timeout_seconds << ") seconds." - << " There are " << num_unfinished_hosts << " unfinished hosts" - << " (" << num_active_hosts << " of them are currently active)" - << ", they are going to execute the query in background"; - throw Exception(msg.str(), ErrorCodes::TIMEOUT_EXCEEDED); + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, + "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. " + "There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background", + node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts); } if (num_hosts_finished != 0 || try_number != 0) { - auto current_sleep_for = std::chrono::milliseconds(std::min(static_cast(1000), 50 * (try_number + 1))); - std::this_thread::sleep_for(current_sleep_for); + sleepForMilliseconds(std::min(1000, 50 * (try_number + 1))); } /// TODO: add shared lock if (!zookeeper->exists(node_path)) { - throw Exception("Cannot provide query execution status. The query's node " + node_path - + " has been deleted by the cleaner since it was finished (or its lifetime is expired)", - ErrorCodes::UNFINISHED); + throw Exception(ErrorCodes::UNFINISHED, + "Cannot provide query execution status. The query's node {} has been deleted by the cleaner since it was finished (or its lifetime is expired)", + node_path); } Strings new_hosts = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, node_path + "/finished")); @@ -1304,7 +1319,7 @@ public: auto [host, port] = Cluster::Address::fromString(host_id); if (status.code != 0 && first_exception == nullptr) - first_exception = std::make_unique("There was an error on [" + host + ":" + toString(port) + "]: " + status.message, status.code); + first_exception = std::make_unique(status.code, "There was an error on [{}:{}]: {}", host, port, status.message); ++num_hosts_finished; diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 544fb3da27d..f6b4dd00684 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -26,6 +26,7 @@ class ASTAlterQuery; class AccessRightsElements; struct DDLLogEntry; struct DDLTask; +using DDLTaskPtr = std::unique_ptr; /// Pushes distributed DDL query to the queue @@ -37,7 +38,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & conte class DDLWorker { public: - DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix); + DDLWorker(int pool_size_, const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix); ~DDLWorker(); /// Pushes query into DDL queue, returns path to created node @@ -57,14 +58,19 @@ private: ZooKeeperPtr tryGetZooKeeper() const; /// If necessary, creates a new session and caches it. ZooKeeperPtr getAndSetZooKeeper(); + /// ZooKeeper recover loop (while not stopped). + void recoverZooKeeper(); - void processTasks(); + void checkCurrentTasks(); + void scheduleTasks(); + void saveTask(const String & entry_name); /// Reads entry and check that the host belongs to host list of the task - /// Returns true and sets current_task if entry parsed and the check is passed - bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper); + /// Returns non-empty DDLTaskPtr if entry parsed and the check is passed + DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper); - void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper); + void enqueueTask(DDLTaskPtr task); + void processTask(DDLTask & task); /// Check that query should be executed on leader replica only static bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage); @@ -101,32 +107,31 @@ private: void attachToThreadGroup(); private: - bool is_circular_replicated; + std::atomic is_circular_replicated = false; Context & context; Poco::Logger * log; - std::unique_ptr current_context; std::string host_fqdn; /// current host domain name std::string host_fqdn_id; /// host_name:port std::string queue_dir; /// dir with queue of queries - /// Name of last task that was skipped or successfully executed - std::string last_processed_task_name; - mutable std::mutex zookeeper_mutex; ZooKeeperPtr current_zookeeper; /// Save state of executed task to avoid duplicate execution on ZK error - using DDLTaskPtr = std::unique_ptr; - DDLTaskPtr current_task; + std::vector last_tasks; std::shared_ptr queue_updated_event = std::make_shared(); std::shared_ptr cleanup_event = std::make_shared(); - std::atomic stop_flag{false}; + std::atomic stop_flag = false; ThreadFromGlobalPool main_thread; ThreadFromGlobalPool cleanup_thread; + /// Size of the pool for query execution. + size_t pool_size = 1; + ThreadPool worker_pool; + /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago Int64 cleanup_delay_period = 60; // minute (in seconds) /// Delete node if its age is greater than that