dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-21 15:24:29 +03:00
parent 8135c2286d
commit 92fb1d2689

View File

@ -254,47 +254,56 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
RangesInDataParts * final_parts_with_ranges = nullptr;
if (settings.parallel_replicas_count > 0)
if (settings.parallel_replicas_count > 1)
{
PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek,
settings.parallel_replicas_count);
auto per_replica_parts_with_ranges = splitter.perform();
if ((per_replica_parts_with_ranges.size() > 1) && (settings.parallel_replica_offset < per_replica_parts_with_ranges.size()))
if (settings.parallel_replica_offset < per_replica_parts_with_ranges.size())
{
/// Для каждого элемента массива per_replica_parts_with_ranges, вычисляем его хэш
/// Сортируем массив per_replica_parts_with_ranges по хэшу.
/// Выбираем k-й элемент массива per_replica_parts_with_ranges, где k - наш номер реплики.
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
std::vector<Entry> hashed_replica_parts;
for (auto & cluster : per_replica_parts_with_ranges)
if (per_replica_parts_with_ranges.size() > 1)
{
Entry entry = std::make_pair(computeHash(cluster), &cluster);
hashed_replica_parts.push_back(entry);
}
std::sort(hashed_replica_parts.begin(), hashed_replica_parts.end(), [&](const Entry & lhs, const Entry & rhs)
{
return lhs.first < rhs.first;
});
final_parts_with_ranges = hashed_replica_parts[settings.parallel_replica_offset].second;
/// Пересчитываем количество засечек и диапазонов.
sum_marks = 0;
sum_ranges = 0;
for (const auto & part_with_ranges : *final_parts_with_ranges)
{
sum_ranges += part_with_ranges.ranges.size();
for (const auto & range : part_with_ranges.ranges)
sum_marks += range.end - range.begin;
/// Для каждого элемента массива per_replica_parts_with_ranges, вычисляем его хэш
/// Сортируем массив per_replica_parts_with_ranges по хэшу.
/// Выбираем k-й элемент массива per_replica_parts_with_ranges, где k - наш номер реплики.
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
std::vector<Entry> hashed_replica_parts;
for (auto & cluster : per_replica_parts_with_ranges)
{
Entry entry = std::make_pair(computeHash(cluster), &cluster);
hashed_replica_parts.push_back(entry);
}
std::sort(hashed_replica_parts.begin(), hashed_replica_parts.end(), [&](const Entry & lhs, const Entry & rhs)
{
return lhs.first < rhs.first;
});
final_parts_with_ranges = hashed_replica_parts[settings.parallel_replica_offset].second;
/// Пересчитываем количество засечек и диапазонов.
sum_marks = 0;
sum_ranges = 0;
for (const auto & part_with_ranges : *final_parts_with_ranges)
{
sum_ranges += part_with_ranges.ranges.size();
for (const auto & range : part_with_ranges.ranges)
sum_marks += range.end - range.begin;
}
}
else
final_parts_with_ranges = &parts_with_ranges;
}
else
{
BlockInputStreams res;
return res;
}
}
if (final_parts_with_ranges == nullptr)
else
final_parts_with_ranges = &parts_with_ranges;
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << final_parts_with_ranges->size() << " parts by key, "