dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-30 17:06:51 +03:00
parent 35356191c9
commit a7f0279e1c
6 changed files with 16 additions and 14 deletions

View File

@ -109,7 +109,7 @@ public:
bool poll(size_t timeout_microseconds = 0);
/// Проверить, есть ли данные в буфере для чтения.
bool hasReadBufferPendingData();
bool hasReadBufferPendingData() const;
/// Получить пакет от сервера.
Packet receivePacket();

View File

@ -144,9 +144,9 @@ public:
}
/** Проверить, есть ли данные в буфере для чтения. */
bool hasPendingData()
bool hasPendingData() const
{
return offset() != buffer().size();
return offset() != working_buffer.size();
}
private:

View File

@ -5,6 +5,8 @@
namespace DB
{
using Segments = std::vector<MergeTreeDataSelectExecutor::RangesInDataParts>;
/** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor)
* на не больше, чем указанное количество сегментов.
*/
@ -18,7 +20,7 @@ public:
PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete;
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> perform();
Segments perform();
private:
void init();
@ -35,11 +37,11 @@ private:
// Входные данные.
const MergeTreeDataSelectExecutor::RangesInDataParts & input;
MergeTreeDataSelectExecutor::RangesInDataParts::const_iterator input_part;
std::vector<MarkRange>::const_iterator input_range;
MarkRanges::const_iterator input_range;
// Выходные данные.
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> output_segments;
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts>::iterator current_output_segment;
Segments output_segments;
Segments::iterator current_output_segment;
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;
size_t total_size;

View File

@ -360,9 +360,9 @@ bool Connection::poll(size_t timeout_microseconds)
}
bool Connection::hasReadBufferPendingData()
bool Connection::hasReadBufferPendingData() const
{
return static_cast<ReadBufferFromPocoSocket &>(*in).hasPendingData();
return static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
}

View File

@ -15,10 +15,10 @@
namespace
{
std::pair<UInt64, UInt64> computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & cluster)
std::pair<UInt64, UInt64> computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & segment)
{
SipHash hash;
for (const auto & part_with_ranges : cluster)
for (const auto & part_with_ranges : segment)
{
const auto & part = *(part_with_ranges.data_part);
hash.update(part.name.c_str(), part.name.length());

View File

@ -29,13 +29,13 @@ PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecut
}
total_size *= granularity;
if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2))
if ((granularity == 0) || (min_segment_size == 0) || (max_segments_count == 0) || (total_size == 0))
throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS);
}
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> PartsWithRangesSplitter::perform()
Segments PartsWithRangesSplitter::perform()
{
if (total_size > min_segment_size)
if ((max_segments_count > 1) && (total_size > min_segment_size))
{
init();
while (emitRange()) {}