MergeTree FINAL extract non intersecting parts ranges

This commit is contained in:
Maksim Kita 2023-12-21 14:30:09 +03:00
parent 35e27ab1a3
commit 269e9706fb
5 changed files with 627 additions and 164 deletions

View File

@ -30,6 +30,23 @@ std::string toString(const Values & value)
return fmt::format("({})", fmt::join(value, ", "));
}
/// Lexicographic three-way comparison of two primary key tuples.
/// Walks the columns in order and returns -1 at the first column where lhs is accurately less than rhs,
/// 1 at the first column where the fields are neither less nor equal, and 0 if every column is equal.
/// Both tuples are expected to have the same number of columns.
int compareValues(const Values & lhs, const Values & rhs)
{
    chassert(lhs.size() == rhs.size());

    const size_t columns = lhs.size();
    size_t column = 0;
    while (column < columns)
    {
        const auto & left_field = lhs[column];
        const auto & right_field = rhs[column];

        if (applyVisitor(FieldVisitorAccurateLess(), left_field, right_field))
            return -1;

        const bool fields_are_equal = applyVisitor(FieldVisitorAccurateEquals(), left_field, right_field);
        if (!fields_are_equal)
            return 1;

        ++column;
    }

    return 0;
}
/// Adaptor to access PK values from index.
class IndexAccess
{
@ -49,6 +66,58 @@ public:
return values;
}
std::optional<size_t> findRightmostMarkLessThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const
{
size_t left = range_begin;
size_t right = range_end;
while (left < right)
{
size_t middle = left + (right - left) / 2;
int compare_result = compareValues(getValue(part_index, middle), value);
if (compare_result != -1)
right = middle;
else
left = middle + 1;
}
if (right == range_begin)
return {};
return right - 1;
}
std::optional<size_t> findRightmostMarkLessThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const
{
return findRightmostMarkLessThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
}
std::optional<size_t> findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, size_t range_begin, size_t range_end) const
{
size_t left = range_begin;
size_t right = range_end;
while (left < right)
{
size_t middle = left + (right - left) / 2;
int compare_result = compareValues(getValue(part_index, middle), value);
if (compare_result != 1)
left = middle + 1;
else
right = middle;
}
if (left == range_end)
return {};
return left;
}
std::optional<size_t> findLeftmostMarkGreaterThanValueInRange(size_t part_index, Values value, MarkRange mark_range) const
{
return findLeftmostMarkGreaterThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
}
/// Number of rows in mark `mark` of part `part_idx`, as reported by that part's index granularity.
size_t getMarkRows(size_t part_idx, size_t mark) const
{
    const auto & granularity = parts[part_idx].data_part->index_granularity;
    return granularity.getMarkRows(mark);
}
size_t getTotalRowCount() const
@ -63,67 +132,367 @@ private:
const RangesInDataParts & parts;
};
/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range.
/// Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way.
std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDataParts parts, size_t max_layers)
/// Accumulates mark ranges into a RangesInDataParts, creating one entry per distinct initial part
/// the first time a range of that part is added and appending further ranges to the same entry.
/// Holds a reference to the initial ranges, so the initial ranges must outlive the builder.
class RangesInDataPartsBuilder
{
public:
    explicit RangesInDataPartsBuilder(const RangesInDataParts & initial_ranges_in_data_parts_)
        : initial_ranges_in_data_parts(initial_ranges_in_data_parts_)
    {
    }

    /// Adds `mark_range` for the part at position `part_index` of the initial ranges.
    void addRange(size_t part_index, MarkRange mark_range)
    {
        const size_t next_slot = ranges_in_data_parts.size();
        const auto [slot_it, is_new_part] = part_index_to_current_ranges_in_data_parts_index.emplace(part_index, next_slot);

        if (!is_new_part)
        {
            /// The part already has an entry, just extend its list of ranges.
            ranges_in_data_parts[slot_it->second].ranges.push_back(mark_range);
            return;
        }

        /// First range of this part: create its entry and remember which initial part it came from.
        const auto & initial_part = initial_ranges_in_data_parts[part_index];
        ranges_in_data_parts.emplace_back(
            initial_part.data_part,
            initial_part.alter_conversions,
            initial_part.part_index_in_query,
            MarkRanges{mark_range});
        part_index_to_initial_ranges_in_data_parts_index[slot_it->second] = part_index;
    }

    /// Mutable access to what has been accumulated so far.
    RangesInDataParts & getCurrentRangesInDataParts()
    {
        return ranges_in_data_parts;
    }

    /// Maps a position in the accumulated ranges back to the position of the same part in the initial ranges.
    /// Uses `at`, so an unknown position throws.
    size_t mapPartIndexToInitialPartIndex(size_t part_index) const
    {
        return part_index_to_initial_ranges_in_data_parts_index.at(part_index);
    }

private:
    /// Initial part position -> position in ranges_in_data_parts.
    std::unordered_map<size_t, size_t> part_index_to_current_ranges_in_data_parts_index;
    /// Position in ranges_in_data_parts -> initial part position.
    std::unordered_map<size_t, size_t> part_index_to_initial_ranges_in_data_parts_index;
    RangesInDataParts ranges_in_data_parts;
    const RangesInDataParts & initial_ranges_in_data_parts;
};
/// One event (start or end of a marks range of a part) together with the index value at that event.
/// Events are ordered by value first, then RangeStart before RangeEnd, then by mark number, then by part index.
struct PartsRangesIterator
{
    enum class EventType : uint8_t
    {
        RangeStart = 0,
        RangeEnd,
    };

    [[maybe_unused]] bool operator<(const PartsRangesIterator & other) const
    {
        switch (compareValues(value, other.value))
        {
            case -1:
                return true;
            case 1:
                return false;
            default:
                break;
        }

        /// Values are equal: a RangeStart event always goes before a RangeEnd event.
        if (event != other.event)
            return event < other.event;

        /// Within the same part events must be processed in order of mark numbers: they are already ordered
        /// by value, and range ends have greater mark numbers than range beginnings.
        /// Otherwise we could get invalid ranges with the right bound less than the left bound.
        const auto event_mark = [](const PartsRangesIterator & it)
        {
            return it.event == EventType::RangeStart ? it.range.begin : it.range.end;
        };
        const auto this_mark = event_mark(*this);
        const auto that_mark = event_mark(other);

        if (this_mark != that_mark)
            return this_mark < that_mark;

        return part_index < other.part_index;
    }

    [[maybe_unused]] bool operator==(const PartsRangesIterator & other) const
    {
        const size_t columns = value.size();
        if (columns != other.value.size())
            return false;

        for (size_t column = 0; column != columns; ++column)
        {
            const bool same_field = applyVisitor(FieldVisitorAccurateEquals(), value[column], other.value[column]);
            if (!same_field)
                return false;
        }

        if (!(range == other.range))
            return false;

        return part_index == other.part_index && event == other.event;
    }

    /// Greater means neither less nor equal.
    [[maybe_unused]] bool operator>(const PartsRangesIterator & other) const
    {
        return !(operator<(other) || operator==(other));
    }

    Values value;
    MarkRange range;
    size_t part_index;
    EventType event;
};
/// Result of split(): the ranges extracted as non intersecting plus the merging layers and their borders.
/// The field order matters, split() returns this struct with brace initialization.
struct SplitResult
{
/// Parts ranges that split() extracted as not intersecting with ranges of other parts.
RangesInDataParts non_intersecting_parts_ranges;
/// Primary key values chosen as borders between consecutive layers.
std::vector<Values> borders;
/// Parts ranges of each merging layer.
std::vector<RangesInDataParts> layers;
};
SplitResult split(RangesInDataParts ranges_in_data_parts, size_t max_layers)
{
/** Split ranges in data parts into intersecting ranges in data parts and non intersecting ranges in data parts.
*
* For each marks range we will create 2 events (RangeStart, RangeEnd), add these events into array and sort them by primary key index
* value at this event.
*
* After that we will scan sorted events and maintain current intersecting parts ranges.
* If current intersecting parts ranges is 1, for each event (RangeStart, RangeEnd) we can extract non intersecting range
* from single part range.
*
* There can be 4 possible cases:
*
* 1. RangeStart after RangeStart:
*
* Example:
*
* range 1 [---- ...
* range 2 [(value_1) ...
*
* In this scenario we can extract non intersecting part of range 1. This non intersecting part will have start
* of range 1 and end with rightmost mark from range 1 that contains value less than value_1.
*
* 2. RangeStart after RangeEnd:
*
* Example:
*
* range 1 [ ---- ...
* range 2 [ (value_1)]
* range 3 [(value_2) ...
*
* In this case we can extract non intersecting part of range 1. This non intersecting part will have start
* of leftmost mark from range 1 that contains value greater than value_1 and end with rightmost mark from range 1
* that contains value less than value_2.
*
* 3. RangeEnd after RangeStart:
*
* Example:
*
* range 1 [----]
*
* In this case we can extract range 1 as non intersecting.
*
* 4. RangeEnd after RangeEnd
*
* Example:
*
* range 1 [ ... ----]
* range 2 [ ... (value_1)]
*
* In this case we can extract non intersecting part of range 1. This non intersecting part will have start
* of leftmost mark from range 1 that contains value greater than value_1 and end with range 1 end.
*
* Additional details:
*
* 1. If part level is 0, we must process all ranges from this part, because they can contain duplicate primary keys.
* 2. If non intersecting range is small, it is better to not add it to non intersecting ranges, to avoid expensive seeks.
*/
IndexAccess index_access(ranges_in_data_parts);
std::vector<PartsRangesIterator> parts_ranges;
for (size_t part_index = 0; part_index < ranges_in_data_parts.size(); ++part_index)
{
for (const auto & range : ranges_in_data_parts[part_index].ranges)
{
const auto & index_granularity = ranges_in_data_parts[part_index].data_part->index_granularity;
parts_ranges.push_back(
{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart});
const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
if (!value_is_defined_at_end_mark)
continue;
parts_ranges.push_back(
{index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd});
}
}
std::sort(parts_ranges.begin(), parts_ranges.end());
RangesInDataPartsBuilder intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
RangesInDataPartsBuilder non_intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
static constexpr size_t min_number_of_marks_for_non_intersecting_range = 2;
auto add_non_intersecting_range = [&](size_t part_index, MarkRange mark_range)
{
non_intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
};
auto add_intersecting_range = [&](size_t part_index, MarkRange mark_range)
{
intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
};
std::unordered_map<size_t, MarkRange> part_index_start_to_range;
chassert(parts_ranges.size() > 1);
chassert(parts_ranges[0].event == PartsRangesIterator::EventType::RangeStart);
part_index_start_to_range[parts_ranges[0].part_index] = parts_ranges[0].range;
size_t parts_ranges_size = parts_ranges.size();
/// Sweep over the sorted events. part_index_start_to_range holds the ranges that are currently open
/// (RangeStart was seen, RangeEnd was not yet), with `begin` advanced past everything that has
/// already been handed to one of the builders.
for (size_t i = 1; i < parts_ranges_size; ++i)
{
    auto & previous_part_range = parts_ranges[i - 1];
    auto & current_part_range = parts_ranges[i];

    /// Number of open ranges before the current event is applied.
    size_t intersecting_parts = part_index_start_to_range.size();
    bool range_start = current_part_range.event == PartsRangesIterator::EventType::RangeStart;

    if (range_start)
    {
        auto [it, inserted] = part_index_start_to_range.emplace(current_part_range.part_index, current_part_range.range);
        chassert(inserted);

        /// Something can be extracted only if exactly one range was open before this one started.
        if (intersecting_parts != 1)
            continue;

        if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
        {
            /// If part level is 0, we must process whole previous part because it can contain duplicate primary keys
            if (ranges_in_data_parts[previous_part_range.part_index].data_part->info.level == 0)
                continue;

            /// Case 1 Range Start after Range Start
            size_t begin = previous_part_range.range.begin;
            std::optional<size_t> end_optional = index_access.findRightmostMarkLessThanValueInRange(previous_part_range.part_index,
                current_part_range.value,
                previous_part_range.range);

            if (!end_optional)
                continue;

            /// The search is restricted to the previous range, so end >= begin here.
            size_t end = *end_optional;

            if (end - begin >= min_number_of_marks_for_non_intersecting_range)
            {
                part_index_start_to_range[previous_part_range.part_index].begin = end;
                add_non_intersecting_range(previous_part_range.part_index, MarkRange{begin, end});
            }

            continue;
        }

        /// The previous event was a RangeEnd, so the single range that is still open is the other entry of the map.
        auto other_interval_it = part_index_start_to_range.begin();
        for (; other_interval_it != part_index_start_to_range.end(); ++other_interval_it)
        {
            if (other_interval_it != it)
                break;
        }

        chassert(other_interval_it != part_index_start_to_range.end());
        size_t other_interval_part_index = other_interval_it->first;
        MarkRange other_interval_range = other_interval_it->second;

        /// If part level is 0, we must process whole other intersecting part because it can contain duplicate primary keys
        if (ranges_in_data_parts[other_interval_part_index].data_part->info.level == 0)
            continue;

        /// Case 2 Range Start after Range End
        std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(other_interval_part_index,
            previous_part_range.value,
            other_interval_range);

        if (!begin_optional)
            continue;

        std::optional<size_t> end_optional = index_access.findRightmostMarkLessThanValueInRange(other_interval_part_index,
            current_part_range.value,
            other_interval_range);

        if (!end_optional)
            continue;

        /// The left bound comes from the search against the previous RangeEnd value,
        /// the right bound from the search against the current RangeStart value.
        size_t begin = *begin_optional;
        size_t end = *end_optional;

        /// The two bounds come from independent searches, so begin may be greater than end.
        /// Check it before subtracting, the marks are unsigned.
        if (begin < end && end - begin >= min_number_of_marks_for_non_intersecting_range)
        {
            other_interval_it->second.begin = end;

            /// Marks before `begin` may still overlap with the range that has just ended. Skip the prefix if it is empty.
            if (other_interval_range.begin < begin)
                add_intersecting_range(other_interval_part_index, MarkRange{other_interval_range.begin, begin});

            add_non_intersecting_range(other_interval_part_index, MarkRange{begin, end});
        }
        continue;
    }

    chassert(current_part_range.event == PartsRangesIterator::EventType::RangeEnd);

    /** If there are more than 1 part ranges that we are currently processing
      * that means that this part range is intersecting with other range.
      *
      * If part level is 0, we must process whole part because it can contain duplicate primary keys.
      */
    if (intersecting_parts != 1 || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0)
    {
        add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range.part_index]);
        part_index_start_to_range.erase(current_part_range.part_index);
        continue;
    }

    if (previous_part_range.event == PartsRangesIterator::EventType::RangeStart)
    {
        chassert(current_part_range.part_index == previous_part_range.part_index);
        chassert(current_part_range.range == previous_part_range.range);

        /// Case 3 Range End after Range Start
        add_non_intersecting_range(current_part_range.part_index, current_part_range.range);
        part_index_start_to_range.erase(current_part_range.part_index);
        continue;
    }

    chassert(previous_part_range.event == PartsRangesIterator::EventType::RangeEnd);
    chassert(previous_part_range.part_index != current_part_range.part_index);

    /// Case 4 Range End after Range End
    std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(current_part_range.part_index,
        previous_part_range.value,
        current_part_range.range);
    size_t end = current_part_range.range.end;

    /// A found mark lies inside the range, so it is less than end and the subtraction cannot underflow.
    if (begin_optional && end - *begin_optional >= min_number_of_marks_for_non_intersecting_range)
    {
        size_t begin = *begin_optional;
        add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range.part_index].begin, begin});
        add_non_intersecting_range(current_part_range.part_index, MarkRange{begin, end});
    }
    else
    {
        add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range.part_index].begin, end});
    }

    part_index_start_to_range.erase(current_part_range.part_index);
}

/// Ranges without an index value at their end mark never get a RangeEnd event, so they are still open here.
/// Hand them to the merging layers, otherwise their rows would not be read at all.
for (const auto & [open_part_index, open_mark_range] : part_index_start_to_range)
    add_intersecting_range(open_part_index, open_mark_range);
auto & non_intersecting_ranges_in_data_parts = non_intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts();
auto & intersecting_ranges_in_data_parts = intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts();
// We will advance the iterator pointing to the mark with the smallest PK value until
// there will be not less than rows_per_layer rows in the current layer (roughly speaking).
// Then we choose the last observed value as the new border, so the current layer will consists
// of granules with values greater than the previous mark and less or equal than the new border.
struct PartsRangesIterator
std::priority_queue<PartsRangesIterator, std::vector<PartsRangesIterator>, std::greater<>> parts_ranges_queue;
for (size_t part_index = 0; part_index < intersecting_ranges_in_data_parts.size(); ++part_index)
{
struct MarkRangeWithPartIdx : MarkRange
{
size_t part_idx;
};
enum class EventType
{
RangeStart,
RangeEnd,
};
[[maybe_unused]] bool operator<(const PartsRangesIterator & other) const
{
// Accurate comparison of `value > other.value`
for (size_t i = 0; i < value.size(); ++i)
{
if (applyVisitor(FieldVisitorAccurateLess(), value[i], other.value[i]))
return false;
if (!applyVisitor(FieldVisitorAccurateEquals(), value[i], other.value[i]))
return true;
}
/// Within the same part we should process events in order of mark numbers,
/// because they already ordered by value and range ends have greater mark numbers than the beginnings.
/// Otherwise we could get invalid ranges with the right bound that is less than the left bound.
const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end;
const auto other_ev_mark = other.event == EventType::RangeStart ? other.range.begin : other.range.end;
return ev_mark > other_ev_mark;
}
Values value;
MarkRangeWithPartIdx range;
EventType event;
};
const auto index_access = std::make_unique<IndexAccess>(parts);
std::priority_queue<PartsRangesIterator> parts_ranges_queue;
for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx)
{
for (const auto & range : parts[part_idx].ranges)
size_t initial_part_index = intersecting_ranges_in_data_parts_builder.mapPartIndexToInitialPartIndex(part_index);
for (const auto & range : intersecting_ranges_in_data_parts[part_index].ranges)
{
const auto & index_granularity = intersecting_ranges_in_data_parts[part_index].data_part->index_granularity;
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.begin), {range, part_idx}, PartsRangesIterator::EventType::RangeStart});
const auto & index_granularity = parts[part_idx].data_part->index_granularity;
{index_access.getValue(initial_part_index, range.begin), range, initial_part_index, PartsRangesIterator::EventType::RangeStart});
const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
if (value_is_defined_at_end_mark)
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.end), {range, part_idx}, PartsRangesIterator::EventType::RangeEnd});
if (!value_is_defined_at_end_mark)
continue;
parts_ranges_queue.push(
{index_access.getValue(initial_part_index, range.end), range, initial_part_index, PartsRangesIterator::EventType::RangeEnd});
}
}
@ -136,7 +505,7 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
std::vector<Values> borders;
std::vector<RangesInDataParts> result_layers;
const size_t rows_per_layer = std::max<size_t>(index_access->getTotalRowCount() / max_layers, 1);
const size_t rows_per_layer = std::max<size_t>(index_access.getTotalRowCount() / max_layers, 1);
while (!parts_ranges_queue.empty())
{
@ -152,9 +521,7 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
return marks_in_current_layer < intersected_parts * 2;
};
auto & current_layer = result_layers.emplace_back();
/// Map part_idx into index inside layer, used to merge marks from the same part into one reader
std::unordered_map<size_t, size_t> part_idx_in_layer;
RangesInDataPartsBuilder current_layer_builder(ranges_in_data_parts);
while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
{
@ -164,57 +531,52 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
{
auto current = parts_ranges_queue.top();
parts_ranges_queue.pop();
const auto part_idx = current.range.part_idx;
const auto part_index = current.part_index;
if (current.event == PartsRangesIterator::EventType::RangeEnd)
{
const auto & mark = MarkRange{current_part_range_begin[part_idx], current.range.end};
auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size()));
if (it.second)
current_layer.emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{mark});
else
current_layer[it.first->second].ranges.push_back(mark);
current_part_range_begin.erase(part_idx);
current_part_range_end.erase(part_idx);
current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[part_index], current.range.end});
current_part_range_begin.erase(part_index);
current_part_range_end.erase(part_index);
continue;
}
last_value = std::move(current.value);
rows_in_current_layer += index_access->getMarkRows(part_idx, current.range.begin);
marks_in_current_layer++;
current_part_range_begin.try_emplace(part_idx, current.range.begin);
current_part_range_end[part_idx] = current.range.begin;
rows_in_current_layer += index_access.getMarkRows(part_index, current.range.begin);
++marks_in_current_layer;
current_part_range_begin.try_emplace(part_index, current.range.begin);
current_part_range_end[part_index] = current.range.begin;
if (current.range.begin + 1 < current.range.end)
{
current.range.begin++;
current.value = index_access->getValue(part_idx, current.range.begin);
++current.range.begin;
current.value = index_access.getValue(part_index, current.range.begin);
parts_ranges_queue.push(std::move(current));
}
}
if (parts_ranges_queue.empty())
break;
if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers)
borders.push_back(last_value);
}
for (const auto & [part_idx, last_mark] : current_part_range_end)
for (const auto & [part_index, last_mark] : current_part_range_end)
{
const auto & mark = MarkRange{current_part_range_begin[part_idx], last_mark + 1};
auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size()));
if (it.second)
result_layers.back().emplace_back(
parts[part_idx].data_part, parts[part_idx].alter_conversions, parts[part_idx].part_index_in_query, MarkRanges{mark});
else
current_layer[it.first->second].ranges.push_back(mark);
current_part_range_begin[part_idx] = current_part_range_end[part_idx];
current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[part_index], last_mark + 1});
current_part_range_begin[part_index] = current_part_range_end[part_index];
}
result_layers.push_back(std::move(current_layer_builder.getCurrentRangesInDataParts()));
}
std::stable_sort(
non_intersecting_ranges_in_data_parts.begin(),
non_intersecting_ranges_in_data_parts.end(),
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
for (auto & layer : result_layers)
{
std::stable_sort(
@ -223,7 +585,7 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
}
return {std::move(borders), std::move(result_layers)};
return {std::move(non_intersecting_ranges_in_data_parts), std::move(borders), std::move(result_layers)};
}
@ -329,44 +691,54 @@ static void reorderColumns(ActionsDAG & dag, const Block & header, const std::st
dag.getOutputs() = std::move(new_outputs);
}
Pipes buildPipesForReadingByPKRanges(
SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
const KeyDescription & primary_key,
ExpressionActionsPtr sorting_expr,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter)
ReadingInOrderStepGetter && in_order_reading_step_getter)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1");
auto && [borders, result_layers] = split(std::move(parts), max_layers);
SplitResult split_result = split(std::move(parts), max_layers);
SplitPartsWithRangesByPrimaryKeyResult result;
result.non_intersecting_parts_ranges = std::move(split_result.non_intersecting_parts_ranges);
auto borders = std::move(split_result.borders);
auto result_layers = std::move(split_result.layers);
auto filters = buildFilters(primary_key, borders);
Pipes pipes(result_layers.size());
for (size_t i = 0; i < result_layers.size(); ++i)
{
pipes[i] = reading_step_getter(std::move(result_layers[i]));
pipes[i].addSimpleTransform([sorting_expr](const Block & header)
Pipe layer_pipe = in_order_reading_step_getter(std::move(result_layers[i]));
layer_pipe.addSimpleTransform([sorting_expr](const Block & header)
{ return std::make_shared<ExpressionTransform>(header, sorting_expr); });
auto & filter_function = filters[i];
if (!filter_function)
continue;
auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
reorderColumns(*actions, pipes[i].getHeader(), filter_function->getColumnName());
reorderColumns(*actions, layer_pipe.getHeader(), filter_function->getColumnName());
ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
auto description = fmt::format(
"filter values in ({}, {}]", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
pipes[i].addSimpleTransform(
layer_pipe.addSimpleTransform(
[&](const Block & header)
{
auto step = std::make_shared<FilterSortedStreamByRange>(header, expression_actions, filter_function->getColumnName(), true);
step->setDescription(description);
return step;
});
result.merging_pipes.push_back(std::move(layer_pipe));
}
return pipes;
return result;
}
}

View File

@ -13,15 +13,25 @@ namespace DB
using ReadingInOrderStepGetter = std::function<Pipe(RangesInDataParts)>;
/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range.
/// A separate pipe will be constructed for each layer with a reading step (provided by the reading_step_getter) and
/// a filter for this layer's range of PK values.
/// Will try to produce exactly max_layer pipes but may return less if data is distributed in not a very parallelizable way.
Pipes buildPipesForReadingByPKRanges(
/// Result of splitPartsWithRangesByPrimaryKey.
struct SplitPartsWithRangesByPrimaryKeyResult
{
/// Parts ranges that were extracted as non intersecting.
RangesInDataParts non_intersecting_parts_ranges;
/// Pipes for the merging layers, each built with the in-order reading step getter.
Pipes merging_pipes;
};
/** Splits parts ranges into:
*
* 1. Non intersecting part ranges, for parts with level > 0.
* 2. Merging layers, that contain ranges from multiple parts. A separate pipe will be constructed for each layer
* with a reading step (provided by the in_order_reading_step_getter) and a filter for this layer's range of PK values.
*
* Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way.
*/
SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
const KeyDescription & primary_key,
ExpressionActionsPtr sorting_expr,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter);
ReadingInOrderStepGetter && in_order_reading_step_getter);
}

View File

@ -1072,9 +1072,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
it, parts_with_ranges.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
parts_to_merge_ranges.push_back(it);
}
/// We divide threads for each partition equally. But we will create at least the number of partitions threads.
/// (So, the total number of threads could be more than initial num_streams.
num_streams /= (parts_to_merge_ranges.size() - 1);
}
else
{
@ -1087,8 +1084,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// If do_not_merge_across_partitions_select_final is true and num_streams > 1
/// we will store lonely parts with level > 0 to use parallel select on them.
RangesInDataParts lonely_parts;
size_t sum_marks_in_lonely_parts = 0;
RangesInDataParts non_intersecting_parts_by_primary_key;
auto sorting_expr = std::make_shared<ExpressionActions>(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());
@ -1100,32 +1096,26 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
bool no_merging_final = settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0;
if (no_merging_final)
{
non_intersecting_parts_by_primary_key.push_back(std::move(*parts_to_merge_ranges[range_index]));
continue;
}
Pipes pipes;
{
RangesInDataParts new_parts;
if (no_merging_final)
{
if (num_streams > 1)
sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount();
lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index]));
continue;
}
else
{
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{
new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges);
}
}
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges);
if (new_parts.empty())
continue;
if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
{
// Let's split parts into layers to ensure data parallelism of FINAL.
auto reading_step_getter = [this, &column_names, &info](auto parts)
// Let's split parts into non intersecting parts ranges and layers to ensure data parallelism of FINAL.
auto in_order_reading_step_getter = [this, &column_names, &info](auto parts)
{
return this->read(
std::move(parts),
@ -1136,13 +1126,19 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
info.use_uncompressed_cache);
};
pipes = buildPipesForReadingByPKRanges(
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
metadata_for_reading->getPrimaryKey(),
sorting_expr,
std::move(new_parts),
num_streams,
context,
std::move(reading_step_getter));
std::move(in_order_reading_step_getter));
for (auto && non_intersecting_parts_range : split_ranges_result.non_intersecting_parts_ranges)
non_intersecting_parts_by_primary_key.push_back(std::move(non_intersecting_parts_range));
for (auto && merging_pipe : split_ranges_result.merging_pipes)
pipes.push_back(std::move(merging_pipe));
}
else
{
@ -1154,10 +1150,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
}
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
if (!out_projection && !pipes.empty())
out_projection = createProjection(pipes.front().getHeader());
}
if (pipes.empty())
continue;
Names sort_columns = metadata_for_reading->getSortingKeyColumns();
SortDescription sort_description;
@ -1183,45 +1181,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
}
if (!lonely_parts.empty())
if (!non_intersecting_parts_by_primary_key.empty())
{
Pipe pipe;
if (num_streams > 1)
{
size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
data_settings->index_granularity,
info.index_granularity_bytes,
sum_marks_in_lonely_parts);
/// Reduce the number of num_streams_for_lonely_parts if the data is small.
if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read
&& lonely_parts.size() < num_streams_for_lonely_parts)
num_streams_for_lonely_parts = std::max(
(sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read,
lonely_parts.size());
pipe = read(
std::move(lonely_parts),
origin_column_names,
ReadFromMergeTree::ReadType::Default,
num_streams_for_lonely_parts,
min_marks_for_concurrent_read,
info.use_uncompressed_cache);
}
else
{
pipe = read(
std::move(lonely_parts),
origin_column_names,
ReadFromMergeTree::ReadType::InOrder,
num_streams,
0,
info.use_uncompressed_cache);
}
auto pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
no_merging_pipes.emplace_back(std::move(pipe));
}

View File

@ -0,0 +1,85 @@
1
--
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
--
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
--
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
--
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31

View File

@ -0,0 +1,34 @@
-- Runs SELECT ... FINAL on a ReplacingMergeTree table after each of several inserts
-- whose id ranges overlap the existing data in different ways.
DROP TABLE IF EXISTS test_table;
-- index_granularity = 2: every granule holds at most two rows, so these small inserts span several marks.
CREATE TABLE test_table
(
id UInt64,
value String
) ENGINE=ReplacingMergeTree ORDER BY id SETTINGS index_granularity = 2;
-- Two inserts (ids 0 and 1..15), then OPTIMIZE; the count of active parts is printed next.
INSERT INTO test_table SELECT 0, '0';
INSERT INTO test_table SELECT number + 1, number + 1 FROM numbers(15);
OPTIMIZE TABLE test_table;
SELECT COUNT() FROM system.parts WHERE table = 'test_table' AND active = 1;
-- Merges are stopped before the following inserts; presumably so that every insert stays a separate part.
SYSTEM STOP MERGES test_table;
SELECT '--';
-- Only the data inserted so far: ids 0..15.
SELECT id, value FROM test_table FINAL ORDER BY id;
SELECT '--';
-- A single row with id 5, which is already present in the table.
INSERT INTO test_table SELECT 5, '5';
SELECT id, value FROM test_table FINAL ORDER BY id;
SELECT '--';
-- Ids 8..15, all of which are already present in the table.
INSERT INTO test_table SELECT number + 8, number + 8 FROM numbers(8);
SELECT id, value FROM test_table FINAL ORDER BY id;
SELECT '--';
-- Ids 0..31: covers every existing id and adds new ids 16..31.
INSERT INTO test_table SELECT number, number FROM numbers(32);
SELECT id, value FROM test_table FINAL ORDER BY id;
DROP TABLE test_table;