diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 2d89228c7ae..86387417a3c 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -213,8 +213,6 @@ std::pair TestKeeperCreateRequest::process(TestKeeper::Contai created_node.is_sequental = is_sequential; std::string path_created = path; - ++it->second.seq_num; - if (is_sequential) { auto seq_num = it->second.seq_num; @@ -226,6 +224,8 @@ std::pair TestKeeperCreateRequest::process(TestKeeper::Contai path_created += seq_num_str.str(); } + ++it->second.seq_num; + response.path_created = path_created; container.emplace(path_created, std::move(created_node)); diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index b60adf44e51..438fa2d97bd 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -11,10 +11,7 @@ #include #include #include - -//FIXME it shouldn't be here #include -#include namespace DB { diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 29599d4d66d..0c2368cdcf6 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -17,7 +17,26 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db /// Pool size must be 1 (to avoid reordering of log entries) } -void DatabaseReplicatedDDLWorker::initialize() +void DatabaseReplicatedDDLWorker::initializeMainThread() +{ + do + { + try + { + auto zookeeper = getAndSetZooKeeper(); + initializeReplication(); + initialized = true; + } + catch (...) + { + tryLogCurrentException(log, fmt::format("Error on initialization of {}", database->getDatabaseName())); + sleepForSeconds(5); + } + } + while (!initialized && !stop_flag); +} + +void DatabaseReplicatedDDLWorker::initializeReplication() { /// Check if we need to recover replica. /// Invariant: replica is lost if it's log_ptr value is less then min_log_ptr value. @@ -101,11 +120,16 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na if (task->entry.query.empty()) { //TODO better way to determine special entries - task->was_executed = true; + out_reason = "It's dummy task"; + return {}; } - else + + task->parseQueryFromEntry(context); + + if (zookeeper->exists(task->getFinishedNodePath())) { - task->parseQueryFromEntry(context); + out_reason = "Task has been already processed"; + return {}; } return task; diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index d190bd1795d..7994104331e 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -15,7 +15,8 @@ public: String enqueueQuery(DDLLogEntry & entry) override; private: - void initialize() override; + void initializeMainThread() override; + void initializeReplication(); DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override; diff --git a/src/Interpreters/DDLTask.h b/src/Interpreters/DDLTask.h index 2db1a696384..94127b39b84 100644 --- a/src/Interpreters/DDLTask.h +++ b/src/Interpreters/DDLTask.h @@ -76,6 +76,8 @@ struct DDLTaskBase bool was_executed = false; DDLTaskBase(const String & name, const String & path) : entry_name(name), entry_path(path) {} + DDLTaskBase(const DDLTaskBase &) = delete; + DDLTaskBase(DDLTaskBase &&) = default; virtual ~DDLTaskBase() = default; void parseQueryFromEntry(const Context & context); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 188d38b8647..e4ea5f8db17 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -143,9 +143,14 @@ DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Cont const String & logger_name) : context(context_) , log(&Poco::Logger::get(logger_name)) - , pool_size(pool_size_) //FIXME make it optional - , worker_pool(pool_size_) + , pool_size(pool_size_) { + if (1 < pool_size) + { + LOG_WARNING(log, "DDLWorker is configured to use multiple threads. " + "It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear."); + worker_pool.emplace(pool_size); + } queue_dir = zk_root_dir; if (queue_dir.back() == '/') queue_dir.resize(queue_dir.size() - 1); @@ -185,7 +190,8 @@ void DDLWorker::shutdown() DDLWorker::~DDLWorker() { shutdown(); - worker_pool.wait(); + if (worker_pool) + worker_pool->wait(); if (main_thread.joinable()) main_thread.join(); if (cleanup_thread.joinable()) @@ -209,24 +215,6 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper() return current_zookeeper; } -void DDLWorker::recoverZooKeeper() -{ - LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false)); - - while (!stop_flag) - { - try - { - getAndSetZooKeeper(); - break; - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - sleepForSeconds(5); - } - } -} DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) { @@ -285,6 +273,12 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r return {}; } + if (zookeeper->exists(task->getFinishedNodePath())) + { + out_reason = "Task has been already processed"; + return {}; + } + /// Now task is ready for execution return task; } @@ -309,11 +303,11 @@ void DDLWorker::scheduleTasks() return; } - bool server_startup = !last_entry_name.has_value(); + bool server_startup = current_tasks.empty(); auto begin_node = server_startup ? queue_nodes.begin() - : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), *last_entry_name); + : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), current_tasks.back()->entry_name); for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it) { @@ -325,42 +319,39 @@ void DDLWorker::scheduleTasks() if (!task) { LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason); - last_entry_name = entry_name; + task->was_executed = true; + saveTask(std::move(task)); //FIXME questionable continue; } - bool already_processed = zookeeper->exists(task->entry_path + "/finished/" + task->host_id_str); - if (!server_startup && !task->was_executed && already_processed) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Server expects that DDL task {} should be processed, but it was already processed according to ZK", - entry_name); - } + auto & saved_task = saveTask(std::move(task)); - if (!already_processed) + if (worker_pool) { - if (pool_size == 1) + worker_pool->scheduleOrThrowOnError([this, &saved_task]() { - enqueueTask(DDLTaskPtr(task.release())); - } - else - { - worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]() - { - setThreadName("DDLWorkerExec"); - enqueueTask(DDLTaskPtr(task_ptr)); - }); - } + setThreadName("DDLWorkerExec"); + processTask(saved_task); + }); } else { - LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query); + processTask(saved_task); } - - last_entry_name = entry_name; } } +DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task) +{ + if (current_tasks.size() == pool_size) + { + assert(current_tasks.front()->was_executed); + current_tasks.pop_front(); + } + current_tasks.emplace_back(std::move(task)); + return *current_tasks.back(); +} + bool DDLWorker::tryExecuteQuery(const String & query, const DDLTaskBase & task, ExecutionStatus & status) { /// Add special comment at the start of query to easily identify DDL-produced queries in query_log @@ -404,48 +395,6 @@ void DDLWorker::attachToThreadGroup() } } - -void DDLWorker::enqueueTask(DDLTaskPtr task_ptr) -{ - auto & task = *task_ptr; - - while (!stop_flag) - { - try - { - processTask(task); - return; - } - /// TODO recover zk in runMainThread(...) and retry task (why do we need another place where session is recovered?) - catch (const Coordination::Exception & e) - { - if (Coordination::isHardwareError(e.code)) - { - recoverZooKeeper(); - } - else if (e.code == Coordination::Error::ZNONODE) - { - LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true)); - if (!current_zookeeper->exists(task_ptr->entry_path)) - { - //FIXME race condition with cleanup thread - LOG_ERROR(log, "Task {} is lost. It probably was removed by other server.", task_ptr->entry_path); - return; - } - } - else - { - LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true)); - return; - } - } - catch (...) - { - LOG_WARNING(log, "An error occurred while processing task {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true)); - } - } -} - void DDLWorker::processTask(DDLTaskBase & task) { auto zookeeper = tryGetZooKeeper(); @@ -458,22 +407,16 @@ void DDLWorker::processTask(DDLTaskBase & task) String dummy; auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy); - if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS) - { - // Ok - } - else if (code == Coordination::Error::ZNONODE) + if (code == Coordination::Error::ZNONODE) { /// There is no parent - //TODO why not to create parent before active_node? createStatusDirs(task.entry_path, zookeeper); - if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy)) - throw Coordination::Exception(code, active_node_path); + zookeeper->create(active_node_path, "", zkutil::CreateMode::Ephemeral); } else throw Coordination::Exception(code, active_node_path); - if (!task.was_executed) + if (!task.was_executed) // FIXME always true { try { @@ -513,6 +456,9 @@ void DDLWorker::processTask(DDLTaskBase & task) } /// FIXME: if server fails right here, the task will be executed twice. We need WAL here. + /// Another possible issue: if ZooKeeper session is lost here, we will recover connection and execute the task second time. + + /// Delete active flag and create finish flag Coordination::Requests ops; @@ -787,7 +733,9 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry) String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential); - /// Optional step + /// We cannot create status dirs in a single transaction with previous request, + /// because we don't know node_path until previous request is executed. + /// Se we try to create status dirs here or later when we will execute entry. try { createStatusDirs(node_path, zookeeper); @@ -801,70 +749,80 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry) } -void DDLWorker::runMainThread() +void DDLWorker::initializeMainThread() { - setThreadName("DDLWorker"); - LOG_DEBUG(log, "Started DDLWorker thread"); - do { try { auto zookeeper = getAndSetZooKeeper(); zookeeper->createAncestors(queue_dir + "/"); - initialize(); initialized = true; } catch (const Coordination::Exception & e) { if (!Coordination::isHardwareError(e.code)) - throw; /// A logical error. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected ZooKeeper error: {}", e.message()); tryLogCurrentException(__PRETTY_FUNCTION__); /// Avoid busy loop when ZooKeeper is not available. - sleepForSeconds(1); + sleepForSeconds(5); } catch (...) { - tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue."); - return; + tryLogCurrentException(log, "Cannot initialize main thread of DDLWorker, will try again"); + sleepForSeconds(5); } } while (!initialized && !stop_flag); +} + +void DDLWorker::runMainThread() +{ + setThreadName("DDLWorker"); + attachToThreadGroup(); + LOG_DEBUG(log, "Starting DDLWorker thread"); while (!stop_flag) { try { - attachToThreadGroup(); + /// Reinitialize DDLWorker state (including ZooKeeper connection) if required + if (!initialized) + { + initializeMainThread(); + LOG_DEBUG(log, "Initialized DDLWorker thread"); + } cleanup_event->set(); scheduleTasks(); - LOG_DEBUG(log, "Waiting a watch"); + LOG_DEBUG(log, "Waiting for queue updates"); queue_updated_event->wait(); } catch (const Coordination::Exception & e) { if (Coordination::isHardwareError(e.code)) { - recoverZooKeeper(); + initialized = false; } else if (e.code == Coordination::Error::ZNONODE) { + // TODO add comment: when it happens and why it's expected? + // maybe because cleanup thread may remove nodes inside queue entry which are currently processed LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true)); } else { - LOG_ERROR(log, "Unexpected ZooKeeper error: {}. Terminating.", getCurrentExceptionMessage(true)); - return; + LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true)); + assert(false); } } catch (...) { - tryLogCurrentException(log, "Unexpected error, will terminate:"); - return; + tryLogCurrentException(log, "Unexpected error, will try to restart main thread:"); + initialized = false; } } } @@ -891,6 +849,7 @@ void DDLWorker::runCleanupThread() continue; } + /// ZooKeeper connection is recovered by main thread. We will wait for it on cleanup_event. auto zookeeper = tryGetZooKeeper(); if (zookeeper->expired()) continue; diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index f41ca0fce8f..78921fa60e3 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -62,17 +62,16 @@ protected: ZooKeeperPtr tryGetZooKeeper() const; /// If necessary, creates a new session and caches it. ZooKeeperPtr getAndSetZooKeeper(); - /// ZooKeeper recover loop (while not stopped). - void recoverZooKeeper(); - void checkCurrentTasks(); + /// Iterates through queue tasks in ZooKeeper, runs execution of new tasks void scheduleTasks(); + DDLTaskBase & saveTask(DDLTaskPtr && task); + /// Reads entry and check that the host belongs to host list of the task /// Returns non-empty DDLTaskPtr if entry parsed and the check is passed virtual DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper); - void enqueueTask(DDLTaskPtr task); void processTask(DDLTaskBase & task); /// Check that query should be executed on leader replica only @@ -98,7 +97,7 @@ protected: /// Init task node static void createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper); - virtual void initialize() {} + virtual void initializeMainThread(); void runMainThread(); void runCleanupThread(); @@ -117,8 +116,8 @@ protected: ZooKeeperPtr current_zookeeper; /// Save state of executed task to avoid duplicate execution on ZK error - //std::vector last_tasks; - std::optional last_entry_name; + //std::optional last_entry_name; + std::list current_tasks; std::shared_ptr queue_updated_event = std::make_shared(); std::shared_ptr cleanup_event = std::make_shared(); @@ -130,7 +129,7 @@ protected: /// Size of the pool for query execution. size_t pool_size = 1; - ThreadPool worker_pool; + std::optional worker_pool; /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago Int64 cleanup_delay_period = 60; // minute (in seconds)