From 249a5922fe4df3a447c32e98f44f1135c0c97cce Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Thu, 5 Nov 2015 16:52:06 +0300 Subject: [PATCH 1/7] AIOContextPool: refactor, extract member functions, employ a condition_variable to minimize CPU waste [#METR-18618] --- dbms/include/DB/Common/AIO.h | 139 ++++++++++++++++++++--------------- 1 file changed, 80 insertions(+), 59 deletions(-) diff --git a/dbms/include/DB/Common/AIO.h b/dbms/include/DB/Common/AIO.h index b8496af0915..65d3b73e263 100644 --- a/dbms/include/DB/Common/AIO.h +++ b/dbms/include/DB/Common/AIO.h @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include #include +#include #include #include #include @@ -64,96 +66,103 @@ class AIOContextPool : public Singleton friend class Singleton; static const auto max_concurrent_events = 128; - static const auto max_timeout_nsec = 1000; + static const auto max_timeout_sec = 1; + static const auto max_timeout_nsec = 0; AIOContext aio_context{max_concurrent_events}; std::size_t id{}; mutable std::mutex mutex; + mutable std::condition_variable have_resources; std::map> promises; - std::vector queued_requests; std::atomic cancelled{false}; - std::thread man_of_his_word{&AIOContextPool::fulfill_promises, this}; + std::thread io_completion_monitor{&AIOContextPool::monitorForCompletion, this}; ~AIOContextPool() { cancelled.store(true, std::memory_order_relaxed); - man_of_his_word.join(); + io_completion_monitor.join(); } - void fulfill_promises() + void monitorForCompletion() { /// array to hold completion events - io_event events[max_concurrent_events] {}; - const auto p_events = &events[0]; - timespec timeout{0, max_timeout_nsec}; + io_event events[max_concurrent_events]; /// continue checking for events unless cancelled while (!cancelled.load(std::memory_order_relaxed)) { try { - /// number of events signaling on - auto num_events = 0; - - /// request 1 to `max_concurrent_events` events - while ((num_events = io_getevents(aio_context.ctx, 1, max_concurrent_events, p_events, &timeout)) < 0) - if (errno != EINTR) - throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", - ErrorCodes::AIO_COMPLETION_ERROR, errno); - - /// look at returned events and - for (const auto & event : boost::make_iterator_range(p_events, p_events + num_events)) - { - /// get id from event - const auto id = event.data; - - /// find corresponding promise, set result and erase promise from map - const std::lock_guard lock{mutex}; - - const auto it = promises.find(id); - it->second.set_value(event.res); - promises.erase(it); - } - - if (queued_requests.empty()) - continue; - - const std::lock_guard lock{mutex}; - auto num_requests = 0; - - /// submit a batch of requests - while ((num_requests = io_submit(aio_context.ctx, queued_requests.size(), queued_requests.data())) < 0) - if (!(errno == EINTR || errno == EAGAIN)) - throwFromErrno("io_submit: Failed to submit batch of " + - std::to_string(queued_requests.size()) + " requests for asynchronous IO", - ErrorCodes::AIO_SUBMIT_ERROR, errno); - - if (num_requests <= 0) - continue; - - /// erase submitted requests - queued_requests.erase(std::begin(queued_requests), - std::next(std::begin(queued_requests), num_requests)); + const auto num_events = getCompletionEvents(events, max_concurrent_events); + fulfillPromises(events, num_events); + notifyProducers(num_events); } catch (...) { - /// there was an error, log it, return to any client and continue - const std::lock_guard lock{mutex}; - - const auto any_promise_it = std::begin(promises); - any_promise_it->second.set_exception(std::current_exception()); - + /// there was an error, log it, return to any producer and continue + reportExceptionToAnyProducer(); tryLogCurrentException("AIOContextPool::fulfill_promises()"); } } } + int getCompletionEvents(io_event events[], const int max_events) + { + timespec timeout{max_timeout_sec, max_timeout_nsec}; + + auto num_events = 0; + + /// request 1 to `max_concurrent_events` events + while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) + if (errno != EINTR) + throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", + ErrorCodes::AIO_COMPLETION_ERROR, errno); + + return num_events; + } + + void fulfillPromises(const io_event events[], const int num_events) + { + const std::lock_guard lock{mutex}; + + /// look at returned events and find corresponding promise, set result and erase promise from map + for (const auto & event : boost::make_iterator_range(events, events + num_events)) + { + /// get id from event + const auto id = event.data; + + /// set value via promise and release it + const auto it = promises.find(id); + it->second.set_value(event.res); + promises.erase(it); + } + } + + void notifyProducers(const int num_producers) const + { + if (num_producers == 0) + return; + + if (num_producers > 1) + have_resources.notify_all(); + else + have_resources.notify_one(); + } + + void reportExceptionToAnyProducer() + { + const std::lock_guard lock{mutex}; + + const auto any_promise_it = std::begin(promises); + any_promise_it->second.set_exception(std::current_exception()); + } + public: std::future post(struct iocb & iocb) { - const std::lock_guard lock{mutex}; + std::unique_lock lock{mutex}; /// get current id and increment it by one const auto request_id = id++; @@ -162,10 +171,22 @@ public: promises.emplace(request_id, std::promise{}); /// store id in AIO request for further identification iocb.aio_data = request_id; - queued_requests.push_back(&iocb); + + auto num_requests = 0; + struct iocb * requests[] { &iocb }; + + /// submit a request + while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0) + { + if (errno == EAGAIN) + /// wait until at least one event has been completed (or a spurious wakeup) and try again + have_resources.wait(lock); + else if (errno != EINTR) + throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", + ErrorCodes::AIO_SUBMIT_ERROR, errno); + } return promises[request_id].get_future(); - } }; From ad27ad476df210b45ff27ebace5cf11f1c8fcffb Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Thu, 5 Nov 2015 17:11:09 +0300 Subject: [PATCH 2/7] AIOContextPool: type aliases for clarity, commentary corrections [#METR-18618] --- dbms/include/DB/Common/AIO.h | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/dbms/include/DB/Common/AIO.h b/dbms/include/DB/Common/AIO.h index 65d3b73e263..ecca6461220 100644 --- a/dbms/include/DB/Common/AIO.h +++ b/dbms/include/DB/Common/AIO.h @@ -66,15 +66,18 @@ class AIOContextPool : public Singleton friend class Singleton; static const auto max_concurrent_events = 128; - static const auto max_timeout_sec = 1; - static const auto max_timeout_nsec = 0; + static const auto timeout_sec = 1; AIOContext aio_context{max_concurrent_events}; - std::size_t id{}; + using id_t = size_t; + using bytes_read_t = ssize_t; + + /// Autoincremental id used to identify completed requests + id_t id{}; mutable std::mutex mutex; mutable std::condition_variable have_resources; - std::map> promises; + std::map> promises; std::atomic cancelled{false}; std::thread io_completion_monitor{&AIOContextPool::monitorForCompletion, this}; @@ -110,11 +113,11 @@ class AIOContextPool : public Singleton int getCompletionEvents(io_event events[], const int max_events) { - timespec timeout{max_timeout_sec, max_timeout_nsec}; + timespec timeout{timeout_sec}; auto num_events = 0; - /// request 1 to `max_concurrent_events` events + /// request 1 to `max_events` events while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) if (errno != EINTR) throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", @@ -160,7 +163,8 @@ class AIOContextPool : public Singleton } public: - std::future post(struct iocb & iocb) + /// Request AIO read operation for iocb, returns a future with number of bytes read + std::future post(struct iocb & iocb) { std::unique_lock lock{mutex}; @@ -168,7 +172,7 @@ public: const auto request_id = id++; /// create a promise and put request in "queue" - promises.emplace(request_id, std::promise{}); + promises.emplace(request_id, std::promise{}); /// store id in AIO request for further identification iocb.aio_data = request_id; From 977954f9cee6910ef773979fcdff853a3bd2fa4c Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Thu, 5 Nov 2015 18:40:01 +0300 Subject: [PATCH 3/7] AIOContextPool: wait for all io_events in flight before destruction [#METR-18618] --- dbms/include/DB/Common/AIO.h | 50 ++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/dbms/include/DB/Common/AIO.h b/dbms/include/DB/Common/AIO.h index ecca6461220..5512ac053cf 100644 --- a/dbms/include/DB/Common/AIO.h +++ b/dbms/include/DB/Common/AIO.h @@ -2,7 +2,9 @@ #include #include +#include #include +#include #include #include #include @@ -80,7 +82,7 @@ class AIOContextPool : public Singleton std::map> promises; std::atomic cancelled{false}; - std::thread io_completion_monitor{&AIOContextPool::monitorForCompletion, this}; + std::thread io_completion_monitor{&AIOContextPool::doMonitor, this}; ~AIOContextPool() { @@ -88,26 +90,33 @@ class AIOContextPool : public Singleton io_completion_monitor.join(); } - void monitorForCompletion() + void doMonitor() + { + /// continue checking for events unless cancelled + while (!cancelled.load(std::memory_order_relaxed)) + waitForCompletion(); + + /// wait until all requests have been completed + while (!promises.empty()) + waitForCompletion(); + } + + void waitForCompletion() { /// array to hold completion events io_event events[max_concurrent_events]; - /// continue checking for events unless cancelled - while (!cancelled.load(std::memory_order_relaxed)) + try { - try - { - const auto num_events = getCompletionEvents(events, max_concurrent_events); - fulfillPromises(events, num_events); - notifyProducers(num_events); - } - catch (...) - { - /// there was an error, log it, return to any producer and continue - reportExceptionToAnyProducer(); - tryLogCurrentException("AIOContextPool::fulfill_promises()"); - } + const auto num_events = getCompletionEvents(events, max_concurrent_events); + fulfillPromises(events, num_events); + notifyProducers(num_events); + } + catch (...) + { + /// there was an error, log it, return to any producer and continue + reportExceptionToAnyProducer(); + tryLogCurrentException("AIOContextPool::waitForCompletion()"); } } @@ -128,6 +137,9 @@ class AIOContextPool : public Singleton void fulfillPromises(const io_event events[], const int num_events) { + if (num_events == 0) + return; + const std::lock_guard lock{mutex}; /// look at returned events and find corresponding promise, set result and erase promise from map @@ -138,6 +150,12 @@ class AIOContextPool : public Singleton /// set value via promise and release it const auto it = promises.find(id); + if (it == std::end(promises)) + { + LOG_CRITICAL(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id); + continue; + } + it->second.set_value(event.res); promises.erase(it); } From 11447a4e1a603f7441b977764287644aaedbf3cb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Nov 2015 20:38:24 +0300 Subject: [PATCH 4/7] dbms: accurate checking of replication delays (incomplete) [#METR-17573]. --- .../ReplicatedMergeTreeRestartingThread.h | 3 + .../ReplicatedMergeTreeRestartingThread.cpp | 141 ++++++++++++++++++ 2 files changed, 144 insertions(+) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 0ac34f34c87..1593ef0d43c 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -73,6 +73,9 @@ private: /// Запретить запись в таблицу и завершить все фоновые потоки. void goReadOnlyPermanently(); + + /// Получить информацию об отставании реплик. + void checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay); }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 183139a5864..0dde3f16247 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -85,6 +85,11 @@ void ReplicatedMergeTreeRestartingThread::run() first_time = false; } + /// Выясняем отставания реплик. + time_t absolute_delay = 0; + time_t relative_delay = 0; + checkReplicationDelays(absolute_delay, relative_delay); + wakeup_event.tryWait(check_delay_ms); } } @@ -329,4 +334,140 @@ void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently() } +static time_t extractTimeOfLogEntryIfGetPart(zkutil::ZooKeeperPtr & zookeeper, const String & name, const String & path) +{ + String content; + zkutil::Stat stat; + if (!zookeeper->tryGet(path + "/" + name, content, &stat)) + return 0; /// Узел уже успел удалиться. + + ReplicatedMergeTreeLogEntry entry; + entry.parse(content, stat); + + if (entry.type == ReplicatedMergeTreeLogEntry::GET_PART) + return entry.create_time; + + return 0; +} + +/// В массиве имён узлов - элементов очереди/лога находит первый, имеющий тип GET_PART и возвращает его время; либо ноль, если не нашёл. +static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Strings & nodes, const String & path) +{ + for (const auto & name : nodes) + { + time_t res = extractTimeOfLogEntryIfGetPart(zookeeper, name, path); + if (res) + return res; + } + + return 0; +} + + +void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay) +{ + /** Нужно получить следующую информацию: + * 1. Время последней записи типа GET в логе. + * 2. Время первой записи типа GET в очереди каждой активной реплики + * (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики). + * + * Разница между этими величинами называется (абсолютным) отставанием реплик. + * Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием. + * + * Если относительное отставание текущей реплики больше некоторого порога, + * и текущая реплика является лидером, то текущая реплика должна уступить лидерство. + * + * Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо: + * - не отвечать Ok на некоторую ручку проверки реплик для балансировщика; + * - не принимать соединения для обработки запросов. + * Это делается в других местах. + */ + + out_absolute_delay = 0; + out_relative_delay = 0; + + auto zookeeper = storage.getZooKeeper(); + + /// Последняя запись GET в логе. + String log_path = storage.zookeeper_path + "/log"; + Strings log_entries_desc = zookeeper->getChildren(log_path); + std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater()); + time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path); + + /** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт. + * В очередях у реплик могут быть не выполненные старые записи типа GET, + * которые туда добавлены не из лога, а для восстановления битых кусков. + * Не будем считать это отставанием. + */ + + if (!last_entry_to_get_part) + return; + + /// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет. + std::map replicas_first_entry_to_get_part; + + Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election"); + for (const auto & node : active_replicas) + { + String replica; + if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica)) + continue; /// Реплика только что перестала быть активной. + + String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue"; + Strings queue_entries = zookeeper->getChildren(queue_path); + std::sort(queue_entries.begin(), queue_entries.end()); + time_t & first_time = replicas_first_entry_to_get_part[replica]; + first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path); + + if (!first_time) + { + /// Ищем среди записей лога после log_pointer для реплики. + String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); + String log_min_entry = "log-" + storage.padIndex(parse(log_pointer)); + + for (const auto & name : log_entries_desc) + { + if (name < log_min_entry) + break; + + first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path); + if (first_time) + break; + } + } + } + + if (active_replicas.empty()) + { + /// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить? + /// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально. + LOG_ERROR(log, "No active replicas when checking replication delays: very strange."); + return; + } + + time_t first_entry_of_most_recent_replica = -1; + for (const auto & replica_time : replicas_first_entry_to_get_part) + { + if (0 == replica_time.second) + { + /// Есть реплика, которая совсем не отстаёт. + first_entry_of_most_recent_replica = 0; + break; + } + + if (replica_time.second > first_entry_of_most_recent_replica) + first_entry_of_most_recent_replica = replica_time.second; + } + + time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name]; + if (0 == our_first_entry_to_get_part) + return; /// Если мы совсем не отстаём. + + out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part; + out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part; + + LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << "."); +} + + } From e29df2814cb5eac8ef18746271c7af7819a33a35 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Nov 2015 22:44:19 +0300 Subject: [PATCH 5/7] dbms: checking replication delays [#METR-17573]. --- .../DB/Storages/MergeTree/MergeTreeSettings.h | 18 ++++++ .../ReplicatedMergeTreeRestartingThread.h | 12 ++++ .../ReplicatedMergeTreeRestartingThread.cpp | 59 +++++++++++++++---- 3 files changed, 77 insertions(+), 12 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h index e3e14aa214d..8705c50f64c 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h @@ -77,6 +77,20 @@ struct MergeTreeSettings /// Если отношение количества ошибок к общему количеству кусков меньше указанного значения, то всё-равно можно запускаться. double replicated_max_ratio_of_wrong_parts = 0.05; + /** Настройки проверки отставания реплик. */ + + /// Периодичность для проверки отставания и сравнения его с другими репликами. + size_t check_delay_period = 60; + + /// Минимальное отставание от других реплик, при котором нужно уступить лидерство. Здесь и далее, если 0 - не ограничено. + size_t min_relative_delay_to_yield_leadership = 120; + + /// Минимальное отставание от других реплик, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса. + size_t min_relative_delay_to_close = 300; + + /// Минимальное абсолютное отставание, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса. + size_t min_absolute_delay_to_close = 0; + void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config) { @@ -106,6 +120,10 @@ struct MergeTreeSettings SET_SIZE_T(replicated_max_missing_obsolete_parts); SET_SIZE_T(replicated_max_missing_active_parts); SET_DOUBLE(replicated_max_ratio_of_wrong_parts); + SET_SIZE_T(check_delay_period); + SET_SIZE_T(min_relative_delay_to_yield_leadership); + SET_SIZE_T(min_relative_delay_to_close); + SET_SIZE_T(min_absolute_delay_to_close); #undef SET_SIZE_T #undef SET_DOUBLE diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 1593ef0d43c..b9a454b3238 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -44,6 +45,12 @@ public: wakeup(); } + void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const + { + out_absolute_delay = absolute_delay.load(std::memory_order_relaxed); + out_relative_delay = relative_delay.load(std::memory_order_relaxed); + } + private: StorageReplicatedMergeTree & storage; Logger * log; @@ -55,6 +62,11 @@ private: std::thread thread; + /// Отставание реплики. + std::atomic absolute_delay {}; + std::atomic relative_delay {}; + + void run(); /// Запустить или остановить фоновые потоки. Используется для частичной переинициализации при пересоздании сессии в ZooKeeper. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 0dde3f16247..b91cff233da 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -31,19 +31,27 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage void ReplicatedMergeTreeRestartingThread::run() { - constexpr auto retry_delay_ms = 10 * 1000; - constexpr auto check_delay_ms = 60 * 1000; + constexpr auto retry_period_ms = 10 * 1000; + + /// Периодичность проверки истечения сессии в ZK. + time_t check_period_ms = 60 * 1000; + + /// Периодичность проверки величины отставания реплики. + if (check_period_ms > static_cast(storage.data.settings.check_delay_period) * 1000) + check_period_ms = storage.data.settings.check_delay_period * 1000; setThreadName("ReplMTRestart"); try { - bool first_time = true; + bool first_time = true; /// Активация реплики в первый раз. + bool need_restart = false; /// Перезапуск по собственной инициативе, чтобы отдать лидерство. + time_t prev_time_of_check_delay = 0; /// Запуск реплики при старте сервера/создании таблицы. Перезапуск реплики при истечении сессии с ZK. while (!need_stop) { - if (first_time || storage.getZooKeeper()->expired()) + if (first_time || need_restart || storage.getZooKeeper()->expired()) { if (first_time) { @@ -51,7 +59,10 @@ void ReplicatedMergeTreeRestartingThread::run() } else { - LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); + if (need_restart) + LOG_WARNING(log, "Will reactivate replica."); + else + LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); storage.is_readonly = true; partialShutdown(); @@ -68,13 +79,13 @@ void ReplicatedMergeTreeRestartingThread::run() /// Исключение при попытке zookeeper_init обычно бывает, если не работает DNS. Будем пытаться сделать это заново. tryLogCurrentException(__PRETTY_FUNCTION__); - wakeup_event.tryWait(retry_delay_ms); + wakeup_event.tryWait(retry_period_ms); continue; } if (!need_stop && !tryStartup()) { - wakeup_event.tryWait(retry_delay_ms); + wakeup_event.tryWait(retry_period_ms); continue; } @@ -83,14 +94,35 @@ void ReplicatedMergeTreeRestartingThread::run() storage.is_readonly = false; first_time = false; + need_restart = false; } - /// Выясняем отставания реплик. - time_t absolute_delay = 0; - time_t relative_delay = 0; - checkReplicationDelays(absolute_delay, relative_delay); + time_t current_time = time(0); + if (current_time >= prev_time_of_check_delay + static_cast(storage.data.settings.check_delay_period)) + { + /// Выясняем отставания реплик. + time_t new_absolute_delay = 0; + time_t new_relative_delay = 0; - wakeup_event.tryWait(check_delay_ms); + checkReplicationDelays(new_absolute_delay, new_relative_delay); + + absolute_delay.store(new_absolute_delay, std::memory_order_relaxed); + relative_delay.store(new_relative_delay, std::memory_order_relaxed); + + prev_time_of_check_delay = current_time; + + /// Уступаем лидерство, если относительное отставание больше порога. + if (storage.is_leader_node && new_relative_delay > static_cast(storage.data.settings.min_relative_delay_to_yield_leadership)) + { + LOG_INFO(log, "Relative replica delay (" << new_relative_delay << " seconds) is bigger than threshold (" + << storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership."); + + need_restart = true; + continue; + } + } + + wakeup_event.tryWait(check_period_ms); } } catch (...) @@ -381,6 +413,9 @@ void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_ab * - не отвечать Ok на некоторую ручку проверки реплик для балансировщика; * - не принимать соединения для обработки запросов. * Это делается в других местах. + * + * NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов. + * Могут быть проблемы в случае разрастания лога до большого размера. */ out_absolute_delay = 0; From 53dd07b8caab0a744a0e92386c8f82dd919fa81e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Nov 2015 23:08:18 +0300 Subject: [PATCH 6/7] dbms: checking replication delays: development [#METR-17573]. --- .../DB/Storages/StorageReplicatedMergeTree.h | 2 + dbms/src/Server/Server.cpp | 86 +++++++++++++------ .../Storages/StorageReplicatedMergeTree.cpp | 9 ++ 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 02e70818f58..1adfa256469 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -180,6 +180,8 @@ public: using LogEntriesData = std::vector; void getQueue(LogEntriesData & res, String & replica_name); + void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const; + private: void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index b0fe342442c..103507d319a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -160,41 +160,72 @@ private: void run(); }; -/// Отвечает "Ok.\n", если получен любой GET запрос. Используется для проверки живости. + +/// Отвечает "Ok.\n". Используется для проверки живости. class PingRequestHandler : public Poco::Net::HTTPRequestHandler { public: - PingRequestHandler() + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { - LOG_TRACE((&Logger::get("PingRequestHandler")), "Ping request."); + try + { + const char * data = "Ok.\n"; + response.sendBuffer(data, strlen(data)); + } + catch (...) + { + tryLogCurrentException("PingRequestHandler"); + } + } +}; + + +/// Отвечает "Ok.\n", если все реплики на этом сервере не слишком сильно отстают. +class ReplicasStatusHandler : public Poco::Net::HTTPRequestHandler +{ +public: + ReplicasStatusHandler() + { + // TODO } void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { try { - if (request.getURI() == "/" || request.getURI() == "/ping") - { - const char * data = "Ok.\n"; - response.sendBuffer(data, strlen(data)); - } - else - { - response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); - response.send() << "There is no handle " << request.getURI() << "\n\n" - << "Use / or /ping for health checks.\n" - << "Send queries from your program with POST method or GET /?query=...\n\n" - << "Use clickhouse-client:\n\n" - << "For interactive data analysis:\n" - << " clickhouse-client\n\n" - << "For batch query processing:\n" - << " clickhouse-client --query='SELECT 1' > result\n" - << " clickhouse-client < query > result\n"; - } + // TODO } catch (...) { - tryLogCurrentException("PingRequestHandler"); + tryLogCurrentException("ReplicasStatusHandler"); + } + } +}; + + +/// Отвечает 404 с подробным объяснением. +class NotFoundHandler : public Poco::Net::HTTPRequestHandler +{ +public: + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) + { + try + { + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); + response.send() << "There is no handle " << request.getURI() << "\n\n" + << "Use / or /ping for health checks.\n" + << "Or /replicas_status for more sophisticated health checks.\n\n" + << "Send queries from your program with POST method or GET /?query=...\n\n" + << "Use clickhouse-client:\n\n" + << "For interactive data analysis:\n" + << " clickhouse-client\n\n" + << "For batch query processing:\n" + << " clickhouse-client --query='SELECT 1' > result\n" + << " clickhouse-client < query > result\n"; + } + catch (...) + { + tryLogCurrentException("NotFoundHandler"); } } }; @@ -219,7 +250,9 @@ public: << ", Address: " << request.clientAddress().toString() << ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none")); - if (request.getURI().find('?') != std::string::npos + const auto & uri = request.getURI(); + + if (uri.find('?') != std::string::npos || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) { return new HandlerType(server); @@ -227,7 +260,12 @@ public: else if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD) { - return new PingRequestHandler(); + if (uri == "/" || uri == "/ping") + return new PingRequestHandler; + else if (uri == "/replicas_status") + return new ReplicasStatusHandler; + else + return new NotFoundHandler; } else return nullptr; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 14a6a53620a..0591603c6f1 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3330,6 +3330,15 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica } +void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const +{ + if (!restarting_thread) + throw Exception("Table was shutted down or is in readonly mode.", ErrorCodes::TABLE_IS_READ_ONLY); + + restarting_thread->getReplicaDelays(out_absolute_delay, out_relative_delay); +} + + void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings) { auto zookeeper = getZooKeeper(); From 0572f5aed70dd4db1093eb82058fcfc4d079c398 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Nov 2015 23:24:27 +0300 Subject: [PATCH 7/7] dbms: added /replicas_status handle [#METR-17573]. --- dbms/src/Server/Server.cpp | 57 ++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 103507d319a..4a12bc5ab8b 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -180,23 +181,69 @@ public: }; -/// Отвечает "Ok.\n", если все реплики на этом сервере не слишком сильно отстают. +/// Отвечает "Ok.\n", если все реплики на этом сервере не слишком сильно отстают. Иначе выводит информацию об отставании. TODO Вынести в отдельный файл. class ReplicasStatusHandler : public Poco::Net::HTTPRequestHandler { +private: + Context & context; + public: - ReplicasStatusHandler() + ReplicasStatusHandler(Context & context_) + : context(context_) { - // TODO } void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { try { - // TODO + /// Собираем набор реплицируемых таблиц. + Databases replicated_tables; + { + Poco::ScopedLock lock(context.getMutex()); + + for (const auto & db : context.getDatabases()) + for (const auto & table : db.second) + if (typeid_cast(table.second.get())) + replicated_tables[db.first][table.first] = table.second; + } + + const MergeTreeSettings & settings = context.getMergeTreeSettings(); + + bool ok = true; + std::stringstream message; + + for (const auto & db : replicated_tables) + { + for (const auto & table : db.second) + { + time_t absolute_delay = 0; + time_t relative_delay = 0; + + static_cast(*table.second).getReplicaDelays(absolute_delay, relative_delay); + + if ((settings.min_absolute_delay_to_close && absolute_delay >= static_cast(settings.min_absolute_delay_to_close)) + || (settings.min_relative_delay_to_close && relative_delay >= static_cast(settings.min_relative_delay_to_close))) + ok = false; + + message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(table.first) + << "\tAbsolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".\n"; + } + } + + if (ok) + { + const char * data = "Ok.\n"; + response.sendBuffer(data, strlen(data)); + } + else + { + response.send() << message.rdbuf(); + } } catch (...) { + /// TODO Отправлять клиенту. tryLogCurrentException("ReplicasStatusHandler"); } } @@ -263,7 +310,7 @@ public: if (uri == "/" || uri == "/ping") return new PingRequestHandler; else if (uri == "/replicas_status") - return new ReplicasStatusHandler; + return new ReplicasStatusHandler(*server.global_context); else return new NotFoundHandler; }