From 92fb1d268956c317c9ead215a1b8317d4f282f7f Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 21 Jan 2015 15:24:29 +0300 Subject: [PATCH] dbms: Server: queries with several replicas: development [#METR-14410] --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 71 +++++++++++-------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index c53f885cd66..b0d7fdd7d53 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -254,47 +254,56 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( RangesInDataParts * final_parts_with_ranges = nullptr; - if (settings.parallel_replicas_count > 0) + if (settings.parallel_replicas_count > 1) { PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek, settings.parallel_replicas_count); auto per_replica_parts_with_ranges = splitter.perform(); - if ((per_replica_parts_with_ranges.size() > 1) && (settings.parallel_replica_offset < per_replica_parts_with_ranges.size())) + if (settings.parallel_replica_offset < per_replica_parts_with_ranges.size()) { - /// Для каждого элемента массива per_replica_parts_with_ranges, вычисляем его хэш - /// Сортируем массив per_replica_parts_with_ranges по хэшу. - /// Выбираем k-й элемент массива per_replica_parts_with_ranges, где k - наш номер реплики. - - using Entry = std::pair, RangesInDataParts *>; - std::vector hashed_replica_parts; - - for (auto & cluster : per_replica_parts_with_ranges) + if (per_replica_parts_with_ranges.size() > 1) { - Entry entry = std::make_pair(computeHash(cluster), &cluster); - hashed_replica_parts.push_back(entry); - } - - std::sort(hashed_replica_parts.begin(), hashed_replica_parts.end(), [&](const Entry & lhs, const Entry & rhs) - { - return lhs.first < rhs.first; - }); - - final_parts_with_ranges = hashed_replica_parts[settings.parallel_replica_offset].second; - - /// Пересчитываем количество засечек и диапазонов. - sum_marks = 0; - sum_ranges = 0; - for (const auto & part_with_ranges : *final_parts_with_ranges) - { - sum_ranges += part_with_ranges.ranges.size(); - for (const auto & range : part_with_ranges.ranges) - sum_marks += range.end - range.begin; + /// Для каждого элемента массива per_replica_parts_with_ranges, вычисляем его хэш + /// Сортируем массив per_replica_parts_with_ranges по хэшу. + /// Выбираем k-й элемент массива per_replica_parts_with_ranges, где k - наш номер реплики. + + using Entry = std::pair, RangesInDataParts *>; + std::vector hashed_replica_parts; + + for (auto & cluster : per_replica_parts_with_ranges) + { + Entry entry = std::make_pair(computeHash(cluster), &cluster); + hashed_replica_parts.push_back(entry); + } + + std::sort(hashed_replica_parts.begin(), hashed_replica_parts.end(), [&](const Entry & lhs, const Entry & rhs) + { + return lhs.first < rhs.first; + }); + + final_parts_with_ranges = hashed_replica_parts[settings.parallel_replica_offset].second; + + /// Пересчитываем количество засечек и диапазонов. + sum_marks = 0; + sum_ranges = 0; + for (const auto & part_with_ranges : *final_parts_with_ranges) + { + sum_ranges += part_with_ranges.ranges.size(); + for (const auto & range : part_with_ranges.ranges) + sum_marks += range.end - range.begin; + } } + else + final_parts_with_ranges = &parts_with_ranges; + } + else + { + BlockInputStreams res; + return res; } } - - if (final_parts_with_ranges == nullptr) + else final_parts_with_ranges = &parts_with_ranges; LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << final_parts_with_ranges->size() << " parts by key, "