dbms: replication delays: development [#METR-17573].

This commit is contained in:
Alexey Milovidov 2015-11-09 23:30:54 +03:00
parent 4b29a48124
commit e78ed9f802
4 changed files with 22 additions and 105 deletions

View File

@ -159,14 +159,6 @@ public:
*/
if (quorum)
{
static std::once_flag once_flag;
std::call_once(once_flag, [&]
{
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/failed_parts", "");
});
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.part_name = part_name;
quorum_entry.required_number_of_replicas = quorum;

View File

@ -344,6 +344,10 @@ private:
*/
void createReplica();
/** Создать узлы в ZK, которые должны быть всегда, но которые могли не существовать при работе старых версий сервера.
*/
void createNewZooKeeperNodes();
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/

View File

@ -104,6 +104,7 @@ void ReplicatedMergeTreeRestartingThread::run()
time_t new_absolute_delay = 0;
time_t new_relative_delay = 0;
/// TODO Ловить здесь исключение.
checkReplicationDelays(new_absolute_delay, new_relative_delay);
absolute_delay.store(new_absolute_delay, std::memory_order_relaxed);
@ -397,108 +398,12 @@ static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Stri
void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
{
/** Нужно получить следующую информацию:
* 1. Время последней записи типа GET в логе.
* 2. Время первой записи типа GET в очереди каждой активной реплики
* (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики).
*
* Разница между этими величинами называется (абсолютным) отставанием реплик.
* Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием.
*
* Если относительное отставание текущей реплики больше некоторого порога,
* и текущая реплика является лидером, то текущая реплика должна уступить лидерство.
*
* Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо:
* - не отвечать Ok на некоторую ручку проверки реплик для балансировщика;
* - не принимать соединения для обработки запросов.
* Это делается в других местах.
*
* NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов.
* Могут быть проблемы в случае разрастания лога до большого размера.
*/
out_absolute_delay = 0;
out_relative_delay = 0;
auto zookeeper = storage.getZooKeeper();
/// Последняя запись GET в логе.
String log_path = storage.zookeeper_path + "/log";
Strings log_entries_desc = zookeeper->getChildren(log_path);
std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater<String>());
time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path);
/** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт.
* В очередях у реплик могут быть не выполненные старые записи типа GET,
* которые туда добавлены не из лога, а для восстановления битых кусков.
* Не будем считать это отставанием.
*/
if (!last_entry_to_get_part)
return;
/// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет.
std::map<String, time_t> replicas_first_entry_to_get_part;
Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election");
for (const auto & node : active_replicas)
{
String replica;
if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica))
continue; /// Реплика только что перестала быть активной.
String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue";
Strings queue_entries = zookeeper->getChildren(queue_path);
std::sort(queue_entries.begin(), queue_entries.end());
time_t & first_time = replicas_first_entry_to_get_part[replica];
first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path);
if (!first_time)
{
/// Ищем среди записей лога после log_pointer для реплики.
String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
String log_min_entry = "log-" + storage.padIndex(parse<UInt64>(log_pointer));
for (const auto & name : log_entries_desc)
{
if (name < log_min_entry)
break;
first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path);
if (first_time)
break;
}
}
}
if (active_replicas.empty())
{
/// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить?
/// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально.
LOG_ERROR(log, "No active replicas when checking replication delays: very strange.");
return;
}
time_t first_entry_of_most_recent_replica = -1;
for (const auto & replica_time : replicas_first_entry_to_get_part)
{
if (0 == replica_time.second)
{
/// Есть реплика, которая совсем не отстаёт.
first_entry_of_most_recent_replica = 0;
break;
}
if (replica_time.second > first_entry_of_most_recent_replica)
first_entry_of_most_recent_replica = replica_time.second;
}
time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name];
if (0 == our_first_entry_to_get_part)
return; /// Если мы совсем не отстаём.
out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part;
out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part;
// TODO
LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << ".");
}

View File

@ -162,6 +162,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
checkParts(skip_sanity_checks);
}
createNewZooKeeperNodes();
initVirtualParts();
String unreplicated_path = full_path + "unreplicated/";
@ -194,6 +196,20 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto zookeeper = getZooKeeper();
/// Работа с кворумом.
zookeeper->createIfNotExists(zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", "");
/// Отслеживание отставания реплик.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", "");
}
StoragePtr StorageReplicatedMergeTree::create(
const String & zookeeper_path_,
const String & replica_name_,