dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-29 15:13:21 +03:00
parent 1455194519
commit 60430d7ddc
8 changed files with 91 additions and 47 deletions

View File

@ -109,6 +109,9 @@ public:
/// Проверить, есть ли данные, которые можно прочитать.
bool poll(size_t timeout_microseconds = 0);
/// Проверить, есть ли данные в буфере для чтения.
bool hasReadBufferPendingData();
/// Получить пакет от сервера.
Packet receivePacket();

View File

@ -45,7 +45,7 @@ namespace DB
private:
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
/// Возвращает соединение на реплику, с которой можно прочитать данные, если такая есть.
/// Возвращает соединение на такую реплику, если оно найдётся.
Connection ** waitForReadEvent();
private:

View File

@ -143,6 +143,12 @@ public:
return read(to, n);
}
/** Проверить, есть ли данные в буфере для чтения. */
bool hasPendingData()
{
return offset() != buffer().size();
}
private:
/** Прочитать следующие данные и заполнить ими буфер.
* Вернуть false в случае конца, true иначе.

View File

@ -6,13 +6,13 @@ namespace DB
{
/** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor)
* на указанное количество частей.
* на не больше, чем указанное количество сегментов.
*/
class PartsWithRangesSplitter final
{
public:
PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t min_segment_size_, size_t max_segments_count_);
PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t granularity_, size_t min_segment_size_, size_t max_segments_count_);
~PartsWithRangesSplitter() = default;
PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
@ -43,8 +43,8 @@ private:
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;
size_t total_size;
size_t remaining_size;
const size_t granularity;
const size_t min_segment_size;
const size_t max_segments_count;

View File

@ -360,6 +360,12 @@ bool Connection::poll(size_t timeout_microseconds)
}
bool Connection::hasReadBufferPendingData()
{
return static_cast<ReadBufferFromPocoSocket &>(*in).hasPendingData();
}
Connection::Packet Connection::receivePacket()
{
//LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")");

View File

@ -1,4 +1,5 @@
#include <DB/Client/ShardReplicas.h>
#include <boost/concept_check.hpp>
namespace DB
{
@ -21,8 +22,8 @@ namespace DB
void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
if (sent_query)
throw Exception("Cannot send external tables data: query already sent.");
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.");
if (data.size() < active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
@ -51,8 +52,8 @@ namespace DB
Connection * connection = e.second;
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
@ -190,23 +191,32 @@ namespace DB
Connection ** ShardReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
read_list.reserve(active_connection_count);
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
auto & socket = read_list[rand() % n];
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
}
auto & socket = read_list[rand() % read_list.size()];
auto it = replica_hash.find(socket.impl()->sockfd());
if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);

View File

@ -246,20 +246,24 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
parts_with_ranges.push_back(ranges);
}
if (settings.parallel_replicas_count > 1)
UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
if (parallel_replicas_count > 1)
{
PartsWithRangesSplitter splitter(parts_with_ranges, data.settings.min_rows_for_seek,
settings.parallel_replicas_count);
// Разбиваем массив на не больше, чем N сегментов, где N - количество реплик,
PartsWithRangesSplitter splitter(parts_with_ranges, data.index_granularity, data.settings.min_rows_for_seek,
parallel_replicas_count);
auto segments = splitter.perform();
if (settings.parallel_replica_offset >= segments.size())
return BlockInputStreams();
if (segments.size() > 1)
if (!segments.empty())
{
/// Для каждого элемента массива segments, вычисляем его хэш
/// Сортируем массив segments по хэшу.
/// Выбираем k-й элемент массива segments, где k - наш номер реплики.
if (parallel_replica_offset >= segments.size())
return BlockInputStreams();
/// Для каждого сегмента, вычисляем его хэш.
/// Сортируем массив сегментов по хэшу.
/// Выбираем сегмент соответствующий текущей реплике.
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
std::vector<Entry> hashed_segments;
@ -276,7 +280,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
return lhs.first < rhs.first;
});
parts_with_ranges = std::move(*(hashed_segments[settings.parallel_replica_offset].second));
parts_with_ranges = std::move(*(hashed_segments[parallel_replica_offset].second));
}
else
{
/// Получаем данные только от первой реплики.
if (parallel_replica_offset > 0)
return BlockInputStreams();
}
}

View File

@ -3,12 +3,12 @@
namespace DB
{
PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t min_segment_size_, size_t max_segments_count_)
PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t granularity_, size_t min_segment_size_, size_t max_segments_count_)
: input(input_),
current_output_part(nullptr),
total_size(0),
remaining_size(0),
granularity(granularity_),
min_segment_size(min_segment_size_),
max_segments_count(max_segments_count_),
segment_size(0),
@ -27,28 +27,40 @@ PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecut
for (const auto & range : ranges)
total_size += range.end - range.begin;
}
total_size *= granularity;
if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2)
|| (total_size < min_segment_size))
if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2))
throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS);
}
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> PartsWithRangesSplitter::perform()
{
init();
while (emitRange()) {}
if (total_size > min_segment_size)
{
init();
while (emitRange()) {}
}
return output_segments;
}
void PartsWithRangesSplitter::init()
{
remaining_size = total_size;
output_segments.clear();
size_t segments_count = max_segments_count;
while ((segments_count > 0) && (total_size < (min_segment_size * segments_count)))
--segments_count;
// Вычислить размер сегментов так, чтобы он был кратен granularity
segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size));
unsigned int scale = segment_size / granularity;
if (segment_size % granularity != 0) {
++scale;
}
segment_size = granularity * scale;
// Посчитать количество сегментов.
size_t segments_count = total_size / segment_size;
if (total_size % segment_size != 0) {
++segments_count;
}
segment_size = total_size / segments_count;
output_segments.resize(segments_count);
input_part = input.begin();
@ -65,10 +77,12 @@ void PartsWithRangesSplitter::init()
bool PartsWithRangesSplitter::emitRange()
{
size_t new_size = std::min(range_end - range_begin, segment_end - segment_begin);
current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size));
size_t new_size = std::min((range_end - range_begin) * granularity, segment_end - segment_begin);
size_t end = range_begin + new_size / granularity;
range_begin += new_size;
current_output_part->ranges.push_back(MarkRange(range_begin, end));
range_begin = end;
segment_begin += new_size;
if (isSegmentConsumed())
@ -121,13 +135,8 @@ void PartsWithRangesSplitter::initRangeInfo()
void PartsWithRangesSplitter::initSegmentInfo()
{
addPart();
segment_begin = 0;
segment_end = segment_size;
remaining_size -= segment_size;
if (remaining_size < segment_size)
segment_end += remaining_size;
}
void PartsWithRangesSplitter::addPart()