diff --git a/dbms/include/DB/Common/AIO.h b/dbms/include/DB/Common/AIO.h index b8496af0915..5512ac053cf 100644 --- a/dbms/include/DB/Common/AIO.h +++ b/dbms/include/DB/Common/AIO.h @@ -1,9 +1,13 @@ #pragma once +#include #include +#include #include +#include #include #include +#include #include #include #include @@ -64,108 +68,147 @@ class AIOContextPool : public Singleton friend class Singleton; static const auto max_concurrent_events = 128; - static const auto max_timeout_nsec = 1000; + static const auto timeout_sec = 1; AIOContext aio_context{max_concurrent_events}; - std::size_t id{}; + using id_t = size_t; + using bytes_read_t = ssize_t; + + /// Autoincremental id used to identify completed requests + id_t id{}; mutable std::mutex mutex; - std::map> promises; - std::vector queued_requests; + mutable std::condition_variable have_resources; + std::map> promises; std::atomic cancelled{false}; - std::thread man_of_his_word{&AIOContextPool::fulfill_promises, this}; + std::thread io_completion_monitor{&AIOContextPool::doMonitor, this}; ~AIOContextPool() { cancelled.store(true, std::memory_order_relaxed); - man_of_his_word.join(); + io_completion_monitor.join(); } - void fulfill_promises() + void doMonitor() { - /// array to hold completion events - io_event events[max_concurrent_events] {}; - const auto p_events = &events[0]; - timespec timeout{0, max_timeout_nsec}; - /// continue checking for events unless cancelled while (!cancelled.load(std::memory_order_relaxed)) + waitForCompletion(); + + /// wait until all requests have been completed + while (!promises.empty()) + waitForCompletion(); + } + + void waitForCompletion() + { + /// array to hold completion events + io_event events[max_concurrent_events]; + + try { - try - { - /// number of events signaling on - auto num_events = 0; - - /// request 1 to `max_concurrent_events` events - while ((num_events = io_getevents(aio_context.ctx, 1, max_concurrent_events, p_events, &timeout)) < 0) - if (errno != EINTR) - throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", - ErrorCodes::AIO_COMPLETION_ERROR, errno); - - /// look at returned events and - for (const auto & event : boost::make_iterator_range(p_events, p_events + num_events)) - { - /// get id from event - const auto id = event.data; - - /// find corresponding promise, set result and erase promise from map - const std::lock_guard lock{mutex}; - - const auto it = promises.find(id); - it->second.set_value(event.res); - promises.erase(it); - } - - if (queued_requests.empty()) - continue; - - const std::lock_guard lock{mutex}; - auto num_requests = 0; - - /// submit a batch of requests - while ((num_requests = io_submit(aio_context.ctx, queued_requests.size(), queued_requests.data())) < 0) - if (!(errno == EINTR || errno == EAGAIN)) - throwFromErrno("io_submit: Failed to submit batch of " + - std::to_string(queued_requests.size()) + " requests for asynchronous IO", - ErrorCodes::AIO_SUBMIT_ERROR, errno); - - if (num_requests <= 0) - continue; - - /// erase submitted requests - queued_requests.erase(std::begin(queued_requests), - std::next(std::begin(queued_requests), num_requests)); - } - catch (...) - { - /// there was an error, log it, return to any client and continue - const std::lock_guard lock{mutex}; - - const auto any_promise_it = std::begin(promises); - any_promise_it->second.set_exception(std::current_exception()); - - tryLogCurrentException("AIOContextPool::fulfill_promises()"); - } + const auto num_events = getCompletionEvents(events, max_concurrent_events); + fulfillPromises(events, num_events); + notifyProducers(num_events); + } + catch (...) + { + /// there was an error, log it, return to any producer and continue + reportExceptionToAnyProducer(); + tryLogCurrentException("AIOContextPool::waitForCompletion()"); } } -public: - std::future post(struct iocb & iocb) + int getCompletionEvents(io_event events[], const int max_events) + { + timespec timeout{timeout_sec}; + + auto num_events = 0; + + /// request 1 to `max_events` events + while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) + if (errno != EINTR) + throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", + ErrorCodes::AIO_COMPLETION_ERROR, errno); + + return num_events; + } + + void fulfillPromises(const io_event events[], const int num_events) + { + if (num_events == 0) + return; + + const std::lock_guard lock{mutex}; + + /// look at returned events and find corresponding promise, set result and erase promise from map + for (const auto & event : boost::make_iterator_range(events, events + num_events)) + { + /// get id from event + const auto id = event.data; + + /// set value via promise and release it + const auto it = promises.find(id); + if (it == std::end(promises)) + { + LOG_CRITICAL(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id); + continue; + } + + it->second.set_value(event.res); + promises.erase(it); + } + } + + void notifyProducers(const int num_producers) const + { + if (num_producers == 0) + return; + + if (num_producers > 1) + have_resources.notify_all(); + else + have_resources.notify_one(); + } + + void reportExceptionToAnyProducer() { const std::lock_guard lock{mutex}; + const auto any_promise_it = std::begin(promises); + any_promise_it->second.set_exception(std::current_exception()); + } + +public: + /// Request AIO read operation for iocb, returns a future with number of bytes read + std::future post(struct iocb & iocb) + { + std::unique_lock lock{mutex}; + /// get current id and increment it by one const auto request_id = id++; /// create a promise and put request in "queue" - promises.emplace(request_id, std::promise{}); + promises.emplace(request_id, std::promise{}); /// store id in AIO request for further identification iocb.aio_data = request_id; - queued_requests.push_back(&iocb); + + auto num_requests = 0; + struct iocb * requests[] { &iocb }; + + /// submit a request + while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0) + { + if (errno == EAGAIN) + /// wait until at least one event has been completed (or a spurious wakeup) and try again + have_resources.wait(lock); + else if (errno != EINTR) + throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", + ErrorCodes::AIO_SUBMIT_ERROR, errno); + } return promises[request_id].get_future(); - } }; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h index e3e14aa214d..8705c50f64c 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h @@ -77,6 +77,20 @@ struct MergeTreeSettings /// Если отношение количества ошибок к общему количеству кусков меньше указанного значения, то всё-равно можно запускаться. double replicated_max_ratio_of_wrong_parts = 0.05; + /** Настройки проверки отставания реплик. */ + + /// Периодичность для проверки отставания и сравнения его с другими репликами. + size_t check_delay_period = 60; + + /// Минимальное отставание от других реплик, при котором нужно уступить лидерство. Здесь и далее, если 0 - не ограничено. + size_t min_relative_delay_to_yield_leadership = 120; + + /// Минимальное отставание от других реплик, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса. + size_t min_relative_delay_to_close = 300; + + /// Минимальное абсолютное отставание, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса. + size_t min_absolute_delay_to_close = 0; + void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config) { @@ -106,6 +120,10 @@ struct MergeTreeSettings SET_SIZE_T(replicated_max_missing_obsolete_parts); SET_SIZE_T(replicated_max_missing_active_parts); SET_DOUBLE(replicated_max_ratio_of_wrong_parts); + SET_SIZE_T(check_delay_period); + SET_SIZE_T(min_relative_delay_to_yield_leadership); + SET_SIZE_T(min_relative_delay_to_close); + SET_SIZE_T(min_absolute_delay_to_close); #undef SET_SIZE_T #undef SET_DOUBLE diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 0ac34f34c87..b9a454b3238 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -44,6 +45,12 @@ public: wakeup(); } + void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const + { + out_absolute_delay = absolute_delay.load(std::memory_order_relaxed); + out_relative_delay = relative_delay.load(std::memory_order_relaxed); + } + private: StorageReplicatedMergeTree & storage; Logger * log; @@ -55,6 +62,11 @@ private: std::thread thread; + /// Отставание реплики. + std::atomic absolute_delay {}; + std::atomic relative_delay {}; + + void run(); /// Запустить или остановить фоновые потоки. Используется для частичной переинициализации при пересоздании сессии в ZooKeeper. @@ -73,6 +85,9 @@ private: /// Запретить запись в таблицу и завершить все фоновые потоки. void goReadOnlyPermanently(); + + /// Получить информацию об отставании реплик. + void checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay); }; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 02e70818f58..1adfa256469 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -180,6 +180,8 @@ public: using LogEntriesData = std::vector; void getQueue(LogEntriesData & res, String & replica_name); + void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const; + private: void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index b0fe342442c..4a12bc5ab8b 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -160,41 +161,118 @@ private: void run(); }; -/// Отвечает "Ok.\n", если получен любой GET запрос. Используется для проверки живости. + +/// Отвечает "Ok.\n". Используется для проверки живости. class PingRequestHandler : public Poco::Net::HTTPRequestHandler { public: - PingRequestHandler() + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) + { + try + { + const char * data = "Ok.\n"; + response.sendBuffer(data, strlen(data)); + } + catch (...) + { + tryLogCurrentException("PingRequestHandler"); + } + } +}; + + +/// Отвечает "Ok.\n", если все реплики на этом сервере не слишком сильно отстают. Иначе выводит информацию об отставании. TODO Вынести в отдельный файл. +class ReplicasStatusHandler : public Poco::Net::HTTPRequestHandler +{ +private: + Context & context; + +public: + ReplicasStatusHandler(Context & context_) + : context(context_) { - LOG_TRACE((&Logger::get("PingRequestHandler")), "Ping request."); } void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { try { - if (request.getURI() == "/" || request.getURI() == "/ping") + /// Собираем набор реплицируемых таблиц. + Databases replicated_tables; + { + Poco::ScopedLock lock(context.getMutex()); + + for (const auto & db : context.getDatabases()) + for (const auto & table : db.second) + if (typeid_cast(table.second.get())) + replicated_tables[db.first][table.first] = table.second; + } + + const MergeTreeSettings & settings = context.getMergeTreeSettings(); + + bool ok = true; + std::stringstream message; + + for (const auto & db : replicated_tables) + { + for (const auto & table : db.second) + { + time_t absolute_delay = 0; + time_t relative_delay = 0; + + static_cast(*table.second).getReplicaDelays(absolute_delay, relative_delay); + + if ((settings.min_absolute_delay_to_close && absolute_delay >= static_cast(settings.min_absolute_delay_to_close)) + || (settings.min_relative_delay_to_close && relative_delay >= static_cast(settings.min_relative_delay_to_close))) + ok = false; + + message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(table.first) + << "\tAbsolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".\n"; + } + } + + if (ok) { const char * data = "Ok.\n"; response.sendBuffer(data, strlen(data)); } else { - response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); - response.send() << "There is no handle " << request.getURI() << "\n\n" - << "Use / or /ping for health checks.\n" - << "Send queries from your program with POST method or GET /?query=...\n\n" - << "Use clickhouse-client:\n\n" - << "For interactive data analysis:\n" - << " clickhouse-client\n\n" - << "For batch query processing:\n" - << " clickhouse-client --query='SELECT 1' > result\n" - << " clickhouse-client < query > result\n"; + response.send() << message.rdbuf(); } } catch (...) { - tryLogCurrentException("PingRequestHandler"); + /// TODO Отправлять клиенту. + tryLogCurrentException("ReplicasStatusHandler"); + } + } +}; + + +/// Отвечает 404 с подробным объяснением. +class NotFoundHandler : public Poco::Net::HTTPRequestHandler +{ +public: + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) + { + try + { + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); + response.send() << "There is no handle " << request.getURI() << "\n\n" + << "Use / or /ping for health checks.\n" + << "Or /replicas_status for more sophisticated health checks.\n\n" + << "Send queries from your program with POST method or GET /?query=...\n\n" + << "Use clickhouse-client:\n\n" + << "For interactive data analysis:\n" + << " clickhouse-client\n\n" + << "For batch query processing:\n" + << " clickhouse-client --query='SELECT 1' > result\n" + << " clickhouse-client < query > result\n"; + } + catch (...) + { + tryLogCurrentException("NotFoundHandler"); } } }; @@ -219,7 +297,9 @@ public: << ", Address: " << request.clientAddress().toString() << ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none")); - if (request.getURI().find('?') != std::string::npos + const auto & uri = request.getURI(); + + if (uri.find('?') != std::string::npos || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) { return new HandlerType(server); @@ -227,7 +307,12 @@ public: else if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD) { - return new PingRequestHandler(); + if (uri == "/" || uri == "/ping") + return new PingRequestHandler; + else if (uri == "/replicas_status") + return new ReplicasStatusHandler(*server.global_context); + else + return new NotFoundHandler; } else return nullptr; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 183139a5864..b91cff233da 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -31,19 +31,27 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage void ReplicatedMergeTreeRestartingThread::run() { - constexpr auto retry_delay_ms = 10 * 1000; - constexpr auto check_delay_ms = 60 * 1000; + constexpr auto retry_period_ms = 10 * 1000; + + /// Периодичность проверки истечения сессии в ZK. + time_t check_period_ms = 60 * 1000; + + /// Периодичность проверки величины отставания реплики. + if (check_period_ms > static_cast(storage.data.settings.check_delay_period) * 1000) + check_period_ms = storage.data.settings.check_delay_period * 1000; setThreadName("ReplMTRestart"); try { - bool first_time = true; + bool first_time = true; /// Активация реплики в первый раз. + bool need_restart = false; /// Перезапуск по собственной инициативе, чтобы отдать лидерство. + time_t prev_time_of_check_delay = 0; /// Запуск реплики при старте сервера/создании таблицы. Перезапуск реплики при истечении сессии с ZK. while (!need_stop) { - if (first_time || storage.getZooKeeper()->expired()) + if (first_time || need_restart || storage.getZooKeeper()->expired()) { if (first_time) { @@ -51,7 +59,10 @@ void ReplicatedMergeTreeRestartingThread::run() } else { - LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); + if (need_restart) + LOG_WARNING(log, "Will reactivate replica."); + else + LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); storage.is_readonly = true; partialShutdown(); @@ -68,13 +79,13 @@ void ReplicatedMergeTreeRestartingThread::run() /// Исключение при попытке zookeeper_init обычно бывает, если не работает DNS. Будем пытаться сделать это заново. tryLogCurrentException(__PRETTY_FUNCTION__); - wakeup_event.tryWait(retry_delay_ms); + wakeup_event.tryWait(retry_period_ms); continue; } if (!need_stop && !tryStartup()) { - wakeup_event.tryWait(retry_delay_ms); + wakeup_event.tryWait(retry_period_ms); continue; } @@ -83,9 +94,35 @@ void ReplicatedMergeTreeRestartingThread::run() storage.is_readonly = false; first_time = false; + need_restart = false; } - wakeup_event.tryWait(check_delay_ms); + time_t current_time = time(0); + if (current_time >= prev_time_of_check_delay + static_cast(storage.data.settings.check_delay_period)) + { + /// Выясняем отставания реплик. + time_t new_absolute_delay = 0; + time_t new_relative_delay = 0; + + checkReplicationDelays(new_absolute_delay, new_relative_delay); + + absolute_delay.store(new_absolute_delay, std::memory_order_relaxed); + relative_delay.store(new_relative_delay, std::memory_order_relaxed); + + prev_time_of_check_delay = current_time; + + /// Уступаем лидерство, если относительное отставание больше порога. + if (storage.is_leader_node && new_relative_delay > static_cast(storage.data.settings.min_relative_delay_to_yield_leadership)) + { + LOG_INFO(log, "Relative replica delay (" << new_relative_delay << " seconds) is bigger than threshold (" + << storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership."); + + need_restart = true; + continue; + } + } + + wakeup_event.tryWait(check_period_ms); } } catch (...) @@ -329,4 +366,143 @@ void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently() } +static time_t extractTimeOfLogEntryIfGetPart(zkutil::ZooKeeperPtr & zookeeper, const String & name, const String & path) +{ + String content; + zkutil::Stat stat; + if (!zookeeper->tryGet(path + "/" + name, content, &stat)) + return 0; /// Узел уже успел удалиться. + + ReplicatedMergeTreeLogEntry entry; + entry.parse(content, stat); + + if (entry.type == ReplicatedMergeTreeLogEntry::GET_PART) + return entry.create_time; + + return 0; +} + +/// В массиве имён узлов - элементов очереди/лога находит первый, имеющий тип GET_PART и возвращает его время; либо ноль, если не нашёл. +static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Strings & nodes, const String & path) +{ + for (const auto & name : nodes) + { + time_t res = extractTimeOfLogEntryIfGetPart(zookeeper, name, path); + if (res) + return res; + } + + return 0; +} + + +void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay) +{ + /** Нужно получить следующую информацию: + * 1. Время последней записи типа GET в логе. + * 2. Время первой записи типа GET в очереди каждой активной реплики + * (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики). + * + * Разница между этими величинами называется (абсолютным) отставанием реплик. + * Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием. + * + * Если относительное отставание текущей реплики больше некоторого порога, + * и текущая реплика является лидером, то текущая реплика должна уступить лидерство. + * + * Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо: + * - не отвечать Ok на некоторую ручку проверки реплик для балансировщика; + * - не принимать соединения для обработки запросов. + * Это делается в других местах. + * + * NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов. + * Могут быть проблемы в случае разрастания лога до большого размера. + */ + + out_absolute_delay = 0; + out_relative_delay = 0; + + auto zookeeper = storage.getZooKeeper(); + + /// Последняя запись GET в логе. + String log_path = storage.zookeeper_path + "/log"; + Strings log_entries_desc = zookeeper->getChildren(log_path); + std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater()); + time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path); + + /** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт. + * В очередях у реплик могут быть не выполненные старые записи типа GET, + * которые туда добавлены не из лога, а для восстановления битых кусков. + * Не будем считать это отставанием. + */ + + if (!last_entry_to_get_part) + return; + + /// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет. + std::map replicas_first_entry_to_get_part; + + Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election"); + for (const auto & node : active_replicas) + { + String replica; + if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica)) + continue; /// Реплика только что перестала быть активной. + + String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue"; + Strings queue_entries = zookeeper->getChildren(queue_path); + std::sort(queue_entries.begin(), queue_entries.end()); + time_t & first_time = replicas_first_entry_to_get_part[replica]; + first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path); + + if (!first_time) + { + /// Ищем среди записей лога после log_pointer для реплики. + String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); + String log_min_entry = "log-" + storage.padIndex(parse(log_pointer)); + + for (const auto & name : log_entries_desc) + { + if (name < log_min_entry) + break; + + first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path); + if (first_time) + break; + } + } + } + + if (active_replicas.empty()) + { + /// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить? + /// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально. + LOG_ERROR(log, "No active replicas when checking replication delays: very strange."); + return; + } + + time_t first_entry_of_most_recent_replica = -1; + for (const auto & replica_time : replicas_first_entry_to_get_part) + { + if (0 == replica_time.second) + { + /// Есть реплика, которая совсем не отстаёт. + first_entry_of_most_recent_replica = 0; + break; + } + + if (replica_time.second > first_entry_of_most_recent_replica) + first_entry_of_most_recent_replica = replica_time.second; + } + + time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name]; + if (0 == our_first_entry_to_get_part) + return; /// Если мы совсем не отстаём. + + out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part; + out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part; + + LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << "."); +} + + } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 14a6a53620a..0591603c6f1 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3330,6 +3330,15 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica } +void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const +{ + if (!restarting_thread) + throw Exception("Table was shutted down or is in readonly mode.", ErrorCodes::TABLE_IS_READ_ONLY); + + restarting_thread->getReplicaDelays(out_absolute_delay, out_relative_delay); +} + + void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings) { auto zookeeper = getZooKeeper();