Add queue cleanup. [#CLICKHOUSE-5]

This commit is contained in:
Vitaliy Lyudvichenko 2017-04-19 17:21:27 +03:00
parent feb1db051d
commit 91f45c628b
2 changed files with 78 additions and 4 deletions

View File

@ -24,6 +24,8 @@
#include <Columns/ColumnArray.h>
#include <zkutil/ZooKeeper.h>
#include <Poco/Timestamp.h>
namespace DB
{
@ -107,7 +109,6 @@ DDLWorker::~DDLWorker()
void DDLWorker::processTasks()
{
LOG_DEBUG(log, "processTasks");
zookeeper->createAncestors(root_dir + "/");
Strings queue_nodes = zookeeper->getChildren(root_dir, nullptr, event_queue_updated);
if (queue_nodes.empty())
@ -172,6 +173,62 @@ void DDLWorker::processTasks()
}
}
void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
{
/// Both ZK and Poco use Unix epoch
size_t current_time_seconds = Poco::Timestamp().epochTime();
constexpr size_t zookeeper_time_resolution = 1000;
// Too early to check
if (!last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + 10)
return;
last_cleanup_time_seconds = current_time_seconds;
String data;
zkutil::Stat stat;
DDLLogEntry node;
Strings failed_hosts, sucess_hosts;
Strings node_names_fetched = node_names_to_check ? Strings{} : zookeeper->getChildren(root_dir);
const Strings & node_names = (node_names_to_check) ? *node_names_to_check : node_names_fetched;
for (const String & node_name : node_names)
{
/// TODO: Add /root/locks/node_name lock to avoid rare race counditions.
try
{
String node_path = root_dir + "/" + node_name;
if (!zookeeper->tryGet(node_path, data, &stat))
continue;
node.parse(data);
size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
if (zookeeper_time_seconds + node_max_lifetime_seconds < current_time_seconds)
{
size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds;
LOG_INFO(log, "Lifetime of node " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it");
zookeeper->removeRecursive(node_path);
continue;
}
Strings sucess_nodes = zookeeper->getChildren(node_path + "/sucess");
Strings failed_nodes = zookeeper->getChildren(node_path + "/failed");
if (sucess_nodes.size() + failed_nodes.size() >= node.hosts.size())
{
LOG_INFO(log, "Node " << node_name << " had been executed by each host, deleting it");
zookeeper->removeRecursive(node_path);
}
}
catch (...)
{
tryLogCurrentException(log, "An error occured while checking and cleaning node " + node_name + " from queue");
}
}
}
/// Try to create unexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
@ -260,6 +317,7 @@ void DDLWorker::run()
{
using namespace std::chrono_literals;
zookeeper->createAncestors(root_dir + "/");
LOG_DEBUG(log, "Started DDLWorker thread");
while (!stop_flag)
@ -275,6 +333,15 @@ void DDLWorker::run()
tryLogCurrentException(log);
}
try
{
cleanupQueue();
}
catch (...)
{
tryLogCurrentException(log);
}
//std::unique_lock<std::mutex> g(lock);
//cond_var.wait_for(g, 10s);
@ -287,6 +354,7 @@ void DDLWorker::run()
class DDLQueryStatusInputSream : public IProfilingBlockInputStream
{
public:
DDLQueryStatusInputSream(const String & zk_node_path, Context & context, size_t num_hosts)
: node_path(zk_node_path), context(context)
{
@ -364,10 +432,12 @@ public:
for (const String & elem : cur_list)
{
if (!prev.count(elem))
{
diff.emplace_back(elem);
prev.emplace(elem);
}
}
prev.insert(diff.cbegin(), diff.cend());
return diff;
}

View File

@ -39,8 +39,8 @@ private:
void createStatusDirs(const std::string & node_name);
void processQueries();
bool processQuery(const std::string & task);
/// Checks and cleanups queue's nodes
void cleanupQueue(const Strings * node_names_to_check = nullptr);
void run();
@ -62,6 +62,10 @@ private:
std::mutex lock;
std::thread thread;
size_t last_cleanup_time_seconds = 0;
static constexpr size_t node_max_lifetime_seconds = 10; // 7 * 24 * 60 * 60;
static constexpr size_t cleanup_after_seconds = 10;
friend class DDLQueryStatusInputSream;
};