diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index ceaf3f7ac4c..1c778b3b67b 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -24,6 +24,8 @@ #include #include +#include + namespace DB { @@ -107,7 +109,6 @@ DDLWorker::~DDLWorker() void DDLWorker::processTasks() { LOG_DEBUG(log, "processTasks"); - zookeeper->createAncestors(root_dir + "/"); Strings queue_nodes = zookeeper->getChildren(root_dir, nullptr, event_queue_updated); if (queue_nodes.empty()) @@ -172,6 +173,62 @@ void DDLWorker::processTasks() } } +void DDLWorker::cleanupQueue(const Strings * node_names_to_check) +{ + /// Both ZK and Poco use Unix epoch + size_t current_time_seconds = Poco::Timestamp().epochTime(); + constexpr size_t zookeeper_time_resolution = 1000; + + // Too early to check + if (!last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + 10) + return; + + last_cleanup_time_seconds = current_time_seconds; + + String data; + zkutil::Stat stat; + DDLLogEntry node; + Strings failed_hosts, sucess_hosts; + + Strings node_names_fetched = node_names_to_check ? Strings{} : zookeeper->getChildren(root_dir); + const Strings & node_names = (node_names_to_check) ? *node_names_to_check : node_names_fetched; + + for (const String & node_name : node_names) + { + /// TODO: Add /root/locks/node_name lock to avoid rare race counditions. + try + { + String node_path = root_dir + "/" + node_name; + if (!zookeeper->tryGet(node_path, data, &stat)) + continue; + + node.parse(data); + + size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution; + if (zookeeper_time_seconds + node_max_lifetime_seconds < current_time_seconds) + { + size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds; + LOG_INFO(log, "Lifetime of node " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it"); + zookeeper->removeRecursive(node_path); + continue; + } + + Strings sucess_nodes = zookeeper->getChildren(node_path + "/sucess"); + Strings failed_nodes = zookeeper->getChildren(node_path + "/failed"); + + if (sucess_nodes.size() + failed_nodes.size() >= node.hosts.size()) + { + LOG_INFO(log, "Node " << node_name << " had been executed by each host, deleting it"); + zookeeper->removeRecursive(node_path); + } + } + catch (...) + { + tryLogCurrentException(log, "An error occured while checking and cleaning node " + node_name + " from queue"); + } + } +} + /// Try to create unexisting "status" dirs for a node void DDLWorker::createStatusDirs(const std::string & node_path) @@ -260,6 +317,7 @@ void DDLWorker::run() { using namespace std::chrono_literals; + zookeeper->createAncestors(root_dir + "/"); LOG_DEBUG(log, "Started DDLWorker thread"); while (!stop_flag) @@ -275,6 +333,15 @@ void DDLWorker::run() tryLogCurrentException(log); } + try + { + cleanupQueue(); + } + catch (...) + { + tryLogCurrentException(log); + } + //std::unique_lock g(lock); //cond_var.wait_for(g, 10s); @@ -287,6 +354,7 @@ void DDLWorker::run() class DDLQueryStatusInputSream : public IProfilingBlockInputStream { public: + DDLQueryStatusInputSream(const String & zk_node_path, Context & context, size_t num_hosts) : node_path(zk_node_path), context(context) { @@ -364,10 +432,12 @@ public: for (const String & elem : cur_list) { if (!prev.count(elem)) + { diff.emplace_back(elem); + prev.emplace(elem); + } } - prev.insert(diff.cbegin(), diff.cend()); return diff; } diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index 99fc7633a18..34903e675a2 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -39,8 +39,8 @@ private: void createStatusDirs(const std::string & node_name); - void processQueries(); - bool processQuery(const std::string & task); + /// Checks and cleanups queue's nodes + void cleanupQueue(const Strings * node_names_to_check = nullptr); void run(); @@ -62,6 +62,10 @@ private: std::mutex lock; std::thread thread; + size_t last_cleanup_time_seconds = 0; + static constexpr size_t node_max_lifetime_seconds = 10; // 7 * 24 * 60 * 60; + static constexpr size_t cleanup_after_seconds = 10; + friend class DDLQueryStatusInputSream; };