slightly better DDLWorker initialization and restarting

This commit is contained in:
Alexander Tokmakov 2020-12-03 21:14:27 +03:00
parent 1a4bd67736
commit 39532f7d9e
7 changed files with 114 additions and 132 deletions

View File

@ -213,8 +213,6 @@ std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Contai
created_node.is_sequental = is_sequential;
std::string path_created = path;
++it->second.seq_num;
if (is_sequential)
{
auto seq_num = it->second.seq_num;
@ -226,6 +224,8 @@ std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Contai
path_created += seq_num_str.str();
}
++it->second.seq_num;
response.path_created = path_created;
container.emplace(path_created, std::move(created_node));

View File

@ -11,10 +11,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <filesystem>
//FIXME it shouldn't be here
#include <Interpreters/DDLTask.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{

View File

@ -17,7 +17,26 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
/// Pool size must be 1 (to avoid reordering of log entries)
}
void DatabaseReplicatedDDLWorker::initialize()
void DatabaseReplicatedDDLWorker::initializeMainThread()
{
do
{
try
{
auto zookeeper = getAndSetZooKeeper();
initializeReplication();
initialized = true;
}
catch (...)
{
tryLogCurrentException(log, fmt::format("Error on initialization of {}", database->getDatabaseName()));
sleepForSeconds(5);
}
}
while (!initialized && !stop_flag);
}
void DatabaseReplicatedDDLWorker::initializeReplication()
{
/// Check if we need to recover replica.
/// Invariant: replica is lost if it's log_ptr value is less then min_log_ptr value.
@ -101,11 +120,16 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
if (task->entry.query.empty())
{
//TODO better way to determine special entries
task->was_executed = true;
out_reason = "It's dummy task";
return {};
}
else
task->parseQueryFromEntry(context);
if (zookeeper->exists(task->getFinishedNodePath()))
{
task->parseQueryFromEntry(context);
out_reason = "Task has been already processed";
return {};
}
return task;

View File

@ -15,7 +15,8 @@ public:
String enqueueQuery(DDLLogEntry & entry) override;
private:
void initialize() override;
void initializeMainThread() override;
void initializeReplication();
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;

View File

@ -76,6 +76,8 @@ struct DDLTaskBase
bool was_executed = false;
DDLTaskBase(const String & name, const String & path) : entry_name(name), entry_path(path) {}
DDLTaskBase(const DDLTaskBase &) = delete;
DDLTaskBase(DDLTaskBase &&) = default;
virtual ~DDLTaskBase() = default;
void parseQueryFromEntry(const Context & context);

View File

@ -143,9 +143,14 @@ DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Cont
const String & logger_name)
: context(context_)
, log(&Poco::Logger::get(logger_name))
, pool_size(pool_size_) //FIXME make it optional
, worker_pool(pool_size_)
, pool_size(pool_size_)
{
if (1 < pool_size)
{
LOG_WARNING(log, "DDLWorker is configured to use multiple threads. "
"It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.");
worker_pool.emplace(pool_size);
}
queue_dir = zk_root_dir;
if (queue_dir.back() == '/')
queue_dir.resize(queue_dir.size() - 1);
@ -185,7 +190,8 @@ void DDLWorker::shutdown()
DDLWorker::~DDLWorker()
{
shutdown();
worker_pool.wait();
if (worker_pool)
worker_pool->wait();
if (main_thread.joinable())
main_thread.join();
if (cleanup_thread.joinable())
@ -209,24 +215,6 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
return current_zookeeper;
}
void DDLWorker::recoverZooKeeper()
{
LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false));
while (!stop_flag)
{
try
{
getAndSetZooKeeper();
break;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
sleepForSeconds(5);
}
}
}
DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
@ -285,6 +273,12 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
return {};
}
if (zookeeper->exists(task->getFinishedNodePath()))
{
out_reason = "Task has been already processed";
return {};
}
/// Now task is ready for execution
return task;
}
@ -309,11 +303,11 @@ void DDLWorker::scheduleTasks()
return;
}
bool server_startup = !last_entry_name.has_value();
bool server_startup = current_tasks.empty();
auto begin_node = server_startup
? queue_nodes.begin()
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), *last_entry_name);
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), current_tasks.back()->entry_name);
for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
{
@ -325,42 +319,39 @@ void DDLWorker::scheduleTasks()
if (!task)
{
LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
last_entry_name = entry_name;
task->was_executed = true;
saveTask(std::move(task)); //FIXME questionable
continue;
}
bool already_processed = zookeeper->exists(task->entry_path + "/finished/" + task->host_id_str);
if (!server_startup && !task->was_executed && already_processed)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Server expects that DDL task {} should be processed, but it was already processed according to ZK",
entry_name);
}
auto & saved_task = saveTask(std::move(task));
if (!already_processed)
if (worker_pool)
{
if (pool_size == 1)
worker_pool->scheduleOrThrowOnError([this, &saved_task]()
{
enqueueTask(DDLTaskPtr(task.release()));
}
else
{
worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]()
{
setThreadName("DDLWorkerExec");
enqueueTask(DDLTaskPtr(task_ptr));
});
}
setThreadName("DDLWorkerExec");
processTask(saved_task);
});
}
else
{
LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query);
processTask(saved_task);
}
last_entry_name = entry_name;
}
}
DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
{
if (current_tasks.size() == pool_size)
{
assert(current_tasks.front()->was_executed);
current_tasks.pop_front();
}
current_tasks.emplace_back(std::move(task));
return *current_tasks.back();
}
bool DDLWorker::tryExecuteQuery(const String & query, const DDLTaskBase & task, ExecutionStatus & status)
{
/// Add special comment at the start of query to easily identify DDL-produced queries in query_log
@ -404,48 +395,6 @@ void DDLWorker::attachToThreadGroup()
}
}
void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
{
auto & task = *task_ptr;
while (!stop_flag)
{
try
{
processTask(task);
return;
}
/// TODO recover zk in runMainThread(...) and retry task (why do we need another place where session is recovered?)
catch (const Coordination::Exception & e)
{
if (Coordination::isHardwareError(e.code))
{
recoverZooKeeper();
}
else if (e.code == Coordination::Error::ZNONODE)
{
LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
if (!current_zookeeper->exists(task_ptr->entry_path))
{
//FIXME race condition with cleanup thread
LOG_ERROR(log, "Task {} is lost. It probably was removed by other server.", task_ptr->entry_path);
return;
}
}
else
{
LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
return;
}
}
catch (...)
{
LOG_WARNING(log, "An error occurred while processing task {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
}
}
}
void DDLWorker::processTask(DDLTaskBase & task)
{
auto zookeeper = tryGetZooKeeper();
@ -458,22 +407,16 @@ void DDLWorker::processTask(DDLTaskBase & task)
String dummy;
auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
{
// Ok
}
else if (code == Coordination::Error::ZNONODE)
if (code == Coordination::Error::ZNONODE)
{
/// There is no parent
//TODO why not to create parent before active_node?
createStatusDirs(task.entry_path, zookeeper);
if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
throw Coordination::Exception(code, active_node_path);
zookeeper->create(active_node_path, "", zkutil::CreateMode::Ephemeral);
}
else
throw Coordination::Exception(code, active_node_path);
if (!task.was_executed)
if (!task.was_executed) // FIXME always true
{
try
{
@ -513,6 +456,9 @@ void DDLWorker::processTask(DDLTaskBase & task)
}
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
/// Another possible issue: if ZooKeeper session is lost here, we will recover connection and execute the task second time.
/// Delete active flag and create finish flag
Coordination::Requests ops;
@ -787,7 +733,9 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
/// Optional step
/// We cannot create status dirs in a single transaction with previous request,
/// because we don't know node_path until previous request is executed.
/// Se we try to create status dirs here or later when we will execute entry.
try
{
createStatusDirs(node_path, zookeeper);
@ -801,70 +749,80 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
}
void DDLWorker::runMainThread()
void DDLWorker::initializeMainThread()
{
setThreadName("DDLWorker");
LOG_DEBUG(log, "Started DDLWorker thread");
do
{
try
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(queue_dir + "/");
initialize();
initialized = true;
}
catch (const Coordination::Exception & e)
{
if (!Coordination::isHardwareError(e.code))
throw; /// A logical error.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected ZooKeeper error: {}", e.message());
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Avoid busy loop when ZooKeeper is not available.
sleepForSeconds(1);
sleepForSeconds(5);
}
catch (...)
{
tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue.");
return;
tryLogCurrentException(log, "Cannot initialize main thread of DDLWorker, will try again");
sleepForSeconds(5);
}
}
while (!initialized && !stop_flag);
}
void DDLWorker::runMainThread()
{
setThreadName("DDLWorker");
attachToThreadGroup();
LOG_DEBUG(log, "Starting DDLWorker thread");
while (!stop_flag)
{
try
{
attachToThreadGroup();
/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
if (!initialized)
{
initializeMainThread();
LOG_DEBUG(log, "Initialized DDLWorker thread");
}
cleanup_event->set();
scheduleTasks();
LOG_DEBUG(log, "Waiting a watch");
LOG_DEBUG(log, "Waiting for queue updates");
queue_updated_event->wait();
}
catch (const Coordination::Exception & e)
{
if (Coordination::isHardwareError(e.code))
{
recoverZooKeeper();
initialized = false;
}
else if (e.code == Coordination::Error::ZNONODE)
{
// TODO add comment: when it happens and why it's expected?
// maybe because cleanup thread may remove nodes inside queue entry which are currently processed
LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
}
else
{
LOG_ERROR(log, "Unexpected ZooKeeper error: {}. Terminating.", getCurrentExceptionMessage(true));
return;
LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
assert(false);
}
}
catch (...)
{
tryLogCurrentException(log, "Unexpected error, will terminate:");
return;
tryLogCurrentException(log, "Unexpected error, will try to restart main thread:");
initialized = false;
}
}
}
@ -891,6 +849,7 @@ void DDLWorker::runCleanupThread()
continue;
}
/// ZooKeeper connection is recovered by main thread. We will wait for it on cleanup_event.
auto zookeeper = tryGetZooKeeper();
if (zookeeper->expired())
continue;

View File

@ -62,17 +62,16 @@ protected:
ZooKeeperPtr tryGetZooKeeper() const;
/// If necessary, creates a new session and caches it.
ZooKeeperPtr getAndSetZooKeeper();
/// ZooKeeper recover loop (while not stopped).
void recoverZooKeeper();
void checkCurrentTasks();
/// Iterates through queue tasks in ZooKeeper, runs execution of new tasks
void scheduleTasks();
DDLTaskBase & saveTask(DDLTaskPtr && task);
/// Reads entry and check that the host belongs to host list of the task
/// Returns non-empty DDLTaskPtr if entry parsed and the check is passed
virtual DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
void enqueueTask(DDLTaskPtr task);
void processTask(DDLTaskBase & task);
/// Check that query should be executed on leader replica only
@ -98,7 +97,7 @@ protected:
/// Init task node
static void createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper);
virtual void initialize() {}
virtual void initializeMainThread();
void runMainThread();
void runCleanupThread();
@ -117,8 +116,8 @@ protected:
ZooKeeperPtr current_zookeeper;
/// Save state of executed task to avoid duplicate execution on ZK error
//std::vector<std::string> last_tasks;
std::optional<String> last_entry_name;
//std::optional<String> last_entry_name;
std::list<DDLTaskPtr> current_tasks;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
@ -130,7 +129,7 @@ protected:
/// Size of the pool for query execution.
size_t pool_size = 1;
ThreadPool worker_pool;
std::optional<ThreadPool> worker_pool;
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)