From e78ed9f8024a314f6712443c18fb7cb88a61dde6 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 9 Nov 2015 23:30:54 +0300 Subject: [PATCH] dbms: replication delays: development [#METR-17573]. --- .../ReplicatedMergeTreeBlockOutputStream.h | 8 -- .../DB/Storages/StorageReplicatedMergeTree.h | 4 + .../ReplicatedMergeTreeRestartingThread.cpp | 99 +------------------ .../Storages/StorageReplicatedMergeTree.cpp | 16 +++ 4 files changed, 22 insertions(+), 105 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 79afb5c0efa..6b2c5993e8d 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -159,14 +159,6 @@ public: */ if (quorum) { - static std::once_flag once_flag; - std::call_once(once_flag, [&] - { - zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", ""); - zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/last_part", ""); - zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/failed_parts", ""); - }); - ReplicatedMergeTreeQuorumEntry quorum_entry; quorum_entry.part_name = part_name; quorum_entry.required_number_of_replicas = quorum; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 1adfa256469..0c7ed307d2c 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -344,6 +344,10 @@ private: */ void createReplica(); + /** Создать узлы в ZK, которые должны быть всегда, но которые могли не существовать при работе старых версий сервера. + */ + void createNewZooKeeperNodes(); + /** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata). * Если нет - бросить исключение. */ diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index e2f37d50597..2411c2cdc6d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -104,6 +104,7 @@ void ReplicatedMergeTreeRestartingThread::run() time_t new_absolute_delay = 0; time_t new_relative_delay = 0; + /// TODO Ловить здесь исключение. checkReplicationDelays(new_absolute_delay, new_relative_delay); absolute_delay.store(new_absolute_delay, std::memory_order_relaxed); @@ -397,108 +398,12 @@ static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Stri void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay) { - /** Нужно получить следующую информацию: - * 1. Время последней записи типа GET в логе. - * 2. Время первой записи типа GET в очереди каждой активной реплики - * (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики). - * - * Разница между этими величинами называется (абсолютным) отставанием реплик. - * Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием. - * - * Если относительное отставание текущей реплики больше некоторого порога, - * и текущая реплика является лидером, то текущая реплика должна уступить лидерство. - * - * Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо: - * - не отвечать Ok на некоторую ручку проверки реплик для балансировщика; - * - не принимать соединения для обработки запросов. - * Это делается в других местах. - * - * NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов. - * Могут быть проблемы в случае разрастания лога до большого размера. - */ - out_absolute_delay = 0; out_relative_delay = 0; auto zookeeper = storage.getZooKeeper(); - /// Последняя запись GET в логе. - String log_path = storage.zookeeper_path + "/log"; - Strings log_entries_desc = zookeeper->getChildren(log_path); - std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater()); - time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path); - - /** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт. - * В очередях у реплик могут быть не выполненные старые записи типа GET, - * которые туда добавлены не из лога, а для восстановления битых кусков. - * Не будем считать это отставанием. - */ - - if (!last_entry_to_get_part) - return; - - /// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет. - std::map replicas_first_entry_to_get_part; - - Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election"); - for (const auto & node : active_replicas) - { - String replica; - if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica)) - continue; /// Реплика только что перестала быть активной. - - String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue"; - Strings queue_entries = zookeeper->getChildren(queue_path); - std::sort(queue_entries.begin(), queue_entries.end()); - time_t & first_time = replicas_first_entry_to_get_part[replica]; - first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path); - - if (!first_time) - { - /// Ищем среди записей лога после log_pointer для реплики. - String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); - String log_min_entry = "log-" + storage.padIndex(parse(log_pointer)); - - for (const auto & name : log_entries_desc) - { - if (name < log_min_entry) - break; - - first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path); - if (first_time) - break; - } - } - } - - if (active_replicas.empty()) - { - /// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить? - /// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально. - LOG_ERROR(log, "No active replicas when checking replication delays: very strange."); - return; - } - - time_t first_entry_of_most_recent_replica = -1; - for (const auto & replica_time : replicas_first_entry_to_get_part) - { - if (0 == replica_time.second) - { - /// Есть реплика, которая совсем не отстаёт. - first_entry_of_most_recent_replica = 0; - break; - } - - if (replica_time.second > first_entry_of_most_recent_replica) - first_entry_of_most_recent_replica = replica_time.second; - } - - time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name]; - if (0 == our_first_entry_to_get_part) - return; /// Если мы совсем не отстаём. - - out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part; - out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part; + // TODO LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << "."); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 4892a50ee30..502de59c948 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -162,6 +162,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( checkParts(skip_sanity_checks); } + createNewZooKeeperNodes(); + initVirtualParts(); String unreplicated_path = full_path + "unreplicated/"; @@ -194,6 +196,20 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } +void StorageReplicatedMergeTree::createNewZooKeeperNodes() +{ + auto zookeeper = getZooKeeper(); + + /// Работа с кворумом. + zookeeper->createIfNotExists(zookeeper_path + "/quorum", ""); + zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", ""); + zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", ""); + + /// Отслеживание отставания реплик. + zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", ""); +} + + StoragePtr StorageReplicatedMergeTree::create( const String & zookeeper_path_, const String & replica_name_,