diff --git a/src/Processors/QueryPlan/PartsSplitter.cpp b/src/Processors/QueryPlan/PartsSplitter.cpp index 8bf877cf8b9..6f49bcce25c 100644 --- a/src/Processors/QueryPlan/PartsSplitter.cpp +++ b/src/Processors/QueryPlan/PartsSplitter.cpp @@ -30,6 +30,23 @@ std::string toString(const Values & value) return fmt::format("({})", fmt::join(value, ", ")); } +int compareValues(const Values & lhs, const Values & rhs) +{ + chassert(lhs.size() == rhs.size()); + + for (size_t i = 0; i < lhs.size(); ++i) + { + if (applyVisitor(FieldVisitorAccurateLess(), lhs[i], rhs[i])) + return -1; + + if (!applyVisitor(FieldVisitorAccurateEquals(), lhs[i], rhs[i])) + return 1; + } + + return 0; +} + + /// Adaptor to access PK values from index. class IndexAccess { @@ -49,6 +66,58 @@ public: return values; } + std::optional findRightmostMarkLessThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const + { + size_t left = range_begin; + size_t right = range_end; + + while (left < right) + { + size_t middle = left + (right - left) / 2; + int compare_result = compareValues(getValue(part_index, middle), value); + if (compare_result != -1) + right = middle; + else + left = middle + 1; + } + + if (right == range_begin) + return {}; + + return right - 1; + } + + std::optional findRightmostMarkLessThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const + { + return findRightmostMarkLessThanValueInRange(part_index, value, mark_range.begin, mark_range.end); + } + + std::optional findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const + { + size_t left = range_begin; + size_t right = range_end; + + while (left < right) + { + size_t middle = left + (right - left) / 2; + int compare_result = compareValues(getValue(part_index, middle), value); + if (compare_result != 1) + left = middle + 1; + else + right = middle; + } + + if (left == range_end) + return {}; + + return left; + } + + 
std::optional findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const + { + return findLeftmostMarkGreaterThanValueInRange(part_index, value, mark_range.begin, mark_range.end); + } + size_t getMarkRows(size_t part_idx, size_t mark) const { return parts[part_idx].data_part->index_granularity.getMarkRows(mark); } size_t getTotalRowCount() const @@ -63,67 +132,367 @@ private: const RangesInDataParts & parts; }; - -/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range. -/// Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way. -std::pair, std::vector> split(RangesInDataParts parts, size_t max_layers) +class RangesInDataPartsBuilder { +public: + explicit RangesInDataPartsBuilder(const RangesInDataParts & initial_ranges_in_data_parts_) : initial_ranges_in_data_parts(initial_ranges_in_data_parts_) { } + + void addRange(size_t part_index, MarkRange mark_range) + { + auto [it, inserted] = part_index_to_current_ranges_in_data_parts_index.emplace(part_index, ranges_in_data_parts.size()); + + if (inserted) + { + ranges_in_data_parts.emplace_back( + initial_ranges_in_data_parts[part_index].data_part, + initial_ranges_in_data_parts[part_index].alter_conversions, + initial_ranges_in_data_parts[part_index].part_index_in_query, + MarkRanges{mark_range}); + part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index; + return; + } + + ranges_in_data_parts[it->second].ranges.push_back(mark_range); + } + + RangesInDataParts & getCurrentRangesInDataParts() + { + return ranges_in_data_parts; + } + + size_t mapPartIndexToInitialPartIndex(size_t part_index) const + { + return part_index_to_initial_ranges_in_data_parts_index.at(part_index); + } +private: + std::unordered_map part_index_to_current_ranges_in_data_parts_index; + std::unordered_map part_index_to_initial_ranges_in_data_parts_index; + RangesInDataParts 
ranges_in_data_parts; + const RangesInDataParts & initial_ranges_in_data_parts; +}; + +struct PartsRangesIterator +{ + enum class EventType : uint8_t + { + RangeStart = 0, + RangeEnd, + }; + + [[maybe_unused]] bool operator<(const PartsRangesIterator & other) const + { + int compare_result = compareValues(value, other.value); + if (compare_result == -1) + return true; + else if (compare_result == 1) + return false; + + // RangeStart event always before RangeEnd event + if (event != other.event) + return event < other.event; + + /// Within the same part we should process events in order of mark numbers, + /// because they already ordered by value and range ends have greater mark numbers than the beginnings. + /// Otherwise we could get invalid ranges with the right bound that is less than the left bound. + const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end; + const auto other_ev_mark = other.event == EventType::RangeStart ? other.range.begin : other.range.end; + + if (ev_mark == other_ev_mark) + return part_index < other.part_index; + + return ev_mark < other_ev_mark; + } + + [[maybe_unused]] bool operator==(const PartsRangesIterator & other) const + { + if (value.size() != other.value.size()) + return false; + + for (size_t i = 0; i < value.size(); ++i) + if (!applyVisitor(FieldVisitorAccurateEquals(), value[i], other.value[i])) + return false; + + return range == other.range && part_index == other.part_index && event == other.event; + } + + [[maybe_unused]] bool operator>(const PartsRangesIterator & other) const + { + if (operator<(other) || operator==(other)) + return false; + + return true; + } + + Values value; + MarkRange range; + size_t part_index; + EventType event; +}; + +struct SplitResult +{ + RangesInDataParts non_intersecting_parts_ranges; + std::vector borders; + std::vector layers; +}; + +SplitResult split(RangesInDataParts ranges_in_data_parts, size_t max_layers) +{ + /** Split ranges in data parts into intersecting ranges 
in data parts and non intersecting ranges in data parts. + * + * For each marks range we will create 2 events (RangeStart, RangeEnd), add these events into array and sort them by primary key index + * value at this event. + * + * After that we will scan sorted events and maintain current intersecting parts ranges. + * If current intersecting parts ranges is 1, for each event (RangeStart, RangeEnd) we can extract non intersecting range + * from single part range. + * + * There can be 4 possible cases: + * + * 1. RangeStart after RangeStart: + * + * Example: + * + * range 1 [---- ... + * range 2 [(value_1) ... + * + * In this scenario we can extract non intersecting part of range 1. This non intersecting part will have start + * of range 1 and end with rightmost mark from range 1 that contains value less than value_1. + * + * 2. RangeStart after RangeEnd: + * + * Example: + * + * range 1 [ ---- ... + * range 2 [ (value_1)] + * range 3 [(value_2) ... + * + * In this case we can extract non intersecting part of range 1. This non intersecting part will have start + * of leftmost mark from range 1 that contains value greater than value_1 and end with rightmost mark from range 1 + * that contains value less than value_2. + * + * 3. RangeEnd after RangeStart: + * + * Example: + * + * range 1 [----] + * + * In this case we can extract range 1 as non intersecting. + * + * 4. RangeEnd after RangeEnd + * + * Example: + * + * range 1 [ ... ----] + * range 2 [ ... (value_1)] + * + * In this case we can extract non intersecting part of range 1. This non intersecting part will have start + * of leftmost mark from range 1 that contains value greater than value_1 and end with range 1 end. + * + * Additional details: + * + * 1. If part level is 0, we must process all ranges from this part, because they can contain duplicate primary keys. + * 2. If non intersecting range is small, it is better to not add it to non intersecting ranges, to avoid expensive seeks. 
+ */ + + IndexAccess index_access(ranges_in_data_parts); + std::vector parts_ranges; + + for (size_t part_index = 0; part_index < ranges_in_data_parts.size(); ++part_index) + { + for (const auto & range : ranges_in_data_parts[part_index].ranges) + { + const auto & index_granularity = ranges_in_data_parts[part_index].data_part->index_granularity; + parts_ranges.push_back( + {index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart}); + + const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount(); + if (!value_is_defined_at_end_mark) + continue; + + parts_ranges.push_back( + {index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd}); + } + } + + std::sort(parts_ranges.begin(), parts_ranges.end()); + + RangesInDataPartsBuilder intersecting_ranges_in_data_parts_builder(ranges_in_data_parts); + RangesInDataPartsBuilder non_intersecting_ranges_in_data_parts_builder(ranges_in_data_parts); + + static constexpr size_t min_number_of_marks_for_non_intersecting_range = 2; + + auto add_non_intersecting_range = [&](size_t part_index, MarkRange mark_range) + { + non_intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range); + }; + + auto add_intersecting_range = [&](size_t part_index, MarkRange mark_range) + { + intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range); + }; + + std::unordered_map part_index_start_to_range; + + chassert(parts_ranges.size() > 1); + chassert(parts_ranges[0].event == PartsRangesIterator::EventType::RangeStart); + part_index_start_to_range[parts_ranges[0].part_index] = parts_ranges[0].range; + + size_t parts_ranges_size = parts_ranges.size(); + for (size_t i = 1; i < parts_ranges_size; ++i) + { + auto & previous_part_range = parts_ranges[i - 1]; + auto & current_part_range = parts_ranges[i]; + size_t intersecting_parts = part_index_start_to_range.size(); + bool range_start = 
current_part_range.event == PartsRangesIterator::EventType::RangeStart; + + if (range_start) + { + auto [it, inserted] = part_index_start_to_range.emplace(current_part_range.part_index, current_part_range.range); + chassert(inserted); + + if (intersecting_parts != 1) + continue; + + if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart) + { + /// If part level is 0, we must process whole previous part because it can contain duplicate primary keys + if (ranges_in_data_parts[previous_part_range.part_index].data_part->info.level == 0) + continue; + + /// Case 1 Range Start after Range Start + size_t begin = previous_part_range.range.begin; + std::optional end_optional = index_access.findRightmostMarkLessThanValueInRange(previous_part_range.part_index, + current_part_range.value, + previous_part_range.range); + + if (!end_optional) + continue; + + size_t end = *end_optional; + + if (end - begin >= min_number_of_marks_for_non_intersecting_range) + { + part_index_start_to_range[previous_part_range.part_index].begin = end; + add_non_intersecting_range(previous_part_range.part_index, MarkRange{begin, end}); + } + + continue; + } + + auto other_interval_it = part_index_start_to_range.begin(); + for (; other_interval_it != part_index_start_to_range.end(); ++other_interval_it) + { + if (other_interval_it != it) + break; + } + + chassert(other_interval_it != part_index_start_to_range.end()); + size_t other_interval_part_index = other_interval_it->first; + MarkRange other_interval_range = other_interval_it->second; + + /// If part level is 0, we must process whole other intersecting part because it can contain duplicate primary keys + if (ranges_in_data_parts[other_interval_part_index].data_part->info.level == 0) + continue; + + /// Case 2 Range Start after Range End + std::optional begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(other_interval_part_index, + previous_part_range.value, + other_interval_range); + if (!begin_optional) + 
continue; + + std::optional end_optional = index_access.findRightmostMarkLessThanValueInRange(other_interval_part_index, + current_part_range.value, + other_interval_range); + if (!end_optional) + continue; + + size_t begin = *begin_optional; + size_t end = *end_optional; + /// begin can exceed end when no mark lies strictly between the two values; check it first so that `end - begin` cannot wrap around. + if (begin < end && end - begin >= min_number_of_marks_for_non_intersecting_range) + { + other_interval_it->second.begin = end; + add_intersecting_range(other_interval_part_index, MarkRange{other_interval_range.begin, begin}); + add_non_intersecting_range(other_interval_part_index, MarkRange{begin, end}); + } + continue; + } + + chassert(current_part_range.event == PartsRangesIterator::EventType::RangeEnd); + + /** If there are more than 1 part ranges that we are currently processing + * that means that this part range is intersecting with other range. + * + * If part level is 0, we must process whole part because it can contain duplicate primary keys. + */ + if (intersecting_parts != 1 || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0) + { + add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range.part_index]); + part_index_start_to_range.erase(current_part_range.part_index); + continue; + } + + if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart) + { + chassert(current_part_range.part_index == previous_part_range.part_index); + chassert(current_part_range.range == previous_part_range.range); + + /// Case 3 Range End after Range Start + non_intersecting_ranges_in_data_parts_builder.addRange(current_part_range.part_index, current_part_range.range); + part_index_start_to_range.erase(current_part_range.part_index); + continue; + } + + chassert(previous_part_range.event == PartsRangesIterator::EventType::RangeEnd); + chassert(previous_part_range.part_index != current_part_range.part_index); + + /// Case 4 Range End after Range End + std::optional begin_optional = 
index_access.findLeftmostMarkGreaterThanValueInRange(current_part_range.part_index, + previous_part_range.value, + current_part_range.range); + size_t end = current_part_range.range.end; + + if (begin_optional && end - *begin_optional >= min_number_of_marks_for_non_intersecting_range) + { + size_t begin = *begin_optional; + add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range.part_index].begin, begin}); + add_non_intersecting_range(current_part_range.part_index, MarkRange{begin, end}); + } + else + { + add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range.part_index].begin, end}); + } + + part_index_start_to_range.erase(current_part_range.part_index); + } + + auto & non_intersecting_ranges_in_data_parts = non_intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts(); + auto & intersecting_ranges_in_data_parts = intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts(); + // We will advance the iterator pointing to the mark with the smallest PK value until // there will be not less than rows_per_layer rows in the current layer (roughly speaking). // Then we choose the last observed value as the new border, so the current layer will consists // of granules with values greater than the previous mark and less or equal than the new border. 
- struct PartsRangesIterator + std::priority_queue, std::greater<>> parts_ranges_queue; + for (size_t part_index = 0; part_index < intersecting_ranges_in_data_parts.size(); ++part_index) { - struct MarkRangeWithPartIdx : MarkRange - { - size_t part_idx; - }; - - enum class EventType - { - RangeStart, - RangeEnd, - }; - - [[maybe_unused]] bool operator<(const PartsRangesIterator & other) const - { - // Accurate comparison of `value > other.value` - for (size_t i = 0; i < value.size(); ++i) - { - if (applyVisitor(FieldVisitorAccurateLess(), value[i], other.value[i])) - return false; - - if (!applyVisitor(FieldVisitorAccurateEquals(), value[i], other.value[i])) - return true; - } - - /// Within the same part we should process events in order of mark numbers, - /// because they already ordered by value and range ends have greater mark numbers than the beginnings. - /// Otherwise we could get invalid ranges with the right bound that is less than the left bound. - const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end; - const auto other_ev_mark = other.event == EventType::RangeStart ? 
other.range.begin : other.range.end; - return ev_mark > other_ev_mark; - } - - Values value; - MarkRangeWithPartIdx range; - EventType event; - }; - - const auto index_access = std::make_unique(parts); - std::priority_queue parts_ranges_queue; - for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx) - { - for (const auto & range : parts[part_idx].ranges) + size_t initial_part_index = intersecting_ranges_in_data_parts_builder.mapPartIndexToInitialPartIndex(part_index); + + for (const auto & range : intersecting_ranges_in_data_parts[part_index].ranges) { + const auto & index_granularity = intersecting_ranges_in_data_parts[part_index].data_part->index_granularity; parts_ranges_queue.push( - {index_access->getValue(part_idx, range.begin), {range, part_idx}, PartsRangesIterator::EventType::RangeStart}); - const auto & index_granularity = parts[part_idx].data_part->index_granularity; + {index_access.getValue(initial_part_index, range.begin), range, initial_part_index, PartsRangesIterator::EventType::RangeStart}); + const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount(); - if (value_is_defined_at_end_mark) - parts_ranges_queue.push( - {index_access->getValue(part_idx, range.end), {range, part_idx}, PartsRangesIterator::EventType::RangeEnd}); + if (!value_is_defined_at_end_mark) + continue; + + parts_ranges_queue.push( + {index_access.getValue(initial_part_index, range.end), range, initial_part_index, PartsRangesIterator::EventType::RangeEnd}); } } @@ -136,7 +505,7 @@ std::pair, std::vector> split(RangesInDat std::vector borders; std::vector result_layers; - const size_t rows_per_layer = std::max(index_access->getTotalRowCount() / max_layers, 1); + const size_t rows_per_layer = std::max(index_access.getTotalRowCount() / max_layers, 1); while (!parts_ranges_queue.empty()) { @@ -152,9 +521,7 @@ std::pair, std::vector> split(RangesInDat return marks_in_current_layer < intersected_parts * 2; }; - auto & current_layer = 
result_layers.emplace_back(); - /// Map part_idx into index inside layer, used to merge marks from the same part into one reader - std::unordered_map part_idx_in_layer; + RangesInDataPartsBuilder current_layer_builder(ranges_in_data_parts); - while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers) + while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() + 1 == max_layers) /* the layer being built is appended to result_layers only after this loop, so count it here */ { @@ -164,57 +531,52 @@ std::pair, std::vector> split(RangesInDat { auto current = parts_ranges_queue.top(); parts_ranges_queue.pop(); - const auto part_idx = current.range.part_idx; + const auto part_index = current.part_index; if (current.event == PartsRangesIterator::EventType::RangeEnd) { - const auto & mark = MarkRange{current_part_range_begin[part_idx], current.range.end}; - auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size())); - if (it.second) - current_layer.emplace_back( - parts[part_idx].data_part, - parts[part_idx].alter_conversions, - parts[part_idx].part_index_in_query, - MarkRanges{mark}); - else - current_layer[it.first->second].ranges.push_back(mark); - - current_part_range_begin.erase(part_idx); - current_part_range_end.erase(part_idx); + current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[part_index], current.range.end}); + current_part_range_begin.erase(part_index); + current_part_range_end.erase(part_index); continue; } last_value = std::move(current.value); - rows_in_current_layer += index_access->getMarkRows(part_idx, current.range.begin); - marks_in_current_layer++; - current_part_range_begin.try_emplace(part_idx, current.range.begin); - current_part_range_end[part_idx] = current.range.begin; + rows_in_current_layer += index_access.getMarkRows(part_index, current.range.begin); + ++marks_in_current_layer; + + current_part_range_begin.try_emplace(part_index, current.range.begin); + current_part_range_end[part_index] = current.range.begin; + if (current.range.begin + 1 < current.range.end) { - current.range.begin++; - current.value = 
index_access->getValue(part_idx, current.range.begin); + ++current.range.begin; + current.value = index_access.getValue(part_index, current.range.begin); parts_ranges_queue.push(std::move(current)); } } + if (parts_ranges_queue.empty()) break; + - if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers) + if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() + 1 < max_layers) /* current layer is not in result_layers yet */ borders.push_back(last_value); } - for (const auto & [part_idx, last_mark] : current_part_range_end) + + for (const auto & [part_index, last_mark] : current_part_range_end) { - const auto & mark = MarkRange{current_part_range_begin[part_idx], last_mark + 1}; - auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size())); - - if (it.second) - result_layers.back().emplace_back( - parts[part_idx].data_part, parts[part_idx].alter_conversions, parts[part_idx].part_index_in_query, MarkRanges{mark}); - else - current_layer[it.first->second].ranges.push_back(mark); - - current_part_range_begin[part_idx] = current_part_range_end[part_idx]; + current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[part_index], last_mark + 1}); + current_part_range_begin[part_index] = current_part_range_end[part_index]; } + + result_layers.push_back(std::move(current_layer_builder.getCurrentRangesInDataParts())); } + + std::stable_sort( + non_intersecting_ranges_in_data_parts.begin(), + non_intersecting_ranges_in_data_parts.end(), + [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; }); + for (auto & layer : result_layers) { std::stable_sort( @@ -223,7 +585,7 @@ std::pair, std::vector> split(RangesInDat [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; }); } - return {std::move(borders), std::move(result_layers)}; + return {std::move(non_intersecting_ranges_in_data_parts), std::move(borders), std::move(result_layers)}; } @@ -329,44 +691,54 @@ static void reorderColumns(ActionsDAG & dag, const 
Block & header, const std::st dag.getOutputs() = std::move(new_outputs); } -Pipes buildPipesForReadingByPKRanges( +SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey( const KeyDescription & primary_key, ExpressionActionsPtr sorting_expr, RangesInDataParts parts, size_t max_layers, ContextPtr context, - ReadingInOrderStepGetter && reading_step_getter) + ReadingInOrderStepGetter && in_order_reading_step_getter) { if (max_layers <= 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1"); - auto && [borders, result_layers] = split(std::move(parts), max_layers); + SplitResult split_result = split(std::move(parts), max_layers); + + SplitPartsWithRangesByPrimaryKeyResult result; + result.non_intersecting_parts_ranges = std::move(split_result.non_intersecting_parts_ranges); + + auto borders = std::move(split_result.borders); + auto result_layers = std::move(split_result.layers); auto filters = buildFilters(primary_key, borders); - Pipes pipes(result_layers.size()); for (size_t i = 0; i < result_layers.size(); ++i) { - pipes[i] = reading_step_getter(std::move(result_layers[i])); - pipes[i].addSimpleTransform([sorting_expr](const Block & header) + Pipe layer_pipe = in_order_reading_step_getter(std::move(result_layers[i])); + layer_pipe.addSimpleTransform([sorting_expr](const Block & header) { return std::make_shared(header, sorting_expr); }); + auto & filter_function = filters[i]; if (!filter_function) - continue; + { result.merging_pipes.push_back(std::move(layer_pipe)); continue; } /// A layer without a filter must still be read: keep its pipe instead of dropping it. + auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes()); auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false); - reorderColumns(*actions, pipes[i].getHeader(), filter_function->getColumnName()); + reorderColumns(*actions, layer_pipe.getHeader(), filter_function->getColumnName()); ExpressionActionsPtr expression_actions = std::make_shared(std::move(actions)); auto description = fmt::format( "filter values in 
({}, {}]", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf"); - pipes[i].addSimpleTransform( + layer_pipe.addSimpleTransform( [&](const Block & header) { auto step = std::make_shared(header, expression_actions, filter_function->getColumnName(), true); step->setDescription(description); return step; }); + + result.merging_pipes.push_back(std::move(layer_pipe)); } - return pipes; + + return result; } } diff --git a/src/Processors/QueryPlan/PartsSplitter.h b/src/Processors/QueryPlan/PartsSplitter.h index 92ba6191e97..47a2f8b468c 100644 --- a/src/Processors/QueryPlan/PartsSplitter.h +++ b/src/Processors/QueryPlan/PartsSplitter.h @@ -13,15 +13,25 @@ namespace DB using ReadingInOrderStepGetter = std::function; -/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range. -/// A separate pipe will be constructed for each layer with a reading step (provided by the reading_step_getter) and -/// a filter for this layer's range of PK values. -/// Will try to produce exactly max_layer pipes but may return less if data is distributed in not a very parallelizable way. -Pipes buildPipesForReadingByPKRanges( +struct SplitPartsWithRangesByPrimaryKeyResult +{ + RangesInDataParts non_intersecting_parts_ranges; + Pipes merging_pipes; +}; + +/** Splits parts ranges into: + * + * 1. Non intersecting part ranges, for parts with level > 0. + * 2. Merging layers, that contain ranges from multiple parts. A separate pipe will be constructed for each layer + * with a reading step (provided by the in_order_reading_step_getter) and a filter for this layer's range of PK values. + * + * Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way. 
+ */ +SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey( const KeyDescription & primary_key, ExpressionActionsPtr sorting_expr, RangesInDataParts parts, size_t max_layers, ContextPtr context, - ReadingInOrderStepGetter && reading_step_getter); + ReadingInOrderStepGetter && in_order_reading_step_getter); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 875b0d9bdbc..48efe44ed2a 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1072,9 +1072,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( it, parts_with_ranges.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); parts_to_merge_ranges.push_back(it); } - /// We divide threads for each partition equally. But we will create at least the number of partitions threads. - /// (So, the total number of threads could be more than initial num_streams. - num_streams /= (parts_to_merge_ranges.size() - 1); } else { @@ -1087,8 +1084,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( /// If do_not_merge_across_partitions_select_final is true and num_streams > 1 /// we will store lonely parts with level > 0 to use parallel select on them. 
- RangesInDataParts lonely_parts; - size_t sum_marks_in_lonely_parts = 0; + RangesInDataParts non_intersecting_parts_by_primary_key; auto sorting_expr = std::make_shared(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone()); @@ -1100,32 +1096,26 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( bool no_merging_final = settings.do_not_merge_across_partitions_select_final && std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0; + if (no_merging_final) + { + non_intersecting_parts_by_primary_key.push_back(std::move(*parts_to_merge_ranges[range_index])); + continue; + } + Pipes pipes; { RangesInDataParts new_parts; - if (no_merging_final) - { - if (num_streams > 1) - sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount(); - lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index])); - continue; - } - else - { - for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) - { - new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges); - } - } + for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) + new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges); if (new_parts.empty()) continue; if (num_streams > 1 && metadata_for_reading->hasPrimaryKey()) { - // Let's split parts into layers to ensure data parallelism of FINAL. - auto reading_step_getter = [this, &column_names, &info](auto parts) + // Let's split parts into non intersecting parts ranges and layers to ensure data parallelism of FINAL. 
+ auto in_order_reading_step_getter = [this, &column_names, &info](auto parts) { return this->read( std::move(parts), @@ -1136,13 +1126,19 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( info.use_uncompressed_cache); }; - pipes = buildPipesForReadingByPKRanges( + SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey( metadata_for_reading->getPrimaryKey(), sorting_expr, std::move(new_parts), num_streams, context, - std::move(reading_step_getter)); + std::move(in_order_reading_step_getter)); + + for (auto && non_intersecting_parts_range : split_ranges_result.non_intersecting_parts_ranges) + non_intersecting_parts_by_primary_key.push_back(std::move(non_intersecting_parts_range)); + + for (auto && merging_pipe : split_ranges_result.merging_pipes) + pipes.push_back(std::move(merging_pipe)); } else { @@ -1154,10 +1150,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( } /// Drop temporary columns, added by 'sorting_key_expr' - if (!out_projection) + if (!out_projection && !pipes.empty()) out_projection = createProjection(pipes.front().getHeader()); } + if (pipes.empty()) + continue; Names sort_columns = metadata_for_reading->getSortingKeyColumns(); SortDescription sort_description; @@ -1183,45 +1181,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes))); } - if (!lonely_parts.empty()) + if (!non_intersecting_parts_by_primary_key.empty()) { - Pipe pipe; - if (num_streams > 1) - { - size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size(); - - const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead( - settings.merge_tree_min_rows_for_concurrent_read, - settings.merge_tree_min_bytes_for_concurrent_read, - data_settings->index_granularity, - info.index_granularity_bytes, - sum_marks_in_lonely_parts); - - /// Reduce the number of num_streams_for_lonely_parts if the data is small. 
- if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read - && lonely_parts.size() < num_streams_for_lonely_parts) - num_streams_for_lonely_parts = std::max( - (sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, - lonely_parts.size()); - - pipe = read( - std::move(lonely_parts), - origin_column_names, - ReadFromMergeTree::ReadType::Default, - num_streams_for_lonely_parts, - min_marks_for_concurrent_read, - info.use_uncompressed_cache); - } - else - { - pipe = read( - std::move(lonely_parts), - origin_column_names, - ReadFromMergeTree::ReadType::InOrder, - num_streams, - 0, - info.use_uncompressed_cache); - } + auto pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names); no_merging_pipes.emplace_back(std::move(pipe)); } diff --git a/tests/queries/0_stateless/02946_merge_tree_final_split_ranges_by_primary_key.reference b/tests/queries/0_stateless/02946_merge_tree_final_split_ranges_by_primary_key.reference new file mode 100644 index 00000000000..59acae1c7ef --- /dev/null +++ b/tests/queries/0_stateless/02946_merge_tree_final_split_ranges_by_primary_key.reference @@ -0,0 +1,85 @@ +1 +-- +0 0 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 +12 12 +13 13 +14 14 +15 15 +-- +0 0 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 +12 12 +13 13 +14 14 +15 15 +-- +0 0 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 +12 12 +13 13 +14 14 +15 15 +-- +0 0 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 +12 12 +13 13 +14 14 +15 15 +16 16 +17 17 +18 18 +19 19 +20 20 +21 21 +22 22 +23 23 +24 24 +25 25 +26 26 +27 27 +28 28 +29 29 +30 30 +31 31 diff --git a/tests/queries/0_stateless/02946_merge_tree_final_split_ranges_by_primary_key.sql b/tests/queries/0_stateless/02946_merge_tree_final_split_ranges_by_primary_key.sql new file mode 100644 index 00000000000..70067bcff74 --- /dev/null +++ 
b/tests/queries/0_stateless/02946_merge_tree_final_split_ranges_by_primary_key.sql @@ -0,0 +1,34 @@ +DROP TABLE IF EXISTS test_table; +CREATE TABLE test_table +( + id UInt64, + value String +) ENGINE=ReplacingMergeTree ORDER BY id SETTINGS index_granularity = 2; + +INSERT INTO test_table SELECT 0, '0'; +INSERT INTO test_table SELECT number + 1, number + 1 FROM numbers(15); +OPTIMIZE TABLE test_table; + +SELECT COUNT() FROM system.parts WHERE table = 'test_table' AND active = 1; +SYSTEM STOP MERGES test_table; + +SELECT '--'; + +SELECT id, value FROM test_table FINAL ORDER BY id; + +SELECT '--'; + +INSERT INTO test_table SELECT 5, '5'; +SELECT id, value FROM test_table FINAL ORDER BY id; + +SELECT '--'; + +INSERT INTO test_table SELECT number + 8, number + 8 FROM numbers(8); +SELECT id, value FROM test_table FINAL ORDER BY id; + +SELECT '--'; + +INSERT INTO test_table SELECT number, number FROM numbers(32); +SELECT id, value FROM test_table FINAL ORDER BY id; + +DROP TABLE test_table;