mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #9050 from CurtizJ/fix-range-reader
Fix order of ranges in MergeTreeDataSelectExecutor
This commit is contained in:
commit
5b4b04e13b
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
#include <vector>
|
||||
#include <deque>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -19,7 +19,7 @@ struct MarkRange
|
||||
MarkRange(const size_t begin_, const size_t end_) : begin{begin_}, end{end_} {}
|
||||
};
|
||||
|
||||
using MarkRanges = std::vector<MarkRange>;
|
||||
using MarkRanges = std::deque<MarkRange>;
|
||||
|
||||
|
||||
}
|
||||
|
@ -73,7 +73,8 @@ MergeTreeReadTask::MergeTreeReadTask(
|
||||
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
|
||||
ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_},
|
||||
remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
|
||||
{}
|
||||
{
|
||||
}
|
||||
|
||||
MergeTreeReadTask::~MergeTreeReadTask() = default;
|
||||
|
||||
|
@ -30,8 +30,7 @@ struct MergeTreeReadTask
|
||||
{
|
||||
/// data part which should be read while performing this task
|
||||
MergeTreeData::DataPartPtr data_part;
|
||||
/** Ranges to read from `data_part`.
|
||||
* Specified in reverse order for MergeTreeThreadSelectBlockInputStream's convenience of calling .pop_back(). */
|
||||
/// Ranges to read from `data_part`.
|
||||
MarkRanges mark_ranges;
|
||||
/// for virtual `part_index` virtual column
|
||||
size_t part_index_in_query;
|
||||
|
@ -718,9 +718,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
total_rows += parts[i].getRowsCount();
|
||||
/// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
|
||||
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
|
||||
|
||||
sum_marks_in_parts[i] = parts[i].getMarksCount();
|
||||
sum_marks += sum_marks_in_parts[i];
|
||||
|
||||
@ -826,9 +823,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
sum_marks_in_parts[i] = parts[i].getMarksCount();
|
||||
sum_marks += sum_marks_in_parts[i];
|
||||
|
||||
/// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
|
||||
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
|
||||
|
||||
if (parts[i].data_part->index_granularity_info.is_adaptive)
|
||||
adaptive_parts++;
|
||||
}
|
||||
@ -891,13 +885,12 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
auto range = *it;
|
||||
while (range.begin + marks_in_range < range.end)
|
||||
{
|
||||
new_ranges.emplace_back(range.end - marks_in_range, range.end);
|
||||
new_ranges.emplace_front(range.end - marks_in_range, range.end);
|
||||
range.end -= marks_in_range;
|
||||
marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
|
||||
}
|
||||
new_ranges.emplace_back(range.begin, range.end);
|
||||
new_ranges.emplace_front(range.begin, range.end);
|
||||
}
|
||||
std::reverse(new_ranges.begin(), new_ranges.end());
|
||||
}
|
||||
|
||||
return new_ranges;
|
||||
@ -919,7 +912,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
RangesInDataPart part = parts.back();
|
||||
parts.pop_back();
|
||||
|
||||
size_t & marks_in_part = sum_marks_in_parts.back();
|
||||
size_t & marks_in_part = sum_marks_in_parts.front();
|
||||
|
||||
/// We will not take too few rows from a part.
|
||||
if (marks_in_part >= min_marks_for_concurrent_read &&
|
||||
@ -936,9 +929,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
/// We take the whole part if it is small enough.
|
||||
if (marks_in_part <= need_marks)
|
||||
{
|
||||
/// Restore the order of segments.
|
||||
std::reverse(part.ranges.begin(), part.ranges.end());
|
||||
|
||||
ranges_to_get_from_part = part.ranges;
|
||||
|
||||
need_marks -= marks_in_part;
|
||||
@ -952,7 +942,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
if (part.ranges.empty())
|
||||
throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
MarkRange & range = part.ranges.back();
|
||||
MarkRange & range = part.ranges.front();
|
||||
|
||||
const size_t marks_in_range = range.end - range.begin;
|
||||
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
|
||||
@ -962,7 +952,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
marks_in_part -= marks_to_get_from_range;
|
||||
need_marks -= marks_to_get_from_range;
|
||||
if (range.begin == range.end)
|
||||
part.ranges.pop_back();
|
||||
part.ranges.pop_front();
|
||||
}
|
||||
parts.emplace_back(part);
|
||||
}
|
||||
|
@ -690,9 +690,9 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
|
||||
if (stream.isFinished())
|
||||
{
|
||||
result.addRows(stream.finalize(result.columns));
|
||||
stream = Stream(ranges.back().begin, ranges.back().end, merge_tree_reader);
|
||||
result.addRange(ranges.back());
|
||||
ranges.pop_back();
|
||||
stream = Stream(ranges.front().begin, ranges.front().end, merge_tree_reader);
|
||||
result.addRange(ranges.front());
|
||||
ranges.pop_front();
|
||||
}
|
||||
|
||||
auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
|
||||
|
@ -28,11 +28,6 @@ MergeTreeReadPool::MergeTreeReadPool(
|
||||
column_names{column_names_}, do_not_steal_tasks{do_not_steal_tasks_},
|
||||
predict_block_size_bytes{preferred_block_size_bytes_ > 0}, prewhere_info{prewhere_info_}, parts_ranges{parts_}
|
||||
{
|
||||
/// reverse from right-to-left to left-to-right
|
||||
/// because 'reverse' was done in MergeTreeDataSelectExecutor
|
||||
for (auto & part_ranges : parts_ranges)
|
||||
std::reverse(std::begin(part_ranges.ranges), std::end(part_ranges.ranges));
|
||||
|
||||
/// parts don't contain duplicate MergeTreeDataPart's.
|
||||
const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_);
|
||||
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_);
|
||||
@ -78,10 +73,6 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
|
||||
if (marks_in_part <= need_marks)
|
||||
{
|
||||
const auto marks_to_get_from_range = marks_in_part;
|
||||
|
||||
/** Ranges are in right-to-left order, because 'reverse' was done in MergeTreeDataSelectExecutor
|
||||
* and that order is supported in 'fillPerThreadInfo'.
|
||||
*/
|
||||
ranges_to_get_from_part = thread_task.ranges;
|
||||
|
||||
marks_in_part -= marks_to_get_from_range;
|
||||
@ -97,7 +88,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
|
||||
/// Loop through part ranges.
|
||||
while (need_marks > 0 && !thread_task.ranges.empty())
|
||||
{
|
||||
auto & range = thread_task.ranges.back();
|
||||
auto & range = thread_task.ranges.front();
|
||||
|
||||
const size_t marks_in_range = range.end - range.begin;
|
||||
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
|
||||
@ -105,19 +96,11 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
|
||||
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
|
||||
range.begin += marks_to_get_from_range;
|
||||
if (range.begin == range.end)
|
||||
{
|
||||
std::swap(range, thread_task.ranges.back());
|
||||
thread_task.ranges.pop_back();
|
||||
}
|
||||
thread_task.ranges.pop_front();
|
||||
|
||||
marks_in_part -= marks_to_get_from_range;
|
||||
need_marks -= marks_to_get_from_range;
|
||||
}
|
||||
|
||||
/** Change order to right-to-left, for MergeTreeThreadSelectBlockInputStream to get ranges with .pop_back()
|
||||
* (order was changed to left-to-right due to .pop_back() above).
|
||||
*/
|
||||
std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part));
|
||||
}
|
||||
|
||||
auto curr_task_size_predictor = !per_part_size_predictor[part_idx] ? nullptr
|
||||
@ -211,7 +194,6 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
|
||||
|
||||
/// Read marks for every data part.
|
||||
size_t sum_marks = 0;
|
||||
/// Ranges are in right-to-left order, due to 'reverse' in MergeTreeDataSelectExecutor.
|
||||
for (const auto & range : part.ranges)
|
||||
sum_marks += range.end - range.begin;
|
||||
|
||||
@ -279,7 +261,6 @@ void MergeTreeReadPool::fillPerThreadInfo(
|
||||
/// Get whole part to read if it is small enough.
|
||||
if (marks_in_part <= need_marks)
|
||||
{
|
||||
/// Leave ranges in right-to-left order for convenience to use .pop_back() in .getTask()
|
||||
ranges_to_get_from_part = part.ranges;
|
||||
marks_in_ranges = marks_in_part;
|
||||
|
||||
@ -295,7 +276,7 @@ void MergeTreeReadPool::fillPerThreadInfo(
|
||||
if (part.ranges.empty())
|
||||
throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
MarkRange & range = part.ranges.back();
|
||||
MarkRange & range = part.ranges.front();
|
||||
|
||||
const size_t marks_in_range = range.end - range.begin;
|
||||
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
|
||||
@ -305,13 +286,8 @@ void MergeTreeReadPool::fillPerThreadInfo(
|
||||
marks_in_part -= marks_to_get_from_range;
|
||||
need_marks -= marks_to_get_from_range;
|
||||
if (range.begin == range.end)
|
||||
part.ranges.pop_back();
|
||||
part.ranges.pop_front();
|
||||
}
|
||||
|
||||
/** Change order to right-to-left, for getTask() to get ranges with .pop_back()
|
||||
* (order was changed to left-to-right due to .pop_back() above).
|
||||
*/
|
||||
std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part));
|
||||
}
|
||||
|
||||
threads_tasks[i].parts_and_ranges.push_back({ part_idx, ranges_to_get_from_part });
|
||||
|
@ -57,7 +57,7 @@ public:
|
||||
|
||||
size_t getFirstMarkToRead() const
|
||||
{
|
||||
return all_mark_ranges.back().begin;
|
||||
return all_mark_ranges.front().begin;
|
||||
}
|
||||
private:
|
||||
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
|
||||
|
@ -95,11 +95,6 @@ try
|
||||
|
||||
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
|
||||
|
||||
/** @note you could simply swap `reverse` in if and else branches of MergeTreeDataSelectExecutor,
|
||||
* and remove this reverse. */
|
||||
MarkRanges remaining_mark_ranges = all_mark_ranges;
|
||||
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
|
||||
|
||||
auto size_predictor = (preferred_block_size_bytes == 0)
|
||||
? nullptr
|
||||
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
|
||||
@ -109,7 +104,7 @@ try
|
||||
column_name_set = NameSet{column_names.begin(), column_names.end()};
|
||||
|
||||
task = std::make_unique<MergeTreeReadTask>(
|
||||
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
|
||||
data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
|
||||
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
||||
task_columns.should_reorder, std::move(size_predictor));
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
300
|
@ -0,0 +1,16 @@
|
||||
drop table if exists t;
|
||||
|
||||
create table t (a Int, b Int) engine = MergeTree order by (a, b) settings index_granularity = 400;
|
||||
|
||||
insert into t select 0, 0 from numbers(50);
|
||||
insert into t select 0, 1 from numbers(350);
|
||||
insert into t select 1, 2 from numbers(400);
|
||||
insert into t select 2, 2 from numbers(400);
|
||||
insert into t select 3, 0 from numbers(100);
|
||||
|
||||
select sleep(1) format Null; -- sleep a bit to wait possible merges after insert
|
||||
|
||||
set max_threads = 1;
|
||||
optimize table t final;
|
||||
|
||||
select sum(a) from t where a in (0, 3) and b = 0;
|
Loading…
Reference in New Issue
Block a user