mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
implement split range on MergeTreeReadPoolParallelReplicasInOrder
This commit is contained in:
parent
2d2d611bd3
commit
bf4445158c
@ -1,4 +1,5 @@
|
||||
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -14,6 +15,11 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace MergeTreeSetting
|
||||
{
|
||||
extern const MergeTreeSettingsUInt64 index_granularity;
|
||||
}
|
||||
|
||||
MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrder(
|
||||
ParallelReadingExtension extension_,
|
||||
CoordinationMode mode_,
|
||||
@ -70,16 +76,73 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
|
||||
task_idx, per_part_infos.size());
|
||||
|
||||
const auto & part_info = per_part_infos[task_idx]->data_part->info;
|
||||
auto get_from_buffer = [&]() -> std::optional<MarkRanges>
|
||||
const auto & data_settings = per_part_infos[task_idx]->data_part->storage.getSettings();
|
||||
auto get_from_buffer = [&,
|
||||
rows_granularity = (*data_settings)[MergeTreeSetting::index_granularity],
|
||||
my_max_block_size = this->block_size_params.max_block_size_rows]() -> std::optional<MarkRanges>
|
||||
{
|
||||
const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity;
|
||||
for (auto & desc : buffered_tasks)
|
||||
{
|
||||
if (desc.info == part_info && !desc.ranges.empty())
|
||||
{
|
||||
auto result = std::move(desc.ranges);
|
||||
desc.ranges = MarkRanges{};
|
||||
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, desc.ranges.getNumberOfMarks());
|
||||
return result;
|
||||
if (mode == CoordinationMode::WithOrder)
|
||||
{
|
||||
/// if already splited, just return desc.ranges
|
||||
if (split)
|
||||
{
|
||||
auto result = std::move(desc.ranges);
|
||||
desc.ranges = MarkRanges{};
|
||||
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
|
||||
return result;
|
||||
}
|
||||
|
||||
/// else split range but still return all MarkRanges
|
||||
MarkRanges result;
|
||||
for (auto range : desc.ranges)
|
||||
{
|
||||
while (!split && range.begin + marks_in_range < range.end)
|
||||
{
|
||||
result.emplace_back(range.begin, range.begin + marks_in_range);
|
||||
range.begin += marks_in_range;
|
||||
marks_in_range *= 2;
|
||||
if (marks_in_range > max_marks_in_range)
|
||||
split = true;
|
||||
}
|
||||
result.emplace_back(range.begin, range.begin + marks_in_range);
|
||||
}
|
||||
chassert(!result.empty());
|
||||
desc.ranges = MarkRanges{};
|
||||
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
|
||||
return result;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// for reverse order just reutrn one range
|
||||
MarkRanges result;
|
||||
auto & range = desc.ranges.back();
|
||||
if (range.begin + marks_in_range < range.end)
|
||||
{
|
||||
result.emplace_front(range.end - marks_in_range, range.end);
|
||||
range.end -= marks_in_range;
|
||||
marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
|
||||
}
|
||||
else
|
||||
{
|
||||
result.emplace_front(range.begin, range.end);
|
||||
desc.ranges.pop_back();
|
||||
}
|
||||
|
||||
/// TODO: should we need reset for each ranges?
|
||||
if (desc.ranges.empty())
|
||||
{
|
||||
marks_in_range = 1;
|
||||
}
|
||||
|
||||
chassert(result.size() == 1);
|
||||
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
|
@ -38,6 +38,8 @@ private:
|
||||
RangesInDataPartsDescription buffered_tasks;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
bool split{false};
|
||||
size_t marks_in_range{1};
|
||||
};
|
||||
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user