From d53f173478195a2e3cc06f317c4cf7c570a460e5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 11 Sep 2015 05:13:59 +0300 Subject: [PATCH] dbms: quorum inserts: development [#METR-16779]. --- dbms/include/DB/Core/ErrorCodes.h | 1 + .../ReplicatedMergeTreeBlockOutputStream.h | 108 ++++++++++++++-- .../ReplicatedMergeTreeQuorumEntry.h | 72 +++++++++++ .../DB/Storages/StorageReplicatedMergeTree.h | 3 +- .../Storages/StorageReplicatedMergeTree.cpp | 121 +++++++++++++++--- libs/libzkutil/include/zkutil/ZooKeeper.h | 4 + libs/libzkutil/src/ZooKeeper.cpp | 17 +++ 7 files changed, 297 insertions(+), 29 deletions(-) create mode 100644 dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index bdfd2893352..30763e02cbf 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -288,6 +288,7 @@ namespace ErrorCodes INCORRECT_INDEX = 282, UNKNOWN_GLOBAL_SUBQUERIES_METHOD = 284, TOO_LESS_LIVE_REPLICAS = 285, + UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE = 286, KEEPER_EXCEPTION = 999, POCO_EXCEPTION = 1000, diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index ae89ff016b3..d943975e303 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -14,7 +15,12 @@ class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream public: ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_) : storage(storage_), insert_id(insert_id_), quorum(quorum_), - log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) {} + log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) + { + /// Значение кворума 1 имеет такой же смысл, как если он отключён. + if (quorum == 1) + quorum = 0; + } void writePrefix() override { @@ -29,8 +35,9 @@ public: assertSessionIsNotExpired(zookeeper); /** Если запись с кворумом, то проверим, что требуемое количество реплик сейчас живо, - * а также что у нас есть все предыдущие куски, которые были записаны с кворумом. + * а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут. */ + String quorum_status_path = storage.zookeeper_path + "/quorum/status"; if (quorum) { /// Список живых реплик. Все они регистрируют эфемерную ноду для leader_election. @@ -55,11 +62,21 @@ public: ErrorCodes::TOO_LESS_LIVE_REPLICAS); } - /// Разумеется, реплики могут перестать быть живыми после этой проверки. Это не проблема. - - /** Есть ли у нас последний кусок, записанный с кворумом? - * В ZK будем иметь следующую структуру директорий: + /** Достигнут ли кворум для последнего куска, для которого нужен кворум? + * Запись всех кусков с включенным кворумом линейно упорядочена. + * Это значит, что в любой момент времени может быть только один кусок, + * для которого нужен, но ещё не достигнут кворум. + * Информация о таком куске будет расположена в ноде /quorum/status. + * Если кворум достигнут, то нода удаляется. */ + + String quorum_status; + bool quorum_unsatisfied = zookeeper->tryGet(quorum_status_path, quorum_status); + + if (quorum_unsatisfied) + throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); + + /// Обе проверки неявно делаются и позже (иначе был бы race condition). } auto part_blocks = storage.writer.splitBlockIntoParts(block); @@ -83,48 +100,96 @@ public: /// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым. /// Можно для этого сделать настройку или синтаксис в запросе (например, ID=null). if (block_id.empty()) + { block_id = part->checksums.summaryDataChecksum(); + if (block_id.empty()) + throw Exception("Logical error: block_id is empty.", ErrorCodes::LOGICAL_ERROR); + } + LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows"); StorageReplicatedMergeTree::LogEntry log_entry; log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART; log_entry.source_replica = storage.replica_name; log_entry.new_part_name = part_name; + log_entry.quorum = quorum; /// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock. + + /// Информация о блоке. zkutil::Ops ops; - if (!block_id.empty()) - { - ops.push_back(new zkutil::Op::Create( + + ops.push_back( + new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); - ops.push_back(new zkutil::Op::Create( + ops.push_back( + new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id + "/columns", part->columns.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); - ops.push_back(new zkutil::Op::Create( + ops.push_back( + new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id + "/checksums", part->checksums.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); - ops.push_back(new zkutil::Op::Create( + ops.push_back( + new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id + "/number", toString(part_number), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); - } + + /// Информация о куске, в данных реплики. storage.checkPartAndAddToZooKeeper(part, ops, part_name); + + /// Лог репликации. ops.push_back(new zkutil::Op::Create( storage.zookeeper_path + "/log/log-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential)); + + /// Удаление информации о том, что номер блока используется для записи. block_number_lock.getUnlockOps(ops); + /** Если нужен кворум - создание узла, в котором отслеживается кворум. + * (Если такой узел уже существует - значит кто-то успел одновременно сделать другую кворумную запись, но для неё кворум ещё не достигнут. + * Делать в это время следующую кворумную запись нельзя.) + */ + if (quorum) + { + static std::once_flag once_flag; + std::call_once(once_flag, [&] + { + zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", ""); + }); + + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.part_name = part_name; + quorum_entry.required_number_of_replicas = quorum; + quorum_entry.replicas.insert(storage.replica_name); + + /** В данный момент, этот узел будет содержать информацию о том, что текущая реплика получила кусок. + * Когда другие реплики будут получать этот кусок (обычным способом, обрабатывая лог репликации), + * они будут добавлять себя в содержимое этого узла. + * Когда в нём будет информация о quorum количестве реплик, этот узел удаляется, + * что говорит о том, что кворум достигнут. + */ + + ops.push_back( + new zkutil::Op::Create( + quorum_status_path, + quorum_entry.toString(), + zookeeper->getDefaultACL(), + zkutil::CreateMode::Persistent)); + } + MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора. storage.data.renameTempPartAndAdd(part, nullptr, &transaction); @@ -154,8 +219,17 @@ public: part.reset(); transaction.rollback(); } + else if (zookeeper->exists(quorum_status_path)) + { + part.reset(); + transaction.rollback(); + + throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); + } else { + /// Сюда можем попасть также, если узел с кворумом существовал, но потом быстро был удалён. + throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": " + zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); } @@ -180,6 +254,14 @@ public: throw; } + + if (quorum) + { + /// Дожидаемся достижения кворума. TODO Настраиваемый таймаут. + LOG_TRACE(log, "Waiting for quorum"); + zookeeper->waitForDisappear(quorum_status_path); + LOG_TRACE(log, "Quorum satisfied"); + } } } diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h new file mode 100644 index 00000000000..f416fbeda81 --- /dev/null +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h @@ -0,0 +1,72 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** Для реализации функциональности "кворумная запись". + * Информация о том, на каких репликах появился вставленный кусок данных, + * и на скольких репликах он должен быть. + */ +struct ReplicatedMergeTreeQuorumEntry +{ + String part_name; + size_t required_number_of_replicas; + std::set replicas; + + void writeText(WriteBuffer & out) const + { + out << "version: 1\n" + << "part_name: " << part_name << "\n" + << "required_number_of_replicas: " << required_number_of_replicas << "\n" + << "actual_number_of_replicas: " << replicas.size() << "\n" + << "replicas:\n"; + + for (const auto & replica : replicas) + out << escape << replica << "\n"; + } + + void readText(ReadBuffer & in) + { + size_t actual_number_of_replicas = 0; + + in >> "version: 1\n" + >> "part_name: " >> part_name >> "\n" + >> "required_number_of_replicas: " >> required_number_of_replicas >> "\n" + >> "actual_number_of_replicas: " >> actual_number_of_replicas >> "\n" + >> "replicas:\n"; + + for (size_t i = 0; i < actual_number_of_replicas; ++i) + { + String replica; + in >> escape >> replica >> "\n"; + replicas.insert(replica); + } + } + + String toString() const + { + String res; + { + WriteBufferFromString out(res); + writeText(out); + } + return res; + } + + void fromString(const String & str) + { + ReadBufferFromString in(str); + readText(in); + } +}; + +} diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index a6fa9091c3c..450f28c86fb 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -379,8 +379,9 @@ private: /** Скачать указанный кусок с указанной реплики. * Если to_detached, то кусок помещается в директорию detached. + * Если quorum != 0, то обновляется узел для отслеживания кворума. */ - void fetchPart(const String & part_name, const String & replica_path, bool to_detached = false); + void fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum); AbandonableLockInZooKeeper allocateBlockNumber(const String & month_name); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 674e58f789c..011492ec9c6 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -921,10 +922,13 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro if (replica.empty()) { ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches); + + // TODO Проверить, может быть куска нет ни на одной живой или мёртвой реплике, и он исчез навсегда? + throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART); } - fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica); + fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum); if (entry.type == LogEntry::MERGE_PARTS) ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged); @@ -1271,6 +1275,24 @@ void StorageReplicatedMergeTree::mergeSelectingThread() String month_name = left->name.substr(0, 6); auto zookeeper = getZooKeeper(); + /// Нельзя сливать куски, среди которых находится кусок, для которого неудовлетворён кворум. + /// Замечание: теоретически, это можно было бы разрешить. Но это сделает логику более сложной. + String quorum_node_value; + if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_node_value)) + { + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.fromString(quorum_node_value); + + ActiveDataPartSet::Part part_info; + ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info); + + if (part_info.left != part_info.right) + throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR); + + if (left->right <= part_info.left && right->left >= part_info.right) + return false; + } + /// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам. /// Номера до RESERVED_BLOCK_NUMBERS всегда не соответствуют никаким блокам. for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, left->right + 1); number <= right->left - 1; ++number) @@ -1908,7 +1930,81 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam } -void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached) +/** Если для куска отслеживается кворум, то обновить информацию о нём в ZK. + */ +static void updateQuorum( + zkutil::ZooKeeperPtr & zookeeper, + const String & zookeeper_path, + const String & replica_name, + const String & part_name, + size_t quorum) +{ + if (!quorum) + return; + + const String quorum_status_path = zookeeper_path + "/quorum/status"; + String value; + zkutil::Stat stat; + + /// Если узла нет, значит по всем кворумным INSERT-ам уже был достигнут кворум, и ничего делать не нужно. + while (zookeeper->tryGet(quorum_status_path, value, &stat)) + { + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.fromString(value); + + if (quorum_entry.part_name != part_name) + { + /// Кворум уже был достигнут. Более того, уже начался другой INSERT с кворумом. + break; + } + + if (quorum_entry.required_number_of_replicas != quorum) + throw Exception("Logical error: quorum size in log entry is different than quorum size in node /quorum/status", + ErrorCodes::LOGICAL_ERROR); + + quorum_entry.replicas.insert(replica_name); + + if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas) + { + /// Кворум достигнут. Удаляем узел. + auto code = zookeeper->tryRemove(quorum_status_path, stat.version); + + if (code == ZNONODE) + { + /// Кворум уже был достигнут. + break; + } + else if (code == ZBADVERSION) + { + /// Узел успели обновить. Надо заново его прочитать и повторить все действия. + continue; + } + else + throw zkutil::KeeperException(code, quorum_status_path); + } + else + { + /// Обновляем узел, прописывая туда на одну реплику больше. + auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version); + + if (code == ZNONODE) + { + /// Кворум уже был достигнут. + break; + } + else if (code == ZBADVERSION) + { + /// Узел успели обновить. Надо заново его прочитать и повторить все действия. + continue; + } + else + throw zkutil::KeeperException(code, quorum_status_path); + } + } +} + + +void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum) { auto zookeeper = getZooKeeper(); @@ -1942,6 +2038,12 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin zookeeper->multi(ops); transaction.commit(); + + /** Если для этого куска отслеживается кворум, то надо его обновить. + * TODO Обработка в случае неизвестной ошибки, потери сессии, при перезапуске сервера. + */ + updateQuorum(zookeeper, zookeeper_path, replica_name, part_name, quorum); + merge_selecting_event.set(); for (const auto & removed_part : removed_parts) @@ -2636,18 +2738,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue"); /// Третье - дождемся, пока запись исчезнет из очереди реплики. - - while (true) - { - zkutil::EventPtr event = new Poco::Event; - - String unused; - /// get вместо exists, чтобы не утек watch, если ноды уже нет. - if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for, unused, nullptr, event)) - break; - - event->wait(); - } + zookeeper->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for); } @@ -2865,7 +2956,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S { try { - fetchPart(part, best_replica_path, true); + fetchPart(part, best_replica_path, true, 0); } catch (const DB::Exception & e) { diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h index d7c7c077d01..1db73ba0a54 100644 --- a/libs/libzkutil/include/zkutil/ZooKeeper.h +++ b/libs/libzkutil/include/zkutil/ZooKeeper.h @@ -167,6 +167,10 @@ public: */ void tryRemoveRecursive(const std::string & path); + /** Подождать, пока нода перестанет существовать или вернуть сразу, если нода не существует. + */ + void waitForDisappear(const std::string & path); + /** Асинхронный интерфейс (реализовано небольшое подмножество операций). * diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp index 52b0a0eb545..7e9b1e022c3 100644 --- a/libs/libzkutil/src/ZooKeeper.cpp +++ b/libs/libzkutil/src/ZooKeeper.cpp @@ -558,6 +558,23 @@ void ZooKeeper::tryRemoveRecursive(const std::string & path) tryRemove(path); } + +void ZooKeeper::waitForDisappear(const std::string & path) +{ + while (true) + { + zkutil::EventPtr event = new Poco::Event; + + std::string unused; + /// get вместо exists, чтобы не утек watch, если ноды уже нет. + if (!tryGet(path, unused, nullptr, event)) + break; + + event->wait(); + } +} + + ZooKeeper::~ZooKeeper() { LOG_INFO(&Logger::get("~ZooKeeper"), "Closing ZooKeeper session");