From 614e0d9ba09f1020b679896c450eaa41cbd0ba72 Mon Sep 17 00:00:00 2001
From: Alexander Tokmakov
Date: Fri, 5 Mar 2021 02:17:07 +0300
Subject: [PATCH] just another fix for ddl worker

---
 src/Interpreters/DDLTask.cpp   |   6 +-
 src/Interpreters/DDLWorker.cpp | 150 +++++++++++++++++++++++++++------
 src/Interpreters/DDLWorker.h   |   1 +
 3 files changed, 130 insertions(+), 27 deletions(-)

diff --git a/src/Interpreters/DDLTask.cpp b/src/Interpreters/DDLTask.cpp
index 4be465d3de4..0c4c8a1bc34 100644
--- a/src/Interpreters/DDLTask.cpp
+++ b/src/Interpreters/DDLTask.cpp
@@ -320,6 +320,8 @@ std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from
 
 String DDLTaskBase::getLogEntryName(UInt32 log_entry_number)
 {
+    /// Sequential counter in ZooKeeper is Int32.
+    assert(log_entry_number < std::numeric_limits<Int32>::max());
     constexpr size_t seq_node_digits = 10;
     String number = toString(log_entry_number);
     String name = "query-" + String(seq_node_digits - number.size(), '0') + number;
@@ -330,7 +332,9 @@ UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)
 {
     constexpr const char * name = "query-";
     assert(startsWith(log_entry_name, name));
-    return parse<UInt32>(log_entry_name.substr(strlen(name)));
+    UInt32 num = parse<UInt32>(log_entry_name.substr(strlen(name)));
+    assert(num < std::numeric_limits<Int32>::max());
+    return num;
 }
 
 void ZooKeeperMetadataTransaction::commit()
diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp
index 4da0d21791b..7dec7312649 100644
--- a/src/Interpreters/DDLWorker.cpp
+++ b/src/Interpreters/DDLWorker.cpp
@@ -48,6 +48,7 @@ namespace ErrorCodes
     extern const int MEMORY_LIMIT_EXCEEDED;
 }
 
+constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed";
 
 namespace
 {
@@ -290,7 +291,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
 
     if (zookeeper->exists(task->getFinishedNodePath()))
     {
-        out_reason = "Task has been already processed";
+        out_reason = TASK_PROCESSED_OUT_REASON;
         return {};
     }
 
@@ -311,51 +312,117 @@ void DDLWorker::scheduleTasks(bool reinitialized)
     auto zookeeper = tryGetZooKeeper();
 
     /// Main thread of DDLWorker was restarted, probably due to lost connection with ZooKeeper.
-    /// We have some unfinished tasks. To avoid duplication of some queries, try to write execution status.
+    /// We have some unfinished tasks.
+    /// To avoid duplication of some queries we should try to write execution status again.
+    /// To avoid skipping of some entries which were not executed we should be careful when choosing begin_node to start from.
+    /// NOTE: It does not protect from all cases of query duplication, see also comments in processTask(...)
     if (reinitialized)
     {
-        for (auto & task : current_tasks)
+        if (current_tasks.empty())
+            LOG_TRACE(log, "Don't have unfinished tasks after restarting");
+        else
+            LOG_INFO(log, "Have {} unfinished tasks, will check them", current_tasks.size());
+        assert(current_tasks.size() <= pool_size + (worker_pool != nullptr));
+        auto task_it = current_tasks.begin();
+        while (task_it != current_tasks.end())
         {
-            if (task->was_executed)
+            auto & task = *task_it;
+            if (task->completely_processed)
             {
-                bool task_still_exists = zookeeper->exists(task->entry_path);
+                assert(task->was_executed);
+                /// Status must be written (but finished/ node may not exist if entry was deleted).
+                /// If someone is deleting the entry concurrently, then the /active status dir must not exist.
+                assert(zookeeper->exists(task->getFinishedNodePath()) || !zookeeper->exists(fs::path(task->entry_path) / "active"));
+                ++task_it;
+            }
+            else if (task->was_executed)
+            {
+                /// Connection was lost on attempt to write status. Will retry.
                 bool status_written = zookeeper->exists(task->getFinishedNodePath());
-                if (!status_written && task_still_exists)
-                {
+                /// You might think that the following condition is redundant, because status_written implies completely_processed.
+                /// But it's wrong. It's possible that (!task->completely_processed && status_written)
+                /// if ZooKeeper successfully received and processed our request
+                /// but we lost connection while waiting for the response.
+                /// Yeah, distributed systems is a zoo.
+                if (status_written)
+                    task->completely_processed = true;
+                else
                     processTask(*task, zookeeper);
-                }
+                ++task_it;
+            }
+            else
+            {
+                /// We didn't even execute the query, so let's just remove it.
+                /// We will try to read the task again and execute it from the beginning.
+                if (!first_failed_task_name || task->entry_name < *first_failed_task_name)
+                    first_failed_task_name = task->entry_name;
+                task_it = current_tasks.erase(task_it);
+            }
         }
     }
 
     Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
     filterAndSortQueueNodes(queue_nodes);
-    if (queue_nodes.empty())
-    {
-        LOG_TRACE(log, "No tasks to schedule");
-        return;
-    }
-    else if (max_tasks_in_queue < queue_nodes.size())
+    if (max_tasks_in_queue < queue_nodes.size())
         cleanup_event->set();
 
     /// Detect queue start, using:
     /// - skipped tasks
-    /// - in memory tasks (that are currently active)
+    /// - in memory tasks (that are currently active or were finished recently)
+    /// - failed tasks (that should be processed again)
     auto begin_node = queue_nodes.begin();
-    UInt64 last_task_id = 0;
-    if (!current_tasks.empty())
+    if (first_failed_task_name)
     {
-        auto & last_task = current_tasks.back();
-        last_task_id = DDLTaskBase::getLogEntryNumber(last_task->entry_name);
-        begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task->entry_name);
+        /// If we had failed tasks, then we should start from the first failed task.
+        assert(reinitialized);
+        begin_node = std::lower_bound(queue_nodes.begin(), queue_nodes.end(), first_failed_task_name);
     }
-    if (last_skipped_entry_name)
+    else
     {
-        UInt64 last_skipped_entry_id = DDLTaskBase::getLogEntryNumber(*last_skipped_entry_name);
-        if (last_skipped_entry_id > last_task_id)
-            begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), *last_skipped_entry_name);
+        /// We had no failed tasks. Let's just choose the maximum entry we have previously seen.
+        String last_task_name;
+        if (!current_tasks.empty())
+            last_task_name = current_tasks.back()->entry_name;
+        if (last_skipped_entry_name && last_task_name < *last_skipped_entry_name)
+            last_task_name = *last_skipped_entry_name;
+        begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task_name);
     }
 
+    if (begin_node == queue_nodes.end())
+        LOG_DEBUG(log, "No tasks to schedule");
+    else
+        LOG_DEBUG(log, "Will schedule {} tasks starting from {}", std::distance(begin_node, queue_nodes.end()), *begin_node);
+
+    /// Let's ensure that it's exactly the first task we should process.
+    /// Maybe such asserts are too paranoid and excessive,
+    /// but it's easy enough to break DDLWorker in a very unobvious way by making some minor change in code.
+    [[maybe_unused]] bool have_no_tasks_info = !first_failed_task_name && current_tasks.empty() && !last_skipped_entry_name;
+    assert(have_no_tasks_info || queue_nodes.end() == std::find_if(queue_nodes.begin(), queue_nodes.end(), [&](const String & entry_name)
+    {
+        /// We should return true if some invariants are violated.
+        String reason;
+        auto task = initAndCheckTask(entry_name, reason, zookeeper);
+        bool maybe_currently_processing = current_tasks.end() != std::find_if(current_tasks.begin(), current_tasks.end(), [&](const auto & t)
+        {
+            return t->entry_name == entry_name;
+        });
+        /// begin_node is something like a log pointer
+        if (begin_node == queue_nodes.end() || entry_name < *begin_node)
+        {
+            /// Return true if entry should be scheduled.
+            /// There is a minor race condition: initAndCheckTask(...) may return non-null
+            /// if someone is deleting the outdated entry right now (including finished/ nodes), so we also check the active/ status dir.
+            bool maybe_concurrently_deleting = task && !zookeeper->exists(fs::path(task->entry_path) / "active");
+            return task && !maybe_concurrently_deleting && !maybe_currently_processing;
+        }
+        else
+        {
+            /// Return true if entry should not be scheduled.
+            bool processed = !task && reason == TASK_PROCESSED_OUT_REASON;
+            return processed || maybe_currently_processing;
        }
+    }));
+
     for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
     {
         String entry_name = *it;
@@ -391,8 +458,18 @@ void DDLWorker::scheduleTasks(bool reinitialized)
 
 DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
 {
     current_tasks.remove_if([](const DDLTaskPtr & t) { return t->completely_processed.load(); });
-    assert(current_tasks.size() <= pool_size);
+    /// Tasks are scheduled and executed in main thread <==> Parallel execution is disabled
+    assert((worker_pool != nullptr) == (1 < pool_size));
+    /// Parallel execution is disabled ==> All previous tasks either failed to start or have finished,
+    /// so the current tasks list must be empty when we are ready to process a new one.
+    assert(worker_pool || current_tasks.empty());
+    /// Parallel execution is enabled ==> Not more than pool_size tasks are currently executing.
+    /// Note: If current_tasks.size() == pool_size, then all worker threads are busy,
+    /// so we will wait on worker_pool->scheduleOrThrowOnError(...)
+    assert(!worker_pool || current_tasks.size() <= pool_size);
     current_tasks.emplace_back(std::move(task));
+    if (first_failed_task_name && *first_failed_task_name == current_tasks.back()->entry_name)
+        first_failed_task_name.reset();
     return *current_tasks.back();
 }
 
@@ -479,10 +556,15 @@ void DDLWorker::updateMaxDDLEntryID(const String & entry_name)
 void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
 {
     LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
+    assert(!task.completely_processed);
 
     String active_node_path = task.getActiveNodePath();
     String finished_node_path = task.getFinishedNodePath();
 
+    /// Step 1: Create ephemeral node in active/ status dir.
+    /// It allows other hosts to understand that the task is currently executing (useful for system.distributed_ddl_queue)
+    /// and protects from concurrent deletion of the task.
+
     /// It will tryRemove(...) on exception
     auto active_node = zkutil::EphemeralNodeHolder::existing(active_node_path, *zookeeper);
 
@@ -498,7 +580,21 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
 
     /// Status dirs were not created in enqueueQuery(...) or someone is removing entry
     if (create_active_res == Coordination::Error::ZNONODE)
+    {
+        assert(dynamic_cast<DatabaseReplicatedTask *>(&task) == nullptr);
+        if (task.was_executed)
+        {
+            /// Special case:
+            /// Task was executed (and we are trying to write status after connection loss) ==> Status dirs were previously created.
+            /// (Status dirs were previously created AND active/ does not exist) ==> Task was removed.
+            /// We cannot write status, but it's not required anymore, because no one will try to execute it again.
+            /// So we consider the task as completely processed.
+            LOG_WARNING(log, "Task {} is executed, but looks like entry {} was deleted, cannot write status", task.entry_name, task.entry_path);
+            task.completely_processed = true;
+            return;
+        }
         createStatusDirs(task.entry_path, zookeeper);
+    }
 
     if (create_active_res == Coordination::Error::ZNODEEXISTS)
     {
@@ -508,7 +604,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
         String dummy;
         if (zookeeper->tryGet(active_node_path, dummy, nullptr, eph_node_disappeared))
         {
-            constexpr int timeout_ms = 5000;
+            constexpr int timeout_ms = 30 * 1000;
             if (!eph_node_disappeared->tryWait(timeout_ms))
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Ephemeral node {} still exists, "
                     "probably it's owned by someone else", active_node_path);
@@ -518,6 +614,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
         zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
     }
 
+    /// Step 2: Execute query from the task.
     if (!task.was_executed)
     {
         /// If table and database engine supports it, they will execute task.ops by their own in a single transaction
@@ -588,6 +685,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
 
     updateMaxDDLEntryID(task.entry_name);
 
+    /// Step 3: Create node in finished/ status dir and write execution status.
     /// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
     /// NOTE: If ZooKeeper connection is lost here, we will try again to write query status.
     /// NOTE: If both table and database are replicated, task is executed in single ZK transaction.
diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h
index 0ef7456430f..4814f232ce2 100644
--- a/src/Interpreters/DDLWorker.h
+++ b/src/Interpreters/DDLWorker.h
@@ -123,6 +123,7 @@ protected:
 
     /// Save state of executed task to avoid duplicate execution on ZK error
     std::optional<String> last_skipped_entry_name;
+    std::optional<String> first_failed_task_name;
    std::list<DDLTaskPtr> current_tasks;
 
     std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
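
Note (not part of the patch): the new asserts in DDLTask.cpp guard the round trip between a ZooKeeper sequential-node counter and the zero-padded "query-NNNNNNNNNN" entry name that the queue uses. Below is a minimal standalone sketch of that convention, using plain std:: types as stand-ins for ClickHouse's String, toString and parse<T> helpers; it only illustrates the naming scheme, it is not the project's code.

#include <cassert>
#include <cstdint>
#include <limits>
#include <string>

// Zero-padded name for a ZooKeeper sequential node, e.g. 42 -> "query-0000000042".
std::string getLogEntryName(uint32_t log_entry_number)
{
    // Sequential counters in ZooKeeper are Int32, so the number must fit into one.
    assert(log_entry_number < static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
    constexpr size_t seq_node_digits = 10;
    std::string number = std::to_string(log_entry_number);
    return "query-" + std::string(seq_node_digits - number.size(), '0') + number;
}

// Inverse of getLogEntryName: "query-0000000042" -> 42.
uint32_t getLogEntryNumber(const std::string & log_entry_name)
{
    constexpr const char * prefix = "query-";
    assert(log_entry_name.rfind(prefix, 0) == 0);
    uint32_t num = static_cast<uint32_t>(std::stoul(log_entry_name.substr(6)));
    assert(num < static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
    return num;
}

int main()
{
    assert(getLogEntryName(42) == "query-0000000042");
    assert(getLogEntryNumber("query-0000000042") == 42);
}

Because the zero padding keeps lexicographic order in sync with numeric order, scheduleTasks can pick begin_node with std::lower_bound / std::upper_bound directly on entry names, which is what the new first_failed_task_name / last_skipped_entry_name logic in this patch relies on.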