From 8fc53a70206e4ccc36c9a0b3df4061e339a9c6d7 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Wed, 2 Apr 2014 11:59:43 +0400 Subject: [PATCH] Merge --- dbms/include/DB/Core/ErrorCodes.h | 3 + .../DB/Interpreters/InterserverIOHandler.h | 10 +- .../MergeTree/AbandonableLockInZooKeeper.h | 99 +++++++++++++++++++ .../DB/Storages/MergeTree/MergeTreeData.h | 21 +++- .../ReplicatedMergeTreeBlockOutputStream.h | 80 +++++++++++++++ .../DB/Storages/StorageReplicatedMergeTree.h | 9 +- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 10 +- .../MergeTree/MergeTreeDataWriter.cpp | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 51 +++++++++- 9 files changed, 274 insertions(+), 11 deletions(-) create mode 100644 dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 604c9807f55..6213ece69cf 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -236,6 +236,9 @@ namespace ErrorCodes NO_FILE_IN_DATA_PART, UNEXPECTED_FILE_IN_DATA_PART, BAD_SIZE_OF_FILE_IN_DATA_PART, + NOT_FOUND_EXPECTED_DATA_PART, + TOO_MANY_UNEXPECTED_DATA_PARTS, + NO_SUCH_DATA_PART, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Interpreters/InterserverIOHandler.h b/dbms/include/DB/Interpreters/InterserverIOHandler.h index 16248887918..9a7bfef7291 100644 --- a/dbms/include/DB/Interpreters/InterserverIOHandler.h +++ b/dbms/include/DB/Interpreters/InterserverIOHandler.h @@ -63,12 +63,17 @@ private: class InterserverIOEndpointHolder { public: - InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint, InterserverIOHandler & handler_) - : name(name_), handler(handler_) + InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_) + : name(name_), endpoint(endpoint_), handler(handler_) { handler.addEndpoint(name, endpoint); } + InterserverIOEndpointPtr getEndpoint() + { + return endpoint; + } + ~InterserverIOEndpointHolder() { try @@ -83,6 +88,7 @@ public: private: String name; + InterserverIOEndpointPtr endpoint; InterserverIOHandler & handler; }; diff --git a/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h b/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h new file mode 100644 index 00000000000..6a65d33bb73 --- /dev/null +++ b/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h @@ -0,0 +1,99 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/** Примитив синхронизации. Работает следующим образом: + * При создании создает неэфемерную инкрементную ноду и помечает ее как заблокированную (LOCKED). + * unlock() разблокирует ее (UNLOCKED). + * При вызове деструктора или завершении сессии в ZooKeeper, переходит в состояние ABANDONED. + * (В том числе при падении программы) + */ +class AbandonableLockInZooKeeper +{ +public: + enum State + { + UNLOCKED, + LOCKED, + ABANDONED, + }; + + AbandonableLockInZookeeper( + const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_) + : zookeeper(zookeeper_), path_prefix(path_prefix_) + { + /// Создадим вспомогательную эфемерную ноду. + holder_path = zookeeper.create(temp_path + "/abandonable-lock-", "", zkutil::CreateMode::EphemeralSequential); + + /// Запишем в основную ноду путь к вспомогательной. + path = zookeeper.create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential); + } + + String getPath() + { + return path; + } + + /// Распарсить число в конце пути. + UInt64 getNumber() + { + return static_cast(atol(path.substr(path_prefix.size()))); + } + + void unlock() + { + zookeeper.remove(path); + zookeeper.remove(holder_path); + } + + ~AbandonableLockInZookeeper() + { + try + { + zookeeper.remove(holder_path); + zookeeper.set(path, ""); /// Это не обязательно. + } + catch (...) + { + tryLogCurrentException(); + } + } + + static State check(const String & path, zkutil::ZooKeeper & zookeeper) + { + String holder_path; + + /// Если нет основной ноды, UNLOCKED. + if (!zookeeper.tryGet(path, holder_path)) + return UNLOCKED; + + /// Если в основной ноде нет пути к вспомогательной, ABANDONED. + if (holder_path.empty()) + return ABANDONED; + + /// Если вспомогательная нода жива, LOCKED. + if (zookeeper.exists(holder_path)) + return LOCKED; + + /// Если вспомогательной ноды нет, нужно еще раз проверить существование основной ноды, + /// потому что за это время могли успеть вызвать unlock(). + /// Заодно уберем оттуда путь к вспомогательной ноде. + if (zookeeper.trySet(path, "") == zkutil::ReturnCode::Ok) + return ABANDONED; + + return UNLOCKED; + } + +private: + zkutil::ZooKeeper & zookeeper; + String path_prefix; + String path; + String holder_path; +}; + +} diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 9ec69ea1952..044331d7cdb 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -7,6 +7,7 @@ #include #include #include +#include #include namespace DB @@ -125,6 +126,15 @@ public: { return files.empty(); } + + static Checksums parse(const String & s) + { + ReadBufferFromString in(s); + Checksums res; + res.readText(in); + assertEOF(in); + return res; + } }; DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0) {} @@ -176,10 +186,11 @@ public: Poco::File(to).remove(true); } - void renameToOld() const + /// Переименовывает кусок, дописав к имени префикс. + void renameAddPrefix(const String & prefix) const { String from = storage.full_path + name + "/"; - String to = storage.full_path + "old_" + name + "/"; + String to = storage.full_path + prefix + name + "/"; Poco::File f(from); f.setLastModified(Poco::Timestamp::fromEpochTime(time(0))); @@ -318,6 +329,12 @@ public: */ void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment); + /** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора. + * Лучше использовать только когда никто не может читать или писать этот кусок + * (например, при инициализации таблицы). + */ + void renameAndRemovePart(DataPartPtr part, const String & prefix); + /** Удалить неактуальные куски. */ void clearOldParts(); diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index e69de29bb2d..52899cfc496 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -0,0 +1,80 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream +{ +public: + ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_) + : storage(storage_), insert_id(insert_id_), block_index(0) {} + + void write(const Block & block) override + { + auto part_blocks = storage.writer.splitBlockIntoParts(block); + for (auto & current_block : part_blocks) + { + ++block_index; + String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); + + AbandonableLockInZooKeeper block_number_lock( + storage.zookeeper_path + "/block-numbers/block-", + storage.zookeeper_path + "/temp", storage.zookeeper); + + UInt64 part_number = block_number_lock.getNumber(); + + MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number); + + String expected_checksums_str; + if (!block_id.empty() && storage.zookeeper.tryGet( + storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) + { + /// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять. + auto expected_checksums = MergeTreeData::DataPart::Checksums.parse(expected_checksums_str); + expected_checksums.check(part->checksums); + + /// Бросаем block_number_lock. + continue; + } + + storage.data.renameTempPartAndAdd(part); + + zkutil::Ops ops; + ops.push_back(new zkutil::Op::Create( + storage.zookeeper_path + "/blocks/" + block_id, + "", + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + storage.zookeeper_path + "/blocks/" + block_id + "/checksums", + part->checksums.toString(), + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + storage.zookeeper_path + "/blocks/" + block_id + "/number", + toString(part_numbre), + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + storage.replica_path + "/parts/" + block_id + "/number", + toString(part_numbre), + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + + const std::vector& acl, CreateMode::type mode); + + block_number_lock.unlock(); + } + } + +private: + StorageReplicatedMergeTree & storage; + String insert_id; + size_t block_index; +}; + +} diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index aad819a6d29..e71da14cb93 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -62,6 +62,8 @@ public: void drop() override; private: + friend class ReplicatedMergeTreeBlockOutputStream; + struct LogEntry { enum Type @@ -82,7 +84,12 @@ private: class MyInterserverIOEndpoint : public InterserverIOEndpoint { public: - MyInterserverIOEndpoint(StorageReplicatedMergeTree & storage_) : storage(storage_), owned_storage(storage.thisPtr()) {} + MyInterserverIOEndpoint(StorageReplicatedMergeTree & storage_) : storage(storage_) {} + + void setOwnedStorage(StoragePtr owned_storage_) + { + owned_storage = owned_storage_; + } void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index ea0e81fbb9f..509e57a1e72 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -290,7 +290,7 @@ void MergeTreeData::clearOldParts() { LOG_DEBUG(log, "'Removing' part " << (*it)->name << " (prepending old_ to its name)"); - (*it)->renameToOld(); + (*it)->renameAddPrefix("old_"); all_data_parts.erase(it++); } else @@ -703,6 +703,14 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in all_data_parts.insert(part); } +void MergeTreeData::renameAndRemovePart(DataPartPtr part, const String & prefix) +{ + Poco::ScopedLock lock(data_parts_mutex); + if (!data_parts.erase(part)) + throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART); + part->renameAddPrefix(prefix); +} + MergeTreeData::DataParts MergeTreeData::getDataParts() { Poco::ScopedLock lock(data_parts_mutex); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 6d6cbbe6a12..edf9e3b0dfa 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -114,7 +114,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa new_data_part->left = temp_index; new_data_part->right = temp_index; new_data_part->level = 0; - new_data_part->name = tmp_part_name; + new_data_part->name = tmp_part_name; (no tmp_ here?) new_data_part->size = part_size; new_data_part->modification_time = time(0); new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index d1feb2fdae3..58153c2f18c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -55,7 +55,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( else { checkTableStructure(); - //checkParts(); + checkParts(); } String endpoint_name = "ReplicatedMergeTree:" + replica_path; @@ -79,8 +79,12 @@ StoragePtr StorageReplicatedMergeTree::create( const String & sign_column_, const MergeTreeSettings & settings_) { - return (new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach, path_, name_, columns_, context_, primary_expr_ast_, - date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_))->thisPtr(); + StorageReplicatedMergeTree * res = new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach, + path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, + index_granularity_, mode_, sign_column_, settings_); + StoragePtr res_ptr = res->thisPtr(); + dynamic_cast(*res->endpoint_holder->getEndpoint()).setOwnedStorage(res_ptr); + return res_ptr; } static String formattedAST(const ASTPtr & ast) @@ -156,6 +160,7 @@ void StorageReplicatedMergeTree::checkTableStructure() assertString(it.second->getName(), buf); assertString("\n", buf); } + assertEOF(buf); } void StorageReplicatedMergeTree::createReplica() @@ -170,6 +175,8 @@ void StorageReplicatedMergeTree::createReplica() void StorageReplicatedMergeTree::activateReplica() { + throw Exception("test"); + std::stringstream host; host << "host: " << context.getInterserverIOHost() << std::endl; host << "port: " << context.getInterserverIOPort() << std::endl; @@ -194,7 +201,42 @@ bool StorageReplicatedMergeTree::isTableEmpty() return true; } -void StorageReplicatedMergeTree::checkParts() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); } +void StorageReplicatedMergeTree::checkParts() +{ + Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts"); + NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end()); + + MergeTreeData::DataParts parts = data.getDataParts(); + + MergeTreeData::DataPartsVector unexpected_parts; + for (const auto & part : parts) + { + if (expected_parts.count(part->name)) + { + expected_parts.erase(part->name); + } + else + { + unexpected_parts.push_back(part); + } + } + + if (!expected_parts.empty()) + throw Exception("Not found " + toString(expected_parts.size()) + + " (including " + *expected_parts.begin() + ") parts in table " + data.getTableName(), + ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART); + + if (unexpected_parts.size() > 1) + throw Exception("More than one unexpected part (including " + unexpected_parts[0]->name + + ") in table " + data.getTableName(), + ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS); + + for (MergeTreeData::DataPartPtr part : unexpected_parts) + { + LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name); + data.renameAndRemovePart(part, "ignored_"); + } +} void StorageReplicatedMergeTree::loadQueue() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); } @@ -207,6 +249,7 @@ void StorageReplicatedMergeTree::executeSomeQueueEntry() { throw Exception("Not bool StorageReplicatedMergeTree::tryExecute(const LogEntry & entry) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); } String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); } + void StorageReplicatedMergeTree::getPart(const String & name, const String & replica_name) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); } void StorageReplicatedMergeTree::shutdown()