From 50ba6ca8e1e5a41e8367e307f589dbb13fd97a05 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Thu, 3 Apr 2014 16:49:01 +0400 Subject: [PATCH] Merge --- .../ReplicatedMergeTreeBlockOutputStream.h | 2 +- dbms/src/Interpreters/loadMetadata.cpp | 3 +- .../Storages/StorageReplicatedMergeTree.cpp | 97 +++++++++++++------ 3 files changed, 72 insertions(+), 30 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 87175a54f43..cad531aa01d 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -22,7 +22,7 @@ public: String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); AbandonableLockInZooKeeper block_number_lock( - storage.zookeeper_path + "/block-numbers/block-", + storage.zookeeper_path + "/block_numbers/block-", storage.zookeeper_path + "/temp", storage.zookeeper); UInt64 part_number = block_number_lock.getNumber(); diff --git a/dbms/src/Interpreters/loadMetadata.cpp b/dbms/src/Interpreters/loadMetadata.cpp index 97812b819f8..fe364b6a92d 100644 --- a/dbms/src/Interpreters/loadMetadata.cpp +++ b/dbms/src/Interpreters/loadMetadata.cpp @@ -133,7 +133,8 @@ void loadMetadata(Context & context) } catch (const Exception & e) { - throw Exception("Cannot create table from metadata file " + tables[j] + ", error: " + e.displayText(), + throw Exception("Cannot create table from metadata file " + tables[j] + ", error: " + e.displayText() + + ", stack trace:\n" + e.getStackTrace().toString(), ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA); } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 9c8a9c5f499..2e675bbe627 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -129,7 +129,7 @@ void StorageReplicatedMergeTree::createTable() /// Создадим нужные "директории". zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent); zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent); - zookeeper.create(zookeeper_path + "/block-numbers", "", zkutil::CreateMode::Persistent); + zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent); zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent); } @@ -249,7 +249,7 @@ void StorageReplicatedMergeTree::loadQueue() std::sort(children.begin(), children.end()); for (const String & child : children) { - String s = zookeeper.get(child); + String s = zookeeper.get(replica_path + "/queue/" + child); LogEntry entry = LogEntry::parse(s); entry.znode_name = child; queue.push_back(entry); @@ -260,47 +260,88 @@ void StorageReplicatedMergeTree::pullLogsToQueue() { Poco::ScopedLock lock(queue_mutex); + /// Сольем все логи в хронологическом порядке. + + struct LogIterator + { + String replica; /// Имя реплики. + UInt64 index; /// Номер записи в логе (суффикс имени ноды). + + Int64 timestamp; /// Время (czxid) создания записи в логе. + String entry_str; /// Сама запись. + + bool operator<(const LogIterator & rhs) const + { + return timestamp < rhs.timestamp; + } + + bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path) + { + String index_str = toString(index); + while (index_str.size() < 10) + index_str = '0' + index_str; + zkutil::Stat stat; + if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat)) + return false; + timestamp = stat.getczxid(); + return true; + } + }; + + typedef std::priority_queue PriorityQueue; + PriorityQueue priority_queue; + Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas"); + for (const String & replica : replicas) { - String log_path = zookeeper_path + "/" + replica + "/log"; + String index_str; + UInt64 index; - String pointer_str; - UInt64 pointer; - - if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, pointer_str)) + if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str)) { - pointer = Poco::NumberParser::parseUnsigned64(pointer_str); + index = Poco::NumberParser::parseUnsigned64(index_str); } else { /// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем. - Strings entries = zookeeper.getChildren(log_path); + Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log"); std::sort(entries.begin(), entries.end()); - pointer = entries.empty() ? 0 : Poco::NumberParser::parseUnsigned64(entries[0].substr(strlen("log-"))); + index = entries.empty() ? 0 : Poco::NumberParser::parseUnsigned64(entries[0].substr(strlen("log-"))); - zookeeper.create(replica_path + "/log_pointers/" + replica, toString(pointer), zkutil::CreateMode::Persistent); + zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent); } - String entry_str; - while (zookeeper.tryGet(log_path + "/log-" + toString(pointer), entry_str)) - { - LogEntry entry = LogEntry::parse(entry_str); + LogIterator iterator; + iterator.replica = replica; + iterator.index = index; - /// Одновременно добавим запись в очередь и продвинем указатель на лог. - zkutil::Ops ops; - ops.push_back(new zkutil::Op::Create( - replica_path + "/queue/queue-", entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential)); - ops.push_back(new zkutil::Op::SetData( - replica_path + "/log_pointers/" + replica, toString(pointer + 1), -1)); - auto results = zookeeper.multi(ops); + if (iterator.readEntry(zookeeper, zookeeper_path)) + priority_queue.push(iterator); + } - String path_created = dynamic_cast((*results)[0]).getPathCreated(); - entry.znode_name = path.substr(path.find_last_of('/') + 1); - queue.push_back(entry); + while (!priority_queue.empty()) + { + LogIterator iterator = priority_queue.top(); + priority_queue.pop(); - ++pointer; - } + LogEntry entry = LogEntry::parse(iterator.entry_str); + + /// Одновременно добавим запись в очередь и продвинем указатель на лог. + zkutil::Ops ops; + ops.push_back(new zkutil::Op::Create( + replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential)); + ops.push_back(new zkutil::Op::SetData( + replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1)); + auto results = zookeeper.multi(ops); + + String path_created = dynamic_cast((*results)[0]).getPathCreated(); + entry.znode_name = path.substr(path.find_last_of('/') + 1); + queue.push_back(entry); + + ++iterator.index; + if (iterator.readEntry(zookeeper, zookeeper_path)) + priority_queue.push(iterator); } } @@ -441,7 +482,7 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin assertString("\n", buf); assertEOF(buf); - MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(name, zookeeper_path + "/replicas/" + replica_name, host, port); + MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port); data.renameTempPartAndAdd(part, nullptr); zkutil::Ops ops;