maintain marks_in_range for each part

This commit is contained in:
“JiaQi 2024-11-25 19:57:48 +08:00
parent eef034e0f9
commit dffdb1fb36
2 changed files with 6 additions and 12 deletions

View File

@ -64,6 +64,8 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
buffered_tasks.push_back({part.data_part->info, MarkRanges{}}); buffered_tasks.push_back({part.data_part->info, MarkRanges{}});
extension.sendInitialRequest(mode, parts_ranges, /*mark_segment_size_=*/0); extension.sendInitialRequest(mode, parts_ranges, /*mark_segment_size_=*/0);
per_part_marks_in_range.resize(per_part_infos.size(), 1);
} }
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task) MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
@ -76,6 +78,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
const auto & part_info = per_part_infos[task_idx]->data_part->info; const auto & part_info = per_part_infos[task_idx]->data_part->info;
const auto & data_settings = per_part_infos[task_idx]->data_part->storage.getSettings(); const auto & data_settings = per_part_infos[task_idx]->data_part->storage.getSettings();
auto & marks_in_range = per_part_marks_in_range[task_idx];
auto get_from_buffer = [&, auto get_from_buffer = [&,
rows_granularity = (*data_settings)[MergeTreeSetting::index_granularity], rows_granularity = (*data_settings)[MergeTreeSetting::index_granularity],
my_max_block_size = this->block_size_params.max_block_size_rows]() -> std::optional<MarkRanges> my_max_block_size = this->block_size_params.max_block_size_rows]() -> std::optional<MarkRanges>
@ -88,7 +91,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
if (mode == CoordinationMode::WithOrder) if (mode == CoordinationMode::WithOrder)
{ {
/// if already splited, just return desc.ranges /// if already splited, just return desc.ranges
if (split) if (marks_in_range > max_marks_in_range)
{ {
auto result = std::move(desc.ranges); auto result = std::move(desc.ranges);
desc.ranges = MarkRanges{}; desc.ranges = MarkRanges{};
@ -100,13 +103,11 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
MarkRanges result; MarkRanges result;
for (auto range : desc.ranges) for (auto range : desc.ranges)
{ {
while (!split && range.begin + marks_in_range < range.end) while (marks_in_range <= max_marks_in_range && range.begin + marks_in_range < range.end)
{ {
result.emplace_back(range.begin, range.begin + marks_in_range); result.emplace_back(range.begin, range.begin + marks_in_range);
range.begin += marks_in_range; range.begin += marks_in_range;
marks_in_range *= 2; marks_in_range *= 2;
if (marks_in_range > max_marks_in_range)
split = true;
} }
result.emplace_back(range.begin, range.end); result.emplace_back(range.begin, range.end);
} }
@ -132,12 +133,6 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
desc.ranges.pop_back(); desc.ranges.pop_back();
} }
/// TODO: should we need reset for each ranges?
if (desc.ranges.empty())
{
marks_in_range = 1;
}
chassert(result.size() == 1); chassert(result.size() == 1);
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks()); ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
return result; return result;

View File

@ -38,8 +38,7 @@ private:
RangesInDataPartsDescription buffered_tasks; RangesInDataPartsDescription buffered_tasks;
mutable std::mutex mutex; mutable std::mutex mutex;
bool split{false}; std::vector<size_t> per_part_marks_in_range;
size_t marks_in_range{1};
}; };
}; };