From de5c67024e269b91bd894c098988773c99974801 Mon Sep 17 00:00:00 2001
From: Alexey Arno
Date: Thu, 29 Jan 2015 15:13:21 +0300
Subject: [PATCH] dbms: Server: queries with several replicas: development [#METR-14410]

---
 dbms/include/DB/Client/Connection.h           |  3 ++
 dbms/include/DB/Client/ShardReplicas.h        |  2 +-
 dbms/include/DB/IO/ReadBuffer.h               |  6 +++
 .../MergeTree/PartsWithRangesSplitter.h       |  8 +--
 dbms/src/Client/Connection.cpp                |  6 +++
 dbms/src/Client/ShardReplicas.cpp             | 32 +++++++-----
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 32 +++++++-----
 .../MergeTree/PartsWithRangesSplitter.cpp     | 49 +++++++++++--------
 8 files changed, 91 insertions(+), 47 deletions(-)

diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h
index ec691394539..fa54d376f7c 100644
--- a/dbms/include/DB/Client/Connection.h
+++ b/dbms/include/DB/Client/Connection.h
@@ -109,6 +109,9 @@ public:
 	/// Check whether there is data that can be read.
 	bool poll(size_t timeout_microseconds = 0);
 
+	/// Check whether there is pending data in the read buffer.
+	bool hasReadBufferPendingData();
+
 	/// Receive a packet from the server.
 	Packet receivePacket();
 
diff --git a/dbms/include/DB/Client/ShardReplicas.h b/dbms/include/DB/Client/ShardReplicas.h
index 41d1ad61d06..afca1781917 100644
--- a/dbms/include/DB/Client/ShardReplicas.h
+++ b/dbms/include/DB/Client/ShardReplicas.h
@@ -45,7 +45,7 @@ namespace DB
 
 	private:
 		/// Check whether there is data that can be read on any of the replicas.
-		/// Returns a connection to a replica from which data can be read, if there is one.
+		/// Returns a connection to such a replica, if one is found.
 		Connection ** waitForReadEvent();
 
 	private:
diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h
index f1429ac80ab..48e4ab60c8e 100644
--- a/dbms/include/DB/IO/ReadBuffer.h
+++ b/dbms/include/DB/IO/ReadBuffer.h
@@ -143,6 +143,12 @@ public:
 		return read(to, n);
 	}
 
+	/** Check whether there is pending data in the buffer. */
+	bool hasPendingData()
+	{
+		return offset() != buffer().size();
+	}
+
 private:
 	/** Read the next data and fill the buffer with it.
 	  * Return false at end of data, true otherwise.
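For illustration, the point of the two additions above (ReadBuffer::hasPendingData() and Connection::hasReadBufferPendingData()) is that bytes already pulled from the socket into the connection's user-space read buffer are invisible to a socket-level poll, so a caller that only polls the socket may wrongly conclude a replica has nothing to deliver. A minimal usage sketch; isReadable() is a hypothetical helper, not part of the patch:

	#include <DB/Client/Connection.h>

	/// Hypothetical helper: combine the new buffer check with a non-blocking socket poll.
	/// Both member functions are the ones declared in the hunks above.
	bool isReadable(DB::Connection & connection)
	{
		/// Data already sitting in the read buffer never shows up in a socket poll,
		/// so it has to be checked first.
		return connection.hasReadBufferPendingData()
			|| connection.poll(/* timeout_microseconds = */ 0);
	}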
diff --git a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h
index d8b5c46534d..4945e991803 100644
--- a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h
+++ b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h
@@ -6,13 +6,13 @@ namespace DB
 {
 /** This class splits an object of type RangesInDataParts (see MergeTreeDataSelectExecutor)
-  * into the specified number of parts.
+  * into at most the specified number of segments.
   */
 class PartsWithRangesSplitter final
 {
 public:
-	PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
-		size_t min_segment_size_, size_t max_segments_count_);
+	PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
+		size_t granularity_, size_t min_segment_size_, size_t max_segments_count_);
 
 	~PartsWithRangesSplitter() = default;
 
 	PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
@@ -43,8 +43,8 @@ private:
 	MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;
 
 	size_t total_size;
-	size_t remaining_size;
 
+	const size_t granularity;
 	const size_t min_segment_size;
 	const size_t max_segments_count;
 
diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp
index 3f14786085d..2ab93f91b1d 100644
--- a/dbms/src/Client/Connection.cpp
+++ b/dbms/src/Client/Connection.cpp
@@ -360,6 +360,12 @@ bool Connection::poll(size_t timeout_microseconds)
 }
 
 
+bool Connection::hasReadBufferPendingData()
+{
+	return static_cast<ReadBuffer &>(*in).hasPendingData();
+}
+
+
 Connection::Packet Connection::receivePacket()
 {
 	//LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")");
diff --git a/dbms/src/Client/ShardReplicas.cpp b/dbms/src/Client/ShardReplicas.cpp
index 1adc33dbd68..693020097b0 100644
--- a/dbms/src/Client/ShardReplicas.cpp
+++ b/dbms/src/Client/ShardReplicas.cpp
@@ -1,4 +1,5 @@
 #include <DB/Client/ShardReplicas.h>
+#include
 
 namespace DB
 {
@@ -21,8 +22,8 @@
 
 	void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
 	{
-		if (sent_query)
-			throw Exception("Cannot send external tables data: query already sent.");
+		if (!sent_query)
+			throw Exception("Cannot send external tables data: query not yet sent.");
 
 		if (data.size() < active_connection_count)
 			throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
@@ -51,8 +52,8 @@
 			Connection * connection = e.second;
 			if (connection != nullptr)
 			{
-				connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
 				query_settings.parallel_replica_offset = offset;
+				connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
 				++offset;
 			}
 		}
@@ -190,23 +191,32 @@
 	Connection ** ShardReplicas::waitForReadEvent()
 	{
 		Poco::Net::Socket::SocketList read_list;
-		Poco::Net::Socket::SocketList write_list;
-		Poco::Net::Socket::SocketList except_list;
-
 		read_list.reserve(active_connection_count);
 
 		for (auto & e : replica_hash)
 		{
 			Connection * connection = e.second;
-			if (connection != nullptr)
+			if ((connection != nullptr) && connection->hasReadBufferPendingData())
 				read_list.push_back(connection->socket);
 		}
 
-		int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
-		if (n == 0)
-			return nullptr;
+		if (read_list.empty())
+		{
+			Poco::Net::Socket::SocketList write_list;
+			Poco::Net::Socket::SocketList except_list;
+			for (auto & e : replica_hash)
+			{
+				Connection * connection = e.second;
+				if (connection != nullptr)
+					read_list.push_back(connection->socket);
+			}
+
+			int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
+			if (n == 0)
+				return nullptr;
+		}
 
-		auto & socket = read_list[rand() % n];
+		auto & socket = read_list[rand() % read_list.size()];
 		auto it = replica_hash.find(socket.impl()->sockfd());
 		if (it == replica_hash.end())
 			throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
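The waitForReadEvent() change above amounts to a two-phase selection: replicas whose read buffers already hold unread data are preferred, and the potentially blocking Poco::Net::Socket::select() is reached only when no such replica exists. A simplified, self-contained sketch of that policy; the flat replicas vector, the chooseReplicaToRead() name, and the per-connection poll() loop are illustrative stand-ins for the real replica_hash map and the single select() over all sockets:

	#include <cstdlib>
	#include <vector>
	#include <DB/Client/Connection.h>

	/// Illustrative only, not part of the patch.
	DB::Connection * chooseReplicaToRead(std::vector<DB::Connection *> & replicas, size_t poll_interval_microseconds)
	{
		std::vector<DB::Connection *> ready;

		/// Phase 1: replicas that already have unread data buffered in user space.
		for (auto * connection : replicas)
			if (connection != nullptr && connection->hasReadBufferPendingData())
				ready.push_back(connection);

		/// Phase 2: otherwise, wait on the sockets themselves.
		if (ready.empty())
			for (auto * connection : replicas)
				if (connection != nullptr && connection->poll(poll_interval_microseconds))
					ready.push_back(connection);

		/// Pick one of the ready replicas at random, as the patch does.
		return ready.empty() ? nullptr : ready[rand() % ready.size()];
	}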
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 9d486b93b28..c93aed31bda 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -246,20 +246,24 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
 		parts_with_ranges.push_back(ranges);
 	}
 
-	if (settings.parallel_replicas_count > 1)
+	UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
+	UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
+
+	if (parallel_replicas_count > 1)
 	{
-		PartsWithRangesSplitter splitter(parts_with_ranges, data.settings.min_rows_for_seek,
-			settings.parallel_replicas_count);
+		// Split the array into at most N segments, where N is the number of replicas.
+		PartsWithRangesSplitter splitter(parts_with_ranges, data.index_granularity, data.settings.min_rows_for_seek,
+			parallel_replicas_count);
 		auto segments = splitter.perform();
 
-		if (settings.parallel_replica_offset >= segments.size())
-			return BlockInputStreams();
-
-		if (segments.size() > 1)
+		if (!segments.empty())
 		{
-			/// For each element of the segments array, compute its hash.
-			/// Sort the segments array by hash.
-			/// Pick the k-th element of the segments array, where k is our replica number.
+			if (parallel_replica_offset >= segments.size())
+				return BlockInputStreams();
+
+			/// For each segment, compute its hash.
+			/// Sort the array of segments by hash.
+			/// Pick the segment corresponding to the current replica.
 			using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
 			std::vector<Entry> hashed_segments;
 
@@ -276,7 +280,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
 					return lhs.first < rhs.first;
 				});
 
-			parts_with_ranges = std::move(*(hashed_segments[settings.parallel_replica_offset].second));
+			parts_with_ranges = std::move(*(hashed_segments[parallel_replica_offset].second));
+		}
+		else
+		{
+			/// Fetch data only from the first replica.
+			if (parallel_replica_offset > 0)
+				return BlockInputStreams();
 
 		}
 	}
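The segment assignment above is deterministic without any coordination: every replica hashes the same segment descriptions, sorts them by hash, and takes the entry at its own parallel_replica_offset, so all replicas agree on who reads what. A self-contained sketch of the scheme; pickSegmentIndex(), the string keys, and std::hash stand in for the real segments and the 128-bit hash used by the patch:

	#include <algorithm>
	#include <cstddef>
	#include <functional>
	#include <string>
	#include <utility>
	#include <vector>

	/// Illustrative only: returns the index of the segment this replica should read.
	/// The caller must already have checked replica_offset < segment_keys.size(),
	/// mirroring the `parallel_replica_offset >= segments.size()` guard above.
	std::size_t pickSegmentIndex(const std::vector<std::string> & segment_keys, std::size_t replica_offset)
	{
		std::vector<std::pair<std::size_t, std::size_t>> hashed;	/// (hash, original index)
		for (std::size_t i = 0; i < segment_keys.size(); ++i)
			hashed.emplace_back(std::hash<std::string>()(segment_keys[i]), i);

		/// Sorting by hash gives every replica the same pseudo-random order.
		std::sort(hashed.begin(), hashed.end());
		return hashed[replica_offset].second;
	}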
diff --git a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp
index 9f07e0c705d..e9322f7d58f 100644
--- a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp
+++ b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp
@@ -3,12 +3,12 @@ namespace DB
 {
 
-PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
-	size_t min_segment_size_, size_t max_segments_count_)
+PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
+	size_t granularity_, size_t min_segment_size_, size_t max_segments_count_)
 	: input(input_),
 	current_output_part(nullptr),
 	total_size(0),
-	remaining_size(0),
+	granularity(granularity_),
 	min_segment_size(min_segment_size_),
 	max_segments_count(max_segments_count_),
 	segment_size(0),
@@ -27,28 +27,40 @@ PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
 		for (const auto & range : ranges)
 			total_size += range.end - range.begin;
 	}
+	total_size *= granularity;
 
-	if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2)
-		|| (total_size < min_segment_size))
+	if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2))
 		throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS);
 }
 
 std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> PartsWithRangesSplitter::perform()
 {
-	init();
-	while (emitRange()) {}
+	if (total_size > min_segment_size)
+	{
+		init();
+		while (emitRange()) {}
+	}
 	return output_segments;
 }
 
 void PartsWithRangesSplitter::init()
 {
-	remaining_size = total_size;
+	output_segments.clear();
 
-	size_t segments_count = max_segments_count;
-	while ((segments_count > 0) && (total_size < (min_segment_size * segments_count)))
-		--segments_count;
+	// Compute the segment size so that it is a multiple of granularity.
+	segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size));
+	unsigned int scale = segment_size / granularity;
+	if (segment_size % granularity != 0) {
+		++scale;
+	}
+	segment_size = granularity * scale;
+
+	// Compute the number of segments.
+	size_t segments_count = total_size / segment_size;
+	if (total_size % segment_size != 0) {
+		++segments_count;
+	}
 
-	segment_size = total_size / segments_count;
 	output_segments.resize(segments_count);
 
 	input_part = input.begin();
@@ -65,10 +77,12 @@ void PartsWithRangesSplitter::init()
 
 bool PartsWithRangesSplitter::emitRange()
 {
-	size_t new_size = std::min(range_end - range_begin, segment_end - segment_begin);
-	current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size));
+	size_t new_size = std::min((range_end - range_begin) * granularity, segment_end - segment_begin);
+	size_t end = range_begin + new_size / granularity;
 
-	range_begin += new_size;
+	current_output_part->ranges.push_back(MarkRange(range_begin, end));
+
+	range_begin = end;
 	segment_begin += new_size;
 
 	if (isSegmentConsumed())
@@ -121,13 +135,8 @@ void PartsWithRangesSplitter::initRangeInfo()
 void PartsWithRangesSplitter::initSegmentInfo()
 {
 	addPart();
 
-	segment_begin = 0;
 	segment_end = segment_size;
-
-	remaining_size -= segment_size;
-	if (remaining_size < segment_size)
-		segment_end += remaining_size;
 }
 
 void PartsWithRangesSplitter::addPart()
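The new sizing logic in PartsWithRangesSplitter::init() divides the total row count into at most max_segments_count pieces of at least min_segment_size rows each, then rounds the piece size up to a whole number of index granules so that segment boundaries stay aligned with mark boundaries. A self-contained sketch with illustrative names (planSegments, SegmentPlan); it assumes the same preconditions the constructor and perform() enforce (min_segment_size > 0, max_segments_count >= 2, total_size > min_segment_size):

	#include <algorithm>
	#include <cstddef>

	struct SegmentPlan
	{
		std::size_t segment_size = 0;
		std::size_t segments_count = 0;
	};

	/// Illustrative only: mirrors the arithmetic of PartsWithRangesSplitter::init().
	SegmentPlan planSegments(std::size_t total_size, std::size_t granularity,
		std::size_t min_segment_size, std::size_t max_segments_count)
	{
		SegmentPlan plan;
		/// Target size: total split over at most max_segments_count segments,
		/// but never more segments than total_size / min_segment_size allows.
		plan.segment_size = total_size / std::min(max_segments_count, total_size / min_segment_size);
		/// Round up to a multiple of the index granularity.
		plan.segment_size = ((plan.segment_size + granularity - 1) / granularity) * granularity;
		/// Number of segments, rounding up so the tail rows are covered too.
		plan.segments_count = (total_size + plan.segment_size - 1) / plan.segment_size;
		return plan;
	}

	/// Example: total_size = 100000 rows, granularity = 8192, min_segment_size = 8192,
	/// max_segments_count = 3  ->  segment_size = 40960, segments_count = 3.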