dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-29 15:13:21 +03:00
parent c64917506d
commit de5c67024e
8 changed files with 91 additions and 47 deletions

View File

@ -109,6 +109,9 @@ public:
/// Проверить, есть ли данные, которые можно прочитать. /// Проверить, есть ли данные, которые можно прочитать.
bool poll(size_t timeout_microseconds = 0); bool poll(size_t timeout_microseconds = 0);
/// Проверить, есть ли данные в буфере для чтения.
bool hasReadBufferPendingData();
/// Получить пакет от сервера. /// Получить пакет от сервера.
Packet receivePacket(); Packet receivePacket();

View File

@ -45,7 +45,7 @@ namespace DB
private: private:
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
/// Возвращает соединение на реплику, с которой можно прочитать данные, если такая есть. /// Возвращает соединение на такую реплику, если оно найдётся.
Connection ** waitForReadEvent(); Connection ** waitForReadEvent();
private: private:

View File

@ -143,6 +143,12 @@ public:
return read(to, n); return read(to, n);
} }
/** Проверить, есть ли данные в буфере для чтения. */
bool hasPendingData()
{
return offset() != buffer().size();
}
private: private:
/** Прочитать следующие данные и заполнить ими буфер. /** Прочитать следующие данные и заполнить ими буфер.
* Вернуть false в случае конца, true иначе. * Вернуть false в случае конца, true иначе.

View File

@ -6,13 +6,13 @@ namespace DB
{ {
/** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor) /** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor)
* на указанное количество частей. * на не больше, чем указанное количество сегментов.
*/ */
class PartsWithRangesSplitter final class PartsWithRangesSplitter final
{ {
public: public:
PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t min_segment_size_, size_t max_segments_count_); size_t granularity_, size_t min_segment_size_, size_t max_segments_count_);
~PartsWithRangesSplitter() = default; ~PartsWithRangesSplitter() = default;
PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete; PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
@ -43,8 +43,8 @@ private:
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part; MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;
size_t total_size; size_t total_size;
size_t remaining_size;
const size_t granularity;
const size_t min_segment_size; const size_t min_segment_size;
const size_t max_segments_count; const size_t max_segments_count;

View File

@ -360,6 +360,12 @@ bool Connection::poll(size_t timeout_microseconds)
} }
bool Connection::hasReadBufferPendingData()
{
return static_cast<ReadBufferFromPocoSocket &>(*in).hasPendingData();
}
Connection::Packet Connection::receivePacket() Connection::Packet Connection::receivePacket()
{ {
//LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")"); //LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")");

View File

@ -1,4 +1,5 @@
#include <DB/Client/ShardReplicas.h> #include <DB/Client/ShardReplicas.h>
#include <boost/concept_check.hpp>
namespace DB namespace DB
{ {
@ -21,8 +22,8 @@ namespace DB
void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data) void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{ {
if (sent_query) if (!sent_query)
throw Exception("Cannot send external tables data: query already sent."); throw Exception("Cannot send external tables data: query not yet sent.");
if (data.size() < active_connection_count) if (data.size() < active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
@ -51,8 +52,8 @@ namespace DB
Connection * connection = e.second; Connection * connection = e.second;
if (connection != nullptr) if (connection != nullptr)
{ {
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
query_settings.parallel_replica_offset = offset; query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset; ++offset;
} }
} }
@ -190,23 +191,32 @@ namespace DB
Connection ** ShardReplicas::waitForReadEvent() Connection ** ShardReplicas::waitForReadEvent()
{ {
Poco::Net::Socket::SocketList read_list; Poco::Net::Socket::SocketList read_list;
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
read_list.reserve(active_connection_count); read_list.reserve(active_connection_count);
for (auto & e : replica_hash) for (auto & e : replica_hash)
{ {
Connection * connection = e.second; Connection * connection = e.second;
if (connection != nullptr) if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket); read_list.push_back(connection->socket);
} }
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000); if (read_list.empty())
if (n == 0) {
return nullptr; Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
auto & socket = read_list[rand() % n]; for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
}
auto & socket = read_list[rand() % read_list.size()];
auto it = replica_hash.find(socket.impl()->sockfd()); auto it = replica_hash.find(socket.impl()->sockfd());
if (it == replica_hash.end()) if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);

View File

@ -246,20 +246,24 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
parts_with_ranges.push_back(ranges); parts_with_ranges.push_back(ranges);
} }
if (settings.parallel_replicas_count > 1) UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
if (parallel_replicas_count > 1)
{ {
PartsWithRangesSplitter splitter(parts_with_ranges, data.settings.min_rows_for_seek, // Разбиваем массив на не больше, чем N сегментов, где N - количество реплик,
settings.parallel_replicas_count); PartsWithRangesSplitter splitter(parts_with_ranges, data.index_granularity, data.settings.min_rows_for_seek,
parallel_replicas_count);
auto segments = splitter.perform(); auto segments = splitter.perform();
if (settings.parallel_replica_offset >= segments.size()) if (!segments.empty())
return BlockInputStreams();
if (segments.size() > 1)
{ {
/// Для каждого элемента массива segments, вычисляем его хэш if (parallel_replica_offset >= segments.size())
/// Сортируем массив segments по хэшу. return BlockInputStreams();
/// Выбираем k-й элемент массива segments, где k - наш номер реплики.
/// Для каждого сегмента, вычисляем его хэш.
/// Сортируем массив сегментов по хэшу.
/// Выбираем сегмент соответствующий текущей реплике.
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>; using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
std::vector<Entry> hashed_segments; std::vector<Entry> hashed_segments;
@ -276,7 +280,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
return lhs.first < rhs.first; return lhs.first < rhs.first;
}); });
parts_with_ranges = std::move(*(hashed_segments[settings.parallel_replica_offset].second)); parts_with_ranges = std::move(*(hashed_segments[parallel_replica_offset].second));
}
else
{
/// Получаем данные только от первой реплики.
if (parallel_replica_offset > 0)
return BlockInputStreams();
} }
} }

View File

@ -3,12 +3,12 @@
namespace DB namespace DB
{ {
PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t min_segment_size_, size_t max_segments_count_) size_t granularity_, size_t min_segment_size_, size_t max_segments_count_)
: input(input_), : input(input_),
current_output_part(nullptr), current_output_part(nullptr),
total_size(0), total_size(0),
remaining_size(0), granularity(granularity_),
min_segment_size(min_segment_size_), min_segment_size(min_segment_size_),
max_segments_count(max_segments_count_), max_segments_count(max_segments_count_),
segment_size(0), segment_size(0),
@ -27,28 +27,40 @@ PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecut
for (const auto & range : ranges) for (const auto & range : ranges)
total_size += range.end - range.begin; total_size += range.end - range.begin;
} }
total_size *= granularity;
if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2) if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2))
|| (total_size < min_segment_size))
throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS); throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS);
} }
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> PartsWithRangesSplitter::perform() std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> PartsWithRangesSplitter::perform()
{ {
init(); if (total_size > min_segment_size)
while (emitRange()) {} {
init();
while (emitRange()) {}
}
return output_segments; return output_segments;
} }
void PartsWithRangesSplitter::init() void PartsWithRangesSplitter::init()
{ {
remaining_size = total_size; output_segments.clear();
size_t segments_count = max_segments_count; // Вычислить размер сегментов так, чтобы он был кратен granularity
while ((segments_count > 0) && (total_size < (min_segment_size * segments_count))) segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size));
--segments_count; unsigned int scale = segment_size / granularity;
if (segment_size % granularity != 0) {
++scale;
}
segment_size = granularity * scale;
// Посчитать количество сегментов.
size_t segments_count = total_size / segment_size;
if (total_size % segment_size != 0) {
++segments_count;
}
segment_size = total_size / segments_count;
output_segments.resize(segments_count); output_segments.resize(segments_count);
input_part = input.begin(); input_part = input.begin();
@ -65,10 +77,12 @@ void PartsWithRangesSplitter::init()
bool PartsWithRangesSplitter::emitRange() bool PartsWithRangesSplitter::emitRange()
{ {
size_t new_size = std::min(range_end - range_begin, segment_end - segment_begin); size_t new_size = std::min((range_end - range_begin) * granularity, segment_end - segment_begin);
current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size)); size_t end = range_begin + new_size / granularity;
range_begin += new_size; current_output_part->ranges.push_back(MarkRange(range_begin, end));
range_begin = end;
segment_begin += new_size; segment_begin += new_size;
if (isSegmentConsumed()) if (isSegmentConsumed())
@ -121,13 +135,8 @@ void PartsWithRangesSplitter::initRangeInfo()
void PartsWithRangesSplitter::initSegmentInfo() void PartsWithRangesSplitter::initSegmentInfo()
{ {
addPart(); addPart();
segment_begin = 0; segment_begin = 0;
segment_end = segment_size; segment_end = segment_size;
remaining_size -= segment_size;
if (remaining_size < segment_size)
segment_end += remaining_size;
} }
void PartsWithRangesSplitter::addPart() void PartsWithRangesSplitter::addPart()