dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-21 17:35:49 +03:00
parent dc106fe423
commit 928b4f3a1d

View File

@ -254,8 +254,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
} }
} }
RangesInDataParts * final_parts_with_ranges = nullptr;
if (settings.parallel_replicas_count > 1) if (settings.parallel_replicas_count > 1)
{ {
PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek, PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek,
@ -285,20 +283,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
return lhs.first < rhs.first; return lhs.first < rhs.first;
}); });
final_parts_with_ranges = hashed_segments[settings.parallel_replica_offset].second; parts_with_ranges = std::move(*(hashed_segments[settings.parallel_replica_offset].second));
/// Пересчитываем количество засечек и диапазонов. /// Пересчитываем количество засечек и диапазонов.
sum_marks = 0; sum_marks = 0;
sum_ranges = 0; sum_ranges = 0;
for (const auto & part_with_ranges : *final_parts_with_ranges) for (const auto & part_with_ranges : parts_with_ranges)
{ {
sum_ranges += part_with_ranges.ranges.size(); sum_ranges += part_with_ranges.ranges.size();
for (const auto & range : part_with_ranges.ranges) for (const auto & range : part_with_ranges.ranges)
sum_marks += range.end - range.begin; sum_marks += range.end - range.begin;
} }
} }
else
final_parts_with_ranges = &parts_with_ranges;
} }
else else
{ {
@ -306,10 +302,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
return res; return res;
} }
} }
else
final_parts_with_ranges = &parts_with_ranges;
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << final_parts_with_ranges->size() << " parts by key, " LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
<< sum_marks << " marks to read from " << sum_ranges << " ranges"); << sum_marks << " marks to read from " << sum_ranges << " ranges");
BlockInputStreams res; BlockInputStreams res;
@ -324,7 +318,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
res = spreadMarkRangesAmongThreadsFinal( res = spreadMarkRangesAmongThreadsFinal(
*final_parts_with_ranges, parts_with_ranges,
threads, threads,
column_names_to_read, column_names_to_read,
max_block_size, max_block_size,
@ -336,7 +330,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
else else
{ {
res = spreadMarkRangesAmongThreads( res = spreadMarkRangesAmongThreads(
*final_parts_with_ranges, parts_with_ranges,
threads, threads,
column_names_to_read, column_names_to_read,
max_block_size, max_block_size,