This commit is contained in:
JIaQi 2024-11-21 14:05:20 +08:00 committed by GitHub
commit 5c882b0fee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 70 additions and 5 deletions

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
namespace ProfileEvents
{
@ -14,6 +15,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace MergeTreeSetting
{
extern const MergeTreeSettingsUInt64 index_granularity;
}
MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrder(
ParallelReadingExtension extension_,
CoordinationMode mode_,
@ -70,16 +76,73 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
task_idx, per_part_infos.size());
const auto & part_info = per_part_infos[task_idx]->data_part->info;
auto get_from_buffer = [&]() -> std::optional<MarkRanges>
const auto & data_settings = per_part_infos[task_idx]->data_part->storage.getSettings();
auto get_from_buffer = [&,
rows_granularity = (*data_settings)[MergeTreeSetting::index_granularity],
my_max_block_size = this->block_size_params.max_block_size_rows]() -> std::optional<MarkRanges>
{
const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity;
for (auto & desc : buffered_tasks)
{
if (desc.info == part_info && !desc.ranges.empty())
{
auto result = std::move(desc.ranges);
desc.ranges = MarkRanges{};
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, desc.ranges.getNumberOfMarks());
return result;
if (mode == CoordinationMode::WithOrder)
{
/// if already splited, just return desc.ranges
if (split)
{
auto result = std::move(desc.ranges);
desc.ranges = MarkRanges{};
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
return result;
}
/// else split range but still return all MarkRanges
MarkRanges result;
for (auto range : desc.ranges)
{
while (!split && range.begin + marks_in_range < range.end)
{
result.emplace_back(range.begin, range.begin + marks_in_range);
range.begin += marks_in_range;
marks_in_range *= 2;
if (marks_in_range > max_marks_in_range)
split = true;
}
result.emplace_back(range.begin, range.begin + marks_in_range);
}
chassert(!result.empty());
desc.ranges = MarkRanges{};
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
return result;
}
else
{
/// for reverse order just reutrn one range
MarkRanges result;
auto & range = desc.ranges.back();
if (range.begin + marks_in_range < range.end)
{
result.emplace_front(range.end - marks_in_range, range.end);
range.end -= marks_in_range;
marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
}
else
{
result.emplace_front(range.begin, range.end);
desc.ranges.pop_back();
}
/// TODO: should we need reset for each ranges?
if (desc.ranges.empty())
{
marks_in_range = 1;
}
chassert(result.size() == 1);
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
return result;
}
}
}
return std::nullopt;

View File

@ -38,6 +38,8 @@ private:
RangesInDataPartsDescription buffered_tasks;
mutable std::mutex mutex;
bool split{false};
size_t marks_in_range{1};
};
};