mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent
fd4cfb2b2e
commit
f618f02d23
@ -237,21 +237,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
RangesInDataParts parts_with_ranges;
|
||||
|
||||
/// Найдем, какой диапазон читать из каждого куска.
|
||||
size_t sum_marks = 0;
|
||||
size_t sum_ranges = 0;
|
||||
for (auto & part : parts)
|
||||
{
|
||||
RangesInDataPart ranges(part, (*part_index)++);
|
||||
ranges.ranges = markRangesFromPkRange(part->index, key_condition);
|
||||
|
||||
if (!ranges.ranges.empty())
|
||||
{
|
||||
parts_with_ranges.push_back(ranges);
|
||||
|
||||
sum_ranges += ranges.ranges.size();
|
||||
for (const auto & range : ranges.ranges)
|
||||
sum_marks += range.end - range.begin;
|
||||
}
|
||||
}
|
||||
|
||||
if (settings.parallel_replicas_count > 1)
|
||||
@ -260,47 +252,44 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
settings.parallel_replicas_count);
|
||||
auto segments = splitter.perform();
|
||||
|
||||
if (settings.parallel_replica_offset < segments.size())
|
||||
if (settings.parallel_replica_offset >= segments.size())
|
||||
return BlockInputStreams();
|
||||
|
||||
if (segments.size() > 1)
|
||||
{
|
||||
if (segments.size() > 1)
|
||||
/// Для каждого элемента массива segments, вычисляем его хэш
|
||||
/// Сортируем массив segments по хэшу.
|
||||
/// Выбираем k-й элемент массива segments, где k - наш номер реплики.
|
||||
|
||||
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
|
||||
std::vector<Entry> hashed_segments;
|
||||
hashed_segments.reserve(segments.size());
|
||||
|
||||
for (auto & segment : segments)
|
||||
{
|
||||
/// Для каждого элемента массива segments, вычисляем его хэш
|
||||
/// Сортируем массив segments по хэшу.
|
||||
/// Выбираем k-й элемент массива segments, где k - наш номер реплики.
|
||||
|
||||
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
|
||||
std::vector<Entry> hashed_segments;
|
||||
hashed_segments.reserve(segments.size());
|
||||
|
||||
for (auto & segment : segments)
|
||||
{
|
||||
Entry entry = std::make_pair(computeHash(segment), &segment);
|
||||
hashed_segments.push_back(entry);
|
||||
}
|
||||
|
||||
std::sort(hashed_segments.begin(), hashed_segments.end(), [&](const Entry & lhs, const Entry & rhs)
|
||||
{
|
||||
return lhs.first < rhs.first;
|
||||
});
|
||||
|
||||
parts_with_ranges = std::move(*(hashed_segments[settings.parallel_replica_offset].second));
|
||||
|
||||
/// Пересчитываем количество засечек и диапазонов.
|
||||
sum_marks = 0;
|
||||
sum_ranges = 0;
|
||||
for (const auto & part_with_ranges : parts_with_ranges)
|
||||
{
|
||||
sum_ranges += part_with_ranges.ranges.size();
|
||||
for (const auto & range : part_with_ranges.ranges)
|
||||
sum_marks += range.end - range.begin;
|
||||
}
|
||||
Entry entry = std::make_pair(computeHash(segment), &segment);
|
||||
hashed_segments.push_back(entry);
|
||||
}
|
||||
|
||||
std::sort(hashed_segments.begin(), hashed_segments.end(), [](const Entry & lhs, const Entry & rhs)
|
||||
{
|
||||
return lhs.first < rhs.first;
|
||||
});
|
||||
|
||||
parts_with_ranges = std::move(*(hashed_segments[settings.parallel_replica_offset].second));
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockInputStreams res;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
/// Считаем количество засечек и диапазонов.
|
||||
size_t sum_marks = 0;
|
||||
size_t sum_ranges = 0;
|
||||
sum_marks = 0;
|
||||
sum_ranges = 0;
|
||||
for (const auto & part_with_ranges : parts_with_ranges)
|
||||
{
|
||||
sum_ranges += part_with_ranges.ranges.size();
|
||||
for (const auto & range : part_with_ranges.ranges)
|
||||
sum_marks += range.end - range.begin;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
|
||||
|
Loading…
Reference in New Issue
Block a user