From b92f1ff48046a736cf5721b7d600ad2dc0dd7d8a Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Wed, 2 Apr 2014 17:45:39 +0400 Subject: [PATCH] Merge --- dbms/include/DB/Core/ErrorCodes.h | 1 + dbms/include/DB/IO/ReadBufferFromHTTP.h | 26 ++-- dbms/include/DB/IO/ReadHelpers.h | 2 + dbms/include/DB/IO/RemoteReadBuffer.h | 21 ++- dbms/include/DB/IO/WriteHelpers.h | 2 + dbms/include/DB/IO/copyData.h | 4 + .../DB/Storages/MergeTree/MergeTreeData.h | 19 ++- .../DB/Storages/MergeTree/MergeTreeReader.h | 3 - .../ReplicatedMergeTreeBlockOutputStream.h | 46 +++--- .../ReplicatedMergeTreePartsExchange.h | 131 ++++++++++++++++++ .../DB/Storages/StorageReplicatedMergeTree.h | 17 --- dbms/src/IO/copyData.cpp | 14 ++ dbms/src/Storages/MergeTree/MergeTreeData.cpp | 24 ++-- .../Storages/StorageReplicatedMergeTree.cpp | 18 +-- .../include/zkutil/KeeperException.h | 4 + 15 files changed, 249 insertions(+), 83 deletions(-) create mode 100644 dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 6213ece69cf..f016e90b3fb 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -239,6 +239,7 @@ namespace ErrorCodes NOT_FOUND_EXPECTED_DATA_PART, TOO_MANY_UNEXPECTED_DATA_PARTS, NO_SUCH_DATA_PART, + BAD_DATA_PART_NAME, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/IO/ReadBufferFromHTTP.h b/dbms/include/DB/IO/ReadBufferFromHTTP.h index d94f8d53bbc..9e23cffc413 100644 --- a/dbms/include/DB/IO/ReadBufferFromHTTP.h +++ b/dbms/include/DB/IO/ReadBufferFromHTTP.h @@ -24,32 +24,42 @@ class ReadBufferFromHTTP : public ReadBuffer private: std::string host; int port; - std::string params; Poco::Net::HTTPClientSession session; std::istream * istr; /// этим владеет session Poco::SharedPtr impl; public: + typedef std::vector > Params; + ReadBufferFromHTTP( const std::string & host_, int port_, - const std::string & params_, + const Params & params, size_t timeout_ = 0, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) - : ReadBuffer(NULL, 0), host(host_), port(port_), params(params_) + : ReadBuffer(NULL, 0), host(host_), port(port_) { - std::string encoded_path; - Poco::URI::encode(path, "&#", encoded_path); - std::stringstream uri; - uri << "http://" << host << ":" << port << "/?" << params; + uri << "http://" << host << ":" << port << "/"; + + bool first = true; + for (const auto & it : params) + { + uri << (first ? "?" : "&"); + first = false; + String encoded_key; + String encoded_value; + Poco::URI::encode(it.first, "=&#", encoded_key); + Poco::URI::encode(it.second, "&#", encoded_value); + uri << encoded_key << "=" << encoded_value; + } session.setHost(host); session.setPort(port); /// устанавливаем таймаут - session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0)); + session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0)); Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str()); Poco::Net::HTTPResponse response; diff --git a/dbms/include/DB/IO/ReadHelpers.h b/dbms/include/DB/IO/ReadHelpers.h index 724246d6d83..0f364c01c29 100644 --- a/dbms/include/DB/IO/ReadHelpers.h +++ b/dbms/include/DB/IO/ReadHelpers.h @@ -18,6 +18,7 @@ #include #include +#include #define DEFAULT_MAX_STRING_SIZE 0x00FFFFFFULL @@ -417,6 +418,7 @@ inline void readBinary(Float32 & x, ReadBuffer & buf) { readPODBinary(x, buf); } inline void readBinary(Float64 & x, ReadBuffer & buf) { readPODBinary(x, buf); } inline void readBinary(String & x, ReadBuffer & buf) { readStringBinary(x, buf); } inline void readBinary(bool & x, ReadBuffer & buf) { readPODBinary(x, buf); } +inline void readBinary(uint128 & x, ReadBuffer & buf) { readPODBinary(x, buf); } inline void readBinary(VisitID_t & x, ReadBuffer & buf) { readPODBinary(x, buf); } inline void readBinary(mysqlxx::Date & x, ReadBuffer & buf) { readPODBinary(x, buf); } diff --git a/dbms/include/DB/IO/RemoteReadBuffer.h b/dbms/include/DB/IO/RemoteReadBuffer.h index 0366da65a18..ef80b364634 100644 --- a/dbms/include/DB/IO/RemoteReadBuffer.h +++ b/dbms/include/DB/IO/RemoteReadBuffer.h @@ -23,13 +23,12 @@ public: size_t timeout = 0, size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE) { - std::string encoded_path; - Poco::URI::encode(path, "&#", encoded_path); - - std::stringstream params; - params << "action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false"); + ReadBufferFromHTTP::Params params = { + std::make_pair("action", "read"), + std::make_pair("path", path), + std::make_pair("compress", (compress ? "true" : "false"))}; - impl = new ReadBufferFromHTTP(host, port, params.str, timeout, buffer_size); + impl = new ReadBufferFromHTTP(host, port, params, timeout, buffer_size); } bool nextImpl() @@ -48,13 +47,11 @@ public: const std::string & path, size_t timeout = 0) { - std::string encoded_path; - Poco::URI::encode(path, "&#", encoded_path); + ReadBufferFromHTTP::Params params = { + std::make_pair("action", "list"), + std::make_pair("path", path)}; - std::stringstream params; - params << "action=list&path=" << encoded_path; - - ReadBufferFromHTTP in(host, port, params.str(), timeout); + ReadBufferFromHTTP in(host, port, params, timeout); std::vector files; while (!in.eof()) diff --git a/dbms/include/DB/IO/WriteHelpers.h b/dbms/include/DB/IO/WriteHelpers.h index 9d9cd563e89..06bad60e829 100644 --- a/dbms/include/DB/IO/WriteHelpers.h +++ b/dbms/include/DB/IO/WriteHelpers.h @@ -18,6 +18,7 @@ #include #include #include +#include #define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U @@ -448,6 +449,7 @@ inline void writeBinary(const Float32 & x, WriteBuffer & buf) { writePODBinary( inline void writeBinary(const Float64 & x, WriteBuffer & buf) { writePODBinary(x, buf); } inline void writeBinary(const String & x, WriteBuffer & buf) { writeStringBinary(x, buf); } inline void writeBinary(const bool & x, WriteBuffer & buf) { writePODBinary(x, buf); } +inline void writeBinary(const uint128 & x, WriteBuffer & buf) { writePODBinary(x, buf); } inline void writeBinary(const VisitID_t & x, WriteBuffer & buf) { writePODBinary(static_cast(x), buf); } inline void writeBinary(const mysqlxx::Date & x, WriteBuffer & buf) { writePODBinary(x, buf); } diff --git a/dbms/include/DB/IO/copyData.h b/dbms/include/DB/IO/copyData.h index c4e50bdff47..c44698954c4 100644 --- a/dbms/include/DB/IO/copyData.h +++ b/dbms/include/DB/IO/copyData.h @@ -12,6 +12,10 @@ namespace DB */ void copyData(ReadBuffer & from, WriteBuffer & to); +/** Копирует bytes байт из ReadBuffer в WriteBuffer + */ +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes); + } #endif diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 9aba07f95dd..6957902fbd8 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -8,8 +8,13 @@ #include #include #include +#include #include + +#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t)) + + namespace DB { @@ -147,7 +152,7 @@ public: } }; - DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0) {} + DataPart(MergeTreeData & storage_) : storage(storage_), size(0), size_in_bytes(0) {} MergeTreeData & storage; DayNum_t left_date; @@ -237,14 +242,20 @@ public: && right >= rhs.right; } - /// Загрузить индекс и вычислить размер. + /// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже. void loadIndex() { + /// Размер - в количестве засечек. + if (!size) + size = Poco::File(storage.full_path + name + "/" + escapeForFileName(storage.columns->front().first) + ".mrk") + .getSize() / MERGE_TREE_MARK_SIZE; + size_t key_size = storage.sort_descr.size(); index.resize(key_size * size); String index_path = storage.full_path + name + "/primary.idx"; - ReadBufferFromFile index_file(index_path, std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize())); + ReadBufferFromFile index_file(index_path, + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize())); for (size_t i = 0; i < size; ++i) for (size_t j = 0; j < key_size; ++j) @@ -318,7 +329,7 @@ public: bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const; /// Кладет в DataPart данные из имени кусочка. - void parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part); + void parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches = nullptr); std::string getTableName() { return ""; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index 3f0f7173e4a..041edc68989 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -12,9 +12,6 @@ #include -#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t)) - - namespace DB { diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 3fb706c25ae..87175a54f43 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -11,7 +11,7 @@ class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream { public: ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_) - : storage(storage_), insert_id(insert_id_), block_index(0) {} + : storage(storage_), insert_id(insert_id_), block_index(0), log(&Logger::get("ReplicatedMergeTreeBlockOutputStream")) {} void write(const Block & block) override { @@ -33,6 +33,8 @@ public: if (!block_id.empty() && storage.zookeeper.tryGet( storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) { + LOG_INFO(log, "Block with this ID already exists; ignoring it"); + /// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять. auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str); expected_checksums.check(part->checksums); @@ -49,28 +51,38 @@ public: log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART; log_entry.new_part_name = part->name; + String checksums_str = part->checksums.toString(); + /// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock. zkutil::Ops ops; - ops.push_back(new zkutil::Op::Create( - storage.zookeeper_path + "/blocks/" + block_id, - "", - storage.zookeeper.getDefaultACL(), - zkutil::CreateMode::Persistent)); - ops.push_back(new zkutil::Op::Create( - storage.zookeeper_path + "/blocks/" + block_id + "/checksums", - part->checksums.toString(), - storage.zookeeper.getDefaultACL(), - zkutil::CreateMode::Persistent)); - ops.push_back(new zkutil::Op::Create( - storage.zookeeper_path + "/blocks/" + block_id + "/number", - toString(part_number), - storage.zookeeper.getDefaultACL(), - zkutil::CreateMode::Persistent)); + if (!block_id.empty()) + { + ops.push_back(new zkutil::Op::Create( + storage.zookeeper_path + "/blocks/" + block_id, + "", + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + storage.zookeeper_path + "/blocks/" + block_id + "/checksums", + checksums_str, + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + storage.zookeeper_path + "/blocks/" + block_id + "/number", + toString(part_number), + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); + } ops.push_back(new zkutil::Op::Create( storage.replica_path + "/parts/" + part->name, "", storage.zookeeper.getDefaultACL(), zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + storage.replica_path + "/parts/" + part->name + "/checksums", + checksums_str, + storage.zookeeper.getDefaultACL(), + zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create( storage.replica_path + "/log/log-", log_entry.toString(), @@ -86,6 +98,8 @@ private: StorageReplicatedMergeTree & storage; String insert_id; size_t block_index; + + Logger * log; }; } diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h new file mode 100644 index 00000000000..c509bb526c9 --- /dev/null +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h @@ -0,0 +1,131 @@ +#pragma once + +#include +#include +#include +#include +#include + + +namespace DB +{ + +class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint +{ +public: + ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_), + owned_storage(owned_storage_), log(&Logger::get("ReplicatedMergeTreePartsServer")) {} + + void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override + { + String part_name = params.get("part"); + LOG_TRACE(log, "Sending part " << part_name); + + auto storage_lock = owned_storage->lockStructure(false); + + MergeTreeData::DataPartPtr part = findPart(part_name); + + /// Список файлов возьмем из списка контрольных сумм. + MergeTreeData::DataPart::Checksums checksums = part->checksums; + checksums.files["checksums.txt"]; + + writeBinary(checksums.files.size(), out); + for (const auto & it : checksums.files) + { + String path = data.getFullPath() + part_name + "/" + it.first; + UInt64 size = Poco::File(path).getSize(); + + writeStringBinary(it.first, out); + writeBinary(size, out); + + ReadBufferFromFile file_in(path); + HashingWriteBuffer hashing_out(out); + copyData(file_in, hashing_out); + + if (hashing_out.count() != size) + throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART); + + writeBinary(hashing_out.getHash(), out); + } + } + +private: + MergeTreeData & data; + StoragePtr owned_storage; + + Logger * log; + + MergeTreeData::DataPartPtr findPart(const String & name) + { + MergeTreeData::DataParts parts = data.getDataParts(); + for (const auto & part : parts) + { + if (part->name == name) + return part; + } + throw Exception("No part " + name + " in table"); + } +}; + +class ReplicatedMergeTreePartsFetcher +{ +public: + ReplicatedMergeTreePartsFetcher(MergeTreeData & data_) : data(data_), log(&Logger::get("ReplicatedMergeTreePartsFetcher")) {} + + /// Скачивает кусок в tmp_директорию, проверяет чексуммы. + MergeTreeData::MutableDataPartPtr fetchPart( + const String & part_name, + const String & replica_path, + const String & host, + int port) + { + LOG_TRACE(log, "Fetching part " << part_name); + ReadBufferFromHTTP::Params params = { + std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path), + std::make_pair("part", part_name)}; + ReadBufferFromHTTP in(host, port, params); + + String part_path = data.getFullPath() + "tmp_" + part_name + "/"; + if (!Poco::File(part_path).createDirectory()) + throw Exception("Directory " + part_path + " already exists"); + + size_t files; + readBinary(files, in); + for (size_t i = 0; i < files; ++i) + { + String file_name; + UInt64 file_size; + + readStringBinary(file_name, in); + readBinary(file_size, in); + + WriteBufferFromFile file_out(part_path + file_name); + HashingWriteBuffer hashing_out(file_out); + copyData(in, hashing_out, file_size); + + uint128 expected_hash; + readBinary(expected_hash, in); + + if (expected_hash != hashing_out.getHash()) + throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path); + } + + assertEOF(in); + + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); + data.parsePartName(part_name, *new_data_part); + new_data_part->name = "tmp_" + part_name; + new_data_part->modification_time = time(0); + new_data_part->loadIndex(); + new_data_part->loadChecksums(); + + return new_data_part; + } + +private: + MergeTreeData & data; + + Logger * log; +}; + +} diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 018386ca0dc..0c1b01f0377 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -103,23 +103,6 @@ private: typedef std::list LogEntries; - class MyInterserverIOEndpoint : public InterserverIOEndpoint - { - public: - MyInterserverIOEndpoint(StorageReplicatedMergeTree & storage_) : storage(storage_) {} - - void setOwnedStorage(StoragePtr owned_storage_) - { - owned_storage = owned_storage_; - } - - void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override; - - private: - StorageReplicatedMergeTree & storage; - StoragePtr owned_storage; - }; - Context & context; zkutil::ZooKeeper & zookeeper; diff --git a/dbms/src/IO/copyData.cpp b/dbms/src/IO/copyData.cpp index adf1e116f1a..4eed3510b2f 100644 --- a/dbms/src/IO/copyData.cpp +++ b/dbms/src/IO/copyData.cpp @@ -14,5 +14,19 @@ void copyData(ReadBuffer & from, WriteBuffer & to) from.position() = from.buffer().end(); } } + +void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes) +{ + while (bytes > 0 && !from.eof()) + { + size_t count = std::min(bytes, static_cast(from.buffer().end() - from.position())); + to.write(from.position(), count); + from.position() += count; + bytes -= count; + } + + if (bytes > 0) + throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF); +} } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index a5d5d081179..bd534fbd002 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include @@ -120,8 +119,18 @@ String MergeTreeData::getPartName(DayNum_t left_date, DayNum_t right_date, UInt6 } -void MergeTreeData::parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part) +void MergeTreeData::parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches_p) { + Poco::RegularExpression::MatchVec match_vec; + if (!matches_p) + { + if (!isPartDirectory(file_name, match_vec)) + throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME); + matches_p = &match_vec; + } + + const Poco::RegularExpression::MatchVec & matches = *matches_p; + DateLUTSingleton & date_lut = DateLUTSingleton::instance(); part.left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length))); @@ -181,7 +190,7 @@ void MergeTreeData::loadDataParts() continue; MutableDataPartPtr part = std::make_shared(*this); - parsePartName(file_name, matches, *part); + parsePartName(file_name, *part, &matches); part->name = file_name; /// Для битых кусков, которые могут образовываться после грубого перезапуска сервера, попытаться восстановить куски, из которых они сделаны. @@ -202,10 +211,6 @@ void MergeTreeData::loadDataParts() continue; } - /// Размер - в количестве засечек. - part->size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize() - / MERGE_TREE_MARK_SIZE; - part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime(); try @@ -618,9 +623,8 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n Poco::RegularExpression::MatchVec matches; Strings restored_parts; - isPartDirectory(file_name, matches); DataPart broken_part(*this); - parsePartName(file_name, matches, broken_part); + parsePartName(file_name, broken_part); for (int i = static_cast(old_parts.size()) - 1; i >= 0; --i) { @@ -632,7 +636,7 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n old_parts.erase(old_parts.begin() + i); continue; } - parsePartName(name, matches, old_part); + parsePartName(name, old_part, &matches); if (broken_part.contains(old_part)) { /// Восстанавливаем все содержащиеся куски. Если некоторые из них содержатся в других, их удалит loadDataParts. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index d0772fb0c21..5ad7779d7cd 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -7,14 +8,6 @@ namespace DB { -void StorageReplicatedMergeTree::MyInterserverIOEndpoint::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) -{ - writeString("Hello. You requested part ", out); - writeString(params.get("part"), out); - writeString(".", out); -} - - StorageReplicatedMergeTree::StorageReplicatedMergeTree( const String & zookeeper_path_, const String & replica_name_, @@ -59,10 +52,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( checkParts(); } - String endpoint_name = "ReplicatedMergeTree:" + replica_path; - InterserverIOEndpointPtr endpoint = new MyInterserverIOEndpoint(*this); - endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, context.getInterserverIOHandler()); - activateReplica(); } @@ -84,7 +73,9 @@ StoragePtr StorageReplicatedMergeTree::create( path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_); StoragePtr res_ptr = res->thisPtr(); - dynamic_cast(*res->endpoint_holder->getEndpoint()).setOwnedStorage(res_ptr); + String endpoint_name = "ReplicatedMergeTree:" + res->replica_path; + InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr); + res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler()); return res_ptr; } @@ -127,6 +118,7 @@ void StorageReplicatedMergeTree::createTable() zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent); zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent); zookeeper.create(zookeeper_path + "/block-numbers", "", zkutil::CreateMode::Persistent); + zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent); } /** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata). diff --git a/libs/libzkutil/include/zkutil/KeeperException.h b/libs/libzkutil/include/zkutil/KeeperException.h index ff5d96f6278..46c671a6e22 100644 --- a/libs/libzkutil/include/zkutil/KeeperException.h +++ b/libs/libzkutil/include/zkutil/KeeperException.h @@ -15,6 +15,10 @@ public: KeeperException(ReturnCode::type code_) : DB::Exception(ReturnCode::toString(code_)), code(code_) {} + const char * name() const throw() { return "zkutil::KeeperException"; } + const char * className() const throw() { return "zkutil::KeeperException"; } + KeeperException * clone() const { return new KeeperException(message(), code); } + ReturnCode::type code; };