Build pipeline for next column for prefetching

This commit is contained in:
Alexander Gololobov 2024-09-03 17:02:25 +02:00
parent a1cec53b7c
commit 8361724539
2 changed files with 81 additions and 73 deletions

View File

@ -804,53 +804,12 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
bool all_parts_on_remote_disks = std::ranges::all_of(global_ctx->future_part->parts, [](const auto & part) { return part->isStoredOnRemoteDisk(); });
ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch;
// if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
// ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name);
if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
ctx->prepared_pipeline = createPipelineForReadingOneColumn(ctx->it_name_and_type->name);
return false;
}
QueryPlan MergeTask::VerticalMergeStage::createPlanForReadingOneColumn(const String & column_name) const
{
/// Read from all parts
std::vector<QueryPlanPtr> plans;
for (const auto & part : global_ctx->future_part->parts)
{
auto plan_for_part = std::make_unique<QueryPlan>();
createReadFromPartStep(
MergeTreeSequentialSourceType::Merge,
*plan_for_part,
*global_ctx->data,
global_ctx->storage_snapshot,
part,
Names{column_name},
global_ctx->input_rows_filtered,
/*apply_deleted_mask=*/ true,
std::nullopt,
ctx->read_with_direct_io,
ctx->use_prefetch,
global_ctx->context,
getLogger("VerticalMergeStage"));
plans.emplace_back(std::move(plan_for_part));
}
QueryPlan merge_parts_query_plan;
/// Union of all parts streams
{
DataStreams input_streams;
input_streams.reserve(plans.size());
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
}
return merge_parts_query_plan;
}
/// Gathers values from all parts for one column using rows sources temporary file
class ColumnGathererStep : public ITransformingStep
{
@ -916,32 +875,46 @@ private:
const bool is_result_sparse;
};
void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::VerticalMergeStage::createPipelineForReadingOneColumn(const String & column_name) const
{
const auto & column_name = ctx->it_name_and_type->name;
/// Read from all parts
std::vector<QueryPlanPtr> plans;
for (const auto & part : global_ctx->future_part->parts)
{
auto plan_for_part = std::make_unique<QueryPlan>();
createReadFromPartStep(
MergeTreeSequentialSourceType::Merge,
*plan_for_part,
*global_ctx->data,
global_ctx->storage_snapshot,
part,
Names{column_name},
global_ctx->input_rows_filtered,
/*apply_deleted_mask=*/ true,
std::nullopt,
ctx->read_with_direct_io,
ctx->use_prefetch,
global_ctx->context,
getLogger("VerticalMergeStage"));
ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
plans.emplace_back(std::move(plan_for_part));
}
// Pipe pipe;
//// if (ctx->prepared_pipe)
//// {
//// pipe = std::move(*ctx->prepared_pipe);
////
//// auto next_column_it = std::next(ctx->it_name_and_type);
//// if (next_column_it != global_ctx->gathering_columns.end())
//// ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name);
//// }
//// else
// {
// pipe = createPipeForReadingOneColumn(column_name);
// }
QueryPlan merge_column_query_plan;
auto merge_column_query_plan = createPlanForReadingOneColumn(column_name);
/// Union of all parts streams
{
DataStreams input_streams;
input_streams.reserve(plans.size());
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
merge_column_query_plan.unitePlans(std::move(union_step), std::move(plans));
}
/// Add column gatherer step
{
// ctx->rows_sources_read_buf->seek(0, 0);
bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
@ -973,13 +946,36 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
}
}
{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
}
void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
{
const auto & column_name = ctx->it_name_and_type->name;
ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
VerticalMergeRuntimeContext::PreparedColumnPipeline column_pipepline;
if (ctx->prepared_pipeline)
{
column_pipepline = std::move(*ctx->prepared_pipeline);
/// Prepare next column pipeline to initiate prefetching
auto next_column_it = std::next(ctx->it_name_and_type);
if (next_column_it != global_ctx->gathering_columns.end())
ctx->prepared_pipeline = createPipelineForReadingOneColumn(next_column_it->name);
}
else
{
column_pipepline = createPipelineForReadingOneColumn(column_name);
}
ctx->column_parts_pipeline = std::move(column_pipepline.pipeline);
/// Dereference unique_ptr
ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback(
@ -997,12 +993,16 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
global_ctx->metadata_snapshot,
columns_list,
ctx->compression_codec,
indexes_to_recalc,
column_pipepline.indexes_to_recalc,
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
&global_ctx->written_offset_columns,
global_ctx->to->getIndexGranularity());
ctx->column_elems_written = 0;
/// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time
/// This sharing also prevents from from running multiple merge of individual columns in parallel.
ctx->rows_sources_read_buf->seek(0, 0);
}
@ -1673,8 +1673,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
merge_parts_query_plan.getCurrentDataStream(),
SizeLimits(), 0 /*limit_hint*/,
global_ctx->deduplicate_by_columns,
false,
true /*TODO: ??*/);
false /*pre_distinct*/,
true /*optimize_distinct_in_order TODO: looks like it shoud be enabled*/);
deduplication_step->setStepDescription("Deduplication step");
merge_parts_query_plan.addStep(std::move(deduplication_step));
}

View File

@ -333,8 +333,16 @@ private:
Float64 progress_before = 0;
std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
// TODO: is this really needed for prefetch?
// std::optional<Pipe> prepared_pipe;
/// Used for prefetching. Right before starting merge of a column we create a pipeline for the next column
/// and it initiates prefetching of the first range of that column.
struct PreparedColumnPipeline
{
QueryPipeline pipeline;
MergeTreeIndices indexes_to_recalc;
};
std::optional<PreparedColumnPipeline> prepared_pipeline;
size_t max_delayed_streams = 0;
bool use_prefetch = false;
std::list<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams;
@ -379,7 +387,7 @@ private:
bool executeVerticalMergeForOneColumn() const;
void finalizeVerticalMergeForOneColumn() const;
QueryPlan createPlanForReadingOneColumn(const String & column_name) const;
VerticalMergeRuntimeContext::PreparedColumnPipeline createPipelineForReadingOneColumn(const String & column_name) const;
VerticalMergeRuntimeContextPtr ctx;
GlobalRuntimeContextPtr global_ctx;