Use query plan for column vertical merges

This commit is contained in:
Alexander Gololobov 2024-09-02 22:36:42 +02:00
parent 13f4eb3fac
commit 48cacd6f31
2 changed files with 133 additions and 47 deletions

View File

@ -804,35 +804,106 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
bool all_parts_on_remote_disks = std::ranges::all_of(global_ctx->future_part->parts, [](const auto & part) { return part->isStoredOnRemoteDisk(); }); bool all_parts_on_remote_disks = std::ranges::all_of(global_ctx->future_part->parts, [](const auto & part) { return part->isStoredOnRemoteDisk(); });
ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch; ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch;
if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end()) // if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name); // ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name);
return false; return false;
} }
Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String & column_name) const QueryPlan MergeTask::VerticalMergeStage::createPlanForReadingOneColumn(const String & column_name) const
{ {
Pipes pipes; /// Read from all parts
for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num) std::vector<QueryPlanPtr> plans;
for (const auto & part : global_ctx->future_part->parts)
{ {
Pipe pipe = createMergeTreeSequentialSource( auto plan_for_part = std::make_unique<QueryPlan>();
createReadFromPartStep(
MergeTreeSequentialSourceType::Merge, MergeTreeSequentialSourceType::Merge,
*plan_for_part,
*global_ctx->data, *global_ctx->data,
global_ctx->storage_snapshot, global_ctx->storage_snapshot,
global_ctx->future_part->parts[part_num], part,
Names{column_name}, Names{column_name},
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered, global_ctx->input_rows_filtered,
/*apply_deleted_mask=*/ true, /*apply_deleted_mask=*/ true,
std::nullopt,
ctx->read_with_direct_io, ctx->read_with_direct_io,
ctx->use_prefetch); ctx->use_prefetch,
global_ctx->context,
getLogger("VerticalMergeStage"));
pipes.emplace_back(std::move(pipe)); plans.emplace_back(std::move(plan_for_part));
} }
return Pipe::unitePipes(std::move(pipes)); QueryPlan merge_parts_query_plan;
/// Union of all parts streams
{
DataStreams input_streams;
input_streams.reserve(plans.size());
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
}
return merge_parts_query_plan;
} }
/// Gathers values from all parts for one column using rows sources temporary file
class ColumnGathererStep : public ITransformingStep
{
public:
ColumnGathererStep(
const DataStream & input_stream_,
CompressedReadBufferFromFile * rows_sources_read_buf_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
, rows_sources_read_buf(rows_sources_read_buf_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, is_result_sparse(is_result_sparse_)
{}
String getName() const override { return "ColumnGatherer"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
{
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
rows_sources_read_buf->seek(0, 0);
auto transform = std::make_unique<ColumnGathererTransform>(
header,
input_streams_count,
*rows_sources_read_buf,
merge_block_size_rows,
merge_block_size_bytes,
is_result_sparse);
pipeline.addTransform(std::move(transform));
}
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
/// TODO: is this correct?
output_stream->sort_scope = DataStream::SortScope::None;
}
private:
MergeTreeData::MergingParams merging_params{};
CompressedReadBufferFromFile * rows_sources_read_buf;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool is_result_sparse;
};
void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
{ {
const auto & column_name = ctx->it_name_and_type->name; const auto & column_name = ctx->it_name_and_type->name;
@ -840,50 +911,64 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed); ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name)); global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
Pipe pipe; // Pipe pipe;
if (ctx->prepared_pipe) //// if (ctx->prepared_pipe)
{ //// {
pipe = std::move(*ctx->prepared_pipe); //// pipe = std::move(*ctx->prepared_pipe);
////
//// auto next_column_it = std::next(ctx->it_name_and_type);
//// if (next_column_it != global_ctx->gathering_columns.end())
//// ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name);
//// }
//// else
// {
// pipe = createPipeForReadingOneColumn(column_name);
// }
auto next_column_it = std::next(ctx->it_name_and_type); auto merge_column_query_plan = createPlanForReadingOneColumn(column_name);
if (next_column_it != global_ctx->gathering_columns.end())
ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name); /// Add column gatherer step
}
else
{ {
pipe = createPipeForReadingOneColumn(column_name); // ctx->rows_sources_read_buf->seek(0, 0);
bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
merge_column_query_plan.getCurrentDataStream(),
ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name,
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
is_result_sparse);
merge_step->setStepDescription("Gather column");
merge_column_query_plan.addStep(std::move(merge_step));
} }
ctx->rows_sources_read_buf->seek(0, 0); /// Add expression step for indexes
bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
const auto data_settings = global_ctx->data->getSettings();
auto transform = std::make_unique<ColumnGathererTransform>(
pipe.getHeader(),
pipe.numOutputPorts(),
*ctx->rows_sources_read_buf,
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
is_result_sparse);
pipe.addTransform(std::move(transform));
MergeTreeIndices indexes_to_recalc; MergeTreeIndices indexes_to_recalc;
auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name); IndicesDescription indexes_to_recalc_description;
if (indexes_it != global_ctx->skip_indexes_by_column.end())
{ {
indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second); auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name);
pipe.addTransform(std::make_shared<ExpressionTransform>( if (indexes_it != global_ctx->skip_indexes_by_column.end())
pipe.getHeader(), {
indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), indexes_to_recalc_description = indexes_it->second;
global_ctx->data->getContext()))); indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second);
pipe.addTransform(std::make_shared<MaterializingTransform>(pipe.getHeader())); auto indices_expression_dag = indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
merge_column_query_plan.getCurrentDataStream(),
std::move(indices_expression_dag));
merge_column_query_plan.addStep(std::move(calculate_indices_expression_step));
}
} }
ctx->column_parts_pipeline = QueryPipeline(std::move(pipe)); {
auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(global_ctx->context),
pipelineSettings);
ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}
/// Dereference unique_ptr /// Dereference unique_ptr
ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback( ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback(

View File

@ -333,7 +333,8 @@ private:
Float64 progress_before = 0; Float64 progress_before = 0;
std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr}; std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
std::optional<Pipe> prepared_pipe; // TODO: is this really needed for prefetch?
// std::optional<Pipe> prepared_pipe;
size_t max_delayed_streams = 0; size_t max_delayed_streams = 0;
bool use_prefetch = false; bool use_prefetch = false;
std::list<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams; std::list<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams;
@ -378,7 +379,7 @@ private:
bool executeVerticalMergeForOneColumn() const; bool executeVerticalMergeForOneColumn() const;
void finalizeVerticalMergeForOneColumn() const; void finalizeVerticalMergeForOneColumn() const;
Pipe createPipeForReadingOneColumn(const String & column_name) const; QueryPlan createPlanForReadingOneColumn(const String & column_name) const;
VerticalMergeRuntimeContextPtr ctx; VerticalMergeRuntimeContextPtr ctx;
GlobalRuntimeContextPtr global_ctx; GlobalRuntimeContextPtr global_ctx;