From f9fb73ce2e62828ef4c9d934794c7cb9836ec423 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Fri, 28 Sep 2018 16:44:39 +0300 Subject: [PATCH] fix distributed "create table as select" query --- dbms/src/Interpreters/DDLWorker.cpp | 21 ++++++++++++++++++++- dbms/src/Interpreters/DDLWorker.h | 6 ++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 69e951f18e4..b3baf0731ff 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -178,7 +178,7 @@ struct DDLTask Cluster::Address address_in_cluster; size_t host_shard_num; size_t host_replica_num; - +// /// Stage 3.3: execute query ExecutionStatus execution_status; bool was_executed = false; @@ -335,6 +335,7 @@ void DDLWorker::processTasks() { LOG_DEBUG(log, "Processing tasks"); + Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, event_queue_updated); filterAndSortQueueNodes(queue_nodes); if (queue_nodes.empty()) @@ -536,11 +537,29 @@ bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, Exec return true; } +void DDLWorker::attachToThreadGroup() +{ + std::lock_guard lock(thread_group_mutex); + + if (thread_group) + { + /// Put all threads to one thread pool + CurrentThread::attachToIfDetached(thread_group); + } + else + { + CurrentThread::initializeQuery(); + thread_group = CurrentThread::getGroup(); + } +} + void DDLWorker::processTask(DDLTask & task) { LOG_DEBUG(log, "Processing task " << task.entry_name << " (" << task.entry.query << ")"); + attachToThreadGroup(); + String dummy; String active_node_path = task.entry_path + "/active/" + task.host_id_str; String finished_node_path = task.entry_path + "/finished/" + task.host_id_str; diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index 5db27c49f10..76fa960b3ec 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -67,6 +68,8 @@ private: void run(); + void attachToThreadGroup(); + private: Context & context; Logger * log; @@ -98,6 +101,9 @@ private: /// How many tasks could be in the queue size_t max_tasks_in_queue = 1000; + ThreadGroupStatusPtr thread_group; + std::mutex thread_group_mutex; + friend class DDLQueryStatusInputSream; friend struct DDLTask; };