This commit is contained in:
Evgeniy Gatov 2015-11-06 19:53:11 +03:00
commit f83e7f7ef9
7 changed files with 445 additions and 97 deletions

View File

@ -1,9 +1,13 @@
#pragma once #pragma once
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/Exception.h> #include <DB/Common/Exception.h>
#include <common/logger_useful.h>
#include <common/singleton.h> #include <common/singleton.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp> #include <boost/range/iterator_range.hpp>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <condition_variable>
#include <future> #include <future>
#include <mutex> #include <mutex>
#include <map> #include <map>
@ -64,108 +68,147 @@ class AIOContextPool : public Singleton<AIOContextPool>
friend class Singleton<AIOContextPool>; friend class Singleton<AIOContextPool>;
static const auto max_concurrent_events = 128; static const auto max_concurrent_events = 128;
static const auto max_timeout_nsec = 1000; static const auto timeout_sec = 1;
AIOContext aio_context{max_concurrent_events}; AIOContext aio_context{max_concurrent_events};
std::size_t id{}; using id_t = size_t;
using bytes_read_t = ssize_t;
/// Autoincremental id used to identify completed requests
id_t id{};
mutable std::mutex mutex; mutable std::mutex mutex;
std::map<std::size_t, std::promise<ssize_t>> promises; mutable std::condition_variable have_resources;
std::vector<iocb *> queued_requests; std::map<id_t, std::promise<bytes_read_t>> promises;
std::atomic<bool> cancelled{false}; std::atomic<bool> cancelled{false};
std::thread man_of_his_word{&AIOContextPool::fulfill_promises, this}; std::thread io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool() ~AIOContextPool()
{ {
cancelled.store(true, std::memory_order_relaxed); cancelled.store(true, std::memory_order_relaxed);
man_of_his_word.join(); io_completion_monitor.join();
} }
void fulfill_promises() void doMonitor()
{ {
/// array to hold completion events
io_event events[max_concurrent_events] {};
const auto p_events = &events[0];
timespec timeout{0, max_timeout_nsec};
/// continue checking for events unless cancelled /// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed)) while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void waitForCompletion()
{
/// array to hold completion events
io_event events[max_concurrent_events];
try
{ {
try const auto num_events = getCompletionEvents(events, max_concurrent_events);
{ fulfillPromises(events, num_events);
/// number of events signaling on notifyProducers(num_events);
auto num_events = 0; }
catch (...)
/// request 1 to `max_concurrent_events` events {
while ((num_events = io_getevents(aio_context.ctx, 1, max_concurrent_events, p_events, &timeout)) < 0) /// there was an error, log it, return to any producer and continue
if (errno != EINTR) reportExceptionToAnyProducer();
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", tryLogCurrentException("AIOContextPool::waitForCompletion()");
ErrorCodes::AIO_COMPLETION_ERROR, errno);
/// look at returned events and
for (const auto & event : boost::make_iterator_range(p_events, p_events + num_events))
{
/// get id from event
const auto id = event.data;
/// find corresponding promise, set result and erase promise from map
const std::lock_guard<std::mutex> lock{mutex};
const auto it = promises.find(id);
it->second.set_value(event.res);
promises.erase(it);
}
if (queued_requests.empty())
continue;
const std::lock_guard<std::mutex> lock{mutex};
auto num_requests = 0;
/// submit a batch of requests
while ((num_requests = io_submit(aio_context.ctx, queued_requests.size(), queued_requests.data())) < 0)
if (!(errno == EINTR || errno == EAGAIN))
throwFromErrno("io_submit: Failed to submit batch of " +
std::to_string(queued_requests.size()) + " requests for asynchronous IO",
ErrorCodes::AIO_SUBMIT_ERROR, errno);
if (num_requests <= 0)
continue;
/// erase submitted requests
queued_requests.erase(std::begin(queued_requests),
std::next(std::begin(queued_requests), num_requests));
}
catch (...)
{
/// there was an error, log it, return to any client and continue
const std::lock_guard<std::mutex> lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
tryLogCurrentException("AIOContextPool::fulfill_promises()");
}
} }
} }
public: int getCompletionEvents(io_event events[], const int max_events)
std::future<ssize_t> post(struct iocb & iocb) {
timespec timeout{timeout_sec};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
ErrorCodes::AIO_COMPLETION_ERROR, errno);
return num_events;
}
void fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard<std::mutex> lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
const auto id = event.data;
/// set value via promise and release it
const auto it = promises.find(id);
if (it == std::end(promises))
{
LOG_CRITICAL(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id);
continue;
}
it->second.set_value(event.res);
promises.erase(it);
}
}
void notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void reportExceptionToAnyProducer()
{ {
const std::lock_guard<std::mutex> lock{mutex}; const std::lock_guard<std::mutex> lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
public:
/// Request AIO read operation for iocb, returns a future with number of bytes read
std::future<bytes_read_t> post(struct iocb & iocb)
{
std::unique_lock<std::mutex> lock{mutex};
/// get current id and increment it by one /// get current id and increment it by one
const auto request_id = id++; const auto request_id = id++;
/// create a promise and put request in "queue" /// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<ssize_t>{}); promises.emplace(request_id, std::promise<bytes_read_t>{});
/// store id in AIO request for further identification /// store id in AIO request for further identification
iocb.aio_data = request_id; iocb.aio_data = request_id;
queued_requests.push_back(&iocb);
auto num_requests = 0;
struct iocb * requests[] { &iocb };
/// submit a request
while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO",
ErrorCodes::AIO_SUBMIT_ERROR, errno);
}
return promises[request_id].get_future(); return promises[request_id].get_future();
} }
}; };

View File

@ -77,6 +77,20 @@ struct MergeTreeSettings
/// Если отношение количества ошибок к общему количеству кусков меньше указанного значения, то всё-равно можно запускаться. /// Если отношение количества ошибок к общему количеству кусков меньше указанного значения, то всё-равно можно запускаться.
double replicated_max_ratio_of_wrong_parts = 0.05; double replicated_max_ratio_of_wrong_parts = 0.05;
/** Настройки проверки отставания реплик. */
/// Периодичность для проверки отставания и сравнения его с другими репликами.
size_t check_delay_period = 60;
/// Минимальное отставание от других реплик, при котором нужно уступить лидерство. Здесь и далее, если 0 - не ограничено.
size_t min_relative_delay_to_yield_leadership = 120;
/// Минимальное отставание от других реплик, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса.
size_t min_relative_delay_to_close = 300;
/// Минимальное абсолютное отставание, при котором нужно закрыться от запросов и не выдавать Ok для проверки статуса.
size_t min_absolute_delay_to_close = 0;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config) void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{ {
@ -106,6 +120,10 @@ struct MergeTreeSettings
SET_SIZE_T(replicated_max_missing_obsolete_parts); SET_SIZE_T(replicated_max_missing_obsolete_parts);
SET_SIZE_T(replicated_max_missing_active_parts); SET_SIZE_T(replicated_max_missing_active_parts);
SET_DOUBLE(replicated_max_ratio_of_wrong_parts); SET_DOUBLE(replicated_max_ratio_of_wrong_parts);
SET_SIZE_T(check_delay_period);
SET_SIZE_T(min_relative_delay_to_yield_leadership);
SET_SIZE_T(min_relative_delay_to_close);
SET_SIZE_T(min_absolute_delay_to_close);
#undef SET_SIZE_T #undef SET_SIZE_T
#undef SET_DOUBLE #undef SET_DOUBLE

View File

@ -4,6 +4,7 @@
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <DB/Core/Types.h> #include <DB/Core/Types.h>
#include <thread> #include <thread>
#include <atomic>
namespace DB namespace DB
@ -44,6 +45,12 @@ public:
wakeup(); wakeup();
} }
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const
{
out_absolute_delay = absolute_delay.load(std::memory_order_relaxed);
out_relative_delay = relative_delay.load(std::memory_order_relaxed);
}
private: private:
StorageReplicatedMergeTree & storage; StorageReplicatedMergeTree & storage;
Logger * log; Logger * log;
@ -55,6 +62,11 @@ private:
std::thread thread; std::thread thread;
/// Отставание реплики.
std::atomic<time_t> absolute_delay {};
std::atomic<time_t> relative_delay {};
void run(); void run();
/// Запустить или остановить фоновые потоки. Используется для частичной переинициализации при пересоздании сессии в ZooKeeper. /// Запустить или остановить фоновые потоки. Используется для частичной переинициализации при пересоздании сессии в ZooKeeper.
@ -73,6 +85,9 @@ private:
/// Запретить запись в таблицу и завершить все фоновые потоки. /// Запретить запись в таблицу и завершить все фоновые потоки.
void goReadOnlyPermanently(); void goReadOnlyPermanently();
/// Получить информацию об отставании реплик.
void checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
}; };

View File

@ -180,6 +180,8 @@ public:
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getQueue(LogEntriesData & res, String & replica_name); void getQueue(LogEntriesData & res, String & replica_name);
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const;
private: private:
void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings); void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings);

View File

@ -38,6 +38,7 @@
#include <DB/Storages/System/StorageSystemColumns.h> #include <DB/Storages/System/StorageSystemColumns.h>
#include <DB/Storages/System/StorageSystemFunctions.h> #include <DB/Storages/System/StorageSystemFunctions.h>
#include <DB/Storages/System/StorageSystemClusters.h> #include <DB/Storages/System/StorageSystemClusters.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/copyData.h> #include <DB/IO/copyData.h>
#include <DB/IO/LimitReadBuffer.h> #include <DB/IO/LimitReadBuffer.h>
@ -160,41 +161,118 @@ private:
void run(); void run();
}; };
/// Отвечает "Ok.\n", если получен любой GET запрос. Используется для проверки живости.
/// Отвечает "Ok.\n". Используется для проверки живости.
class PingRequestHandler : public Poco::Net::HTTPRequestHandler class PingRequestHandler : public Poco::Net::HTTPRequestHandler
{ {
public: public:
PingRequestHandler() void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
try
{
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingRequestHandler");
}
}
};
/// Отвечает "Ok.\n", если все реплики на этом сервере не слишком сильно отстают. Иначе выводит информацию об отставании. TODO Вынести в отдельный файл.
class ReplicasStatusHandler : public Poco::Net::HTTPRequestHandler
{
private:
Context & context;
public:
ReplicasStatusHandler(Context & context_)
: context(context_)
{ {
LOG_TRACE((&Logger::get("PingRequestHandler")), "Ping request.");
} }
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{ {
try try
{ {
if (request.getURI() == "/" || request.getURI() == "/ping") /// Собираем набор реплицируемых таблиц.
Databases replicated_tables;
{
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
for (const auto & db : context.getDatabases())
for (const auto & table : db.second)
if (typeid_cast<const StorageReplicatedMergeTree *>(table.second.get()))
replicated_tables[db.first][table.first] = table.second;
}
const MergeTreeSettings & settings = context.getMergeTreeSettings();
bool ok = true;
std::stringstream message;
for (const auto & db : replicated_tables)
{
for (const auto & table : db.second)
{
time_t absolute_delay = 0;
time_t relative_delay = 0;
static_cast<const StorageReplicatedMergeTree &>(*table.second).getReplicaDelays(absolute_delay, relative_delay);
if ((settings.min_absolute_delay_to_close && absolute_delay >= static_cast<time_t>(settings.min_absolute_delay_to_close))
|| (settings.min_relative_delay_to_close && relative_delay >= static_cast<time_t>(settings.min_relative_delay_to_close)))
ok = false;
message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(table.first)
<< "\tAbsolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".\n";
}
}
if (ok)
{ {
const char * data = "Ok.\n"; const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data)); response.sendBuffer(data, strlen(data));
} }
else else
{ {
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); response.send() << message.rdbuf();
response.send() << "There is no handle " << request.getURI() << "\n\n"
<< "Use / or /ping for health checks.\n"
<< "Send queries from your program with POST method or GET /?query=...\n\n"
<< "Use clickhouse-client:\n\n"
<< "For interactive data analysis:\n"
<< " clickhouse-client\n\n"
<< "For batch query processing:\n"
<< " clickhouse-client --query='SELECT 1' > result\n"
<< " clickhouse-client < query > result\n";
} }
} }
catch (...) catch (...)
{ {
tryLogCurrentException("PingRequestHandler"); /// TODO Отправлять клиенту.
tryLogCurrentException("ReplicasStatusHandler");
}
}
};
/// Отвечает 404 с подробным объяснением.
class NotFoundHandler : public Poco::Net::HTTPRequestHandler
{
public:
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND);
response.send() << "There is no handle " << request.getURI() << "\n\n"
<< "Use / or /ping for health checks.\n"
<< "Or /replicas_status for more sophisticated health checks.\n\n"
<< "Send queries from your program with POST method or GET /?query=...\n\n"
<< "Use clickhouse-client:\n\n"
<< "For interactive data analysis:\n"
<< " clickhouse-client\n\n"
<< "For batch query processing:\n"
<< " clickhouse-client --query='SELECT 1' > result\n"
<< " clickhouse-client < query > result\n";
}
catch (...)
{
tryLogCurrentException("NotFoundHandler");
} }
} }
}; };
@ -219,7 +297,9 @@ public:
<< ", Address: " << request.clientAddress().toString() << ", Address: " << request.clientAddress().toString()
<< ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none")); << ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none"));
if (request.getURI().find('?') != std::string::npos const auto & uri = request.getURI();
if (uri.find('?') != std::string::npos
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{ {
return new HandlerType(server); return new HandlerType(server);
@ -227,7 +307,12 @@ public:
else if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET else if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD) || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD)
{ {
return new PingRequestHandler(); if (uri == "/" || uri == "/ping")
return new PingRequestHandler;
else if (uri == "/replicas_status")
return new ReplicasStatusHandler(*server.global_context);
else
return new NotFoundHandler;
} }
else else
return nullptr; return nullptr;

View File

@ -31,19 +31,27 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
void ReplicatedMergeTreeRestartingThread::run() void ReplicatedMergeTreeRestartingThread::run()
{ {
constexpr auto retry_delay_ms = 10 * 1000; constexpr auto retry_period_ms = 10 * 1000;
constexpr auto check_delay_ms = 60 * 1000;
/// Периодичность проверки истечения сессии в ZK.
time_t check_period_ms = 60 * 1000;
/// Периодичность проверки величины отставания реплики.
if (check_period_ms > static_cast<time_t>(storage.data.settings.check_delay_period) * 1000)
check_period_ms = storage.data.settings.check_delay_period * 1000;
setThreadName("ReplMTRestart"); setThreadName("ReplMTRestart");
try try
{ {
bool first_time = true; bool first_time = true; /// Активация реплики в первый раз.
bool need_restart = false; /// Перезапуск по собственной инициативе, чтобы отдать лидерство.
time_t prev_time_of_check_delay = 0;
/// Запуск реплики при старте сервера/создании таблицы. Перезапуск реплики при истечении сессии с ZK. /// Запуск реплики при старте сервера/создании таблицы. Перезапуск реплики при истечении сессии с ZK.
while (!need_stop) while (!need_stop)
{ {
if (first_time || storage.getZooKeeper()->expired()) if (first_time || need_restart || storage.getZooKeeper()->expired())
{ {
if (first_time) if (first_time)
{ {
@ -51,7 +59,10 @@ void ReplicatedMergeTreeRestartingThread::run()
} }
else else
{ {
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); if (need_restart)
LOG_WARNING(log, "Will reactivate replica.");
else
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
storage.is_readonly = true; storage.is_readonly = true;
partialShutdown(); partialShutdown();
@ -68,13 +79,13 @@ void ReplicatedMergeTreeRestartingThread::run()
/// Исключение при попытке zookeeper_init обычно бывает, если не работает DNS. Будем пытаться сделать это заново. /// Исключение при попытке zookeeper_init обычно бывает, если не работает DNS. Будем пытаться сделать это заново.
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
wakeup_event.tryWait(retry_delay_ms); wakeup_event.tryWait(retry_period_ms);
continue; continue;
} }
if (!need_stop && !tryStartup()) if (!need_stop && !tryStartup())
{ {
wakeup_event.tryWait(retry_delay_ms); wakeup_event.tryWait(retry_period_ms);
continue; continue;
} }
@ -83,9 +94,35 @@ void ReplicatedMergeTreeRestartingThread::run()
storage.is_readonly = false; storage.is_readonly = false;
first_time = false; first_time = false;
need_restart = false;
} }
wakeup_event.tryWait(check_delay_ms); time_t current_time = time(0);
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
{
/// Выясняем отставания реплик.
time_t new_absolute_delay = 0;
time_t new_relative_delay = 0;
checkReplicationDelays(new_absolute_delay, new_relative_delay);
absolute_delay.store(new_absolute_delay, std::memory_order_relaxed);
relative_delay.store(new_relative_delay, std::memory_order_relaxed);
prev_time_of_check_delay = current_time;
/// Уступаем лидерство, если относительное отставание больше порога.
if (storage.is_leader_node && new_relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << new_relative_delay << " seconds) is bigger than threshold ("
<< storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
need_restart = true;
continue;
}
}
wakeup_event.tryWait(check_period_ms);
} }
} }
catch (...) catch (...)
@ -329,4 +366,143 @@ void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently()
} }
static time_t extractTimeOfLogEntryIfGetPart(zkutil::ZooKeeperPtr & zookeeper, const String & name, const String & path)
{
String content;
zkutil::Stat stat;
if (!zookeeper->tryGet(path + "/" + name, content, &stat))
return 0; /// Узел уже успел удалиться.
ReplicatedMergeTreeLogEntry entry;
entry.parse(content, stat);
if (entry.type == ReplicatedMergeTreeLogEntry::GET_PART)
return entry.create_time;
return 0;
}
/// В массиве имён узлов - элементов очереди/лога находит первый, имеющий тип GET_PART и возвращает его время; либо ноль, если не нашёл.
static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Strings & nodes, const String & path)
{
for (const auto & name : nodes)
{
time_t res = extractTimeOfLogEntryIfGetPart(zookeeper, name, path);
if (res)
return res;
}
return 0;
}
void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
{
/** Нужно получить следующую информацию:
* 1. Время последней записи типа GET в логе.
* 2. Время первой записи типа GET в очереди каждой активной реплики
* (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики).
*
* Разница между этими величинами называется (абсолютным) отставанием реплик.
* Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием.
*
* Если относительное отставание текущей реплики больше некоторого порога,
* и текущая реплика является лидером, то текущая реплика должна уступить лидерство.
*
* Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо:
* - не отвечать Ok на некоторую ручку проверки реплик для балансировщика;
* - не принимать соединения для обработки запросов.
* Это делается в других местах.
*
* NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов.
* Могут быть проблемы в случае разрастания лога до большого размера.
*/
out_absolute_delay = 0;
out_relative_delay = 0;
auto zookeeper = storage.getZooKeeper();
/// Последняя запись GET в логе.
String log_path = storage.zookeeper_path + "/log";
Strings log_entries_desc = zookeeper->getChildren(log_path);
std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater<String>());
time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path);
/** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт.
* В очередях у реплик могут быть не выполненные старые записи типа GET,
* которые туда добавлены не из лога, а для восстановления битых кусков.
* Не будем считать это отставанием.
*/
if (!last_entry_to_get_part)
return;
/// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет.
std::map<String, time_t> replicas_first_entry_to_get_part;
Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election");
for (const auto & node : active_replicas)
{
String replica;
if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica))
continue; /// Реплика только что перестала быть активной.
String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue";
Strings queue_entries = zookeeper->getChildren(queue_path);
std::sort(queue_entries.begin(), queue_entries.end());
time_t & first_time = replicas_first_entry_to_get_part[replica];
first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path);
if (!first_time)
{
/// Ищем среди записей лога после log_pointer для реплики.
String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
String log_min_entry = "log-" + storage.padIndex(parse<UInt64>(log_pointer));
for (const auto & name : log_entries_desc)
{
if (name < log_min_entry)
break;
first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path);
if (first_time)
break;
}
}
}
if (active_replicas.empty())
{
/// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить?
/// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально.
LOG_ERROR(log, "No active replicas when checking replication delays: very strange.");
return;
}
time_t first_entry_of_most_recent_replica = -1;
for (const auto & replica_time : replicas_first_entry_to_get_part)
{
if (0 == replica_time.second)
{
/// Есть реплика, которая совсем не отстаёт.
first_entry_of_most_recent_replica = 0;
break;
}
if (replica_time.second > first_entry_of_most_recent_replica)
first_entry_of_most_recent_replica = replica_time.second;
}
time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name];
if (0 == our_first_entry_to_get_part)
return; /// Если мы совсем не отстаём.
out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part;
out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part;
LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << ".");
}
} }

View File

@ -3330,6 +3330,15 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica
} }
void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay) const
{
if (!restarting_thread)
throw Exception("Table was shutted down or is in readonly mode.", ErrorCodes::TABLE_IS_READ_ONLY);
restarting_thread->getReplicaDelays(out_absolute_delay, out_relative_delay);
}
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings) void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();