diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index fb5bbc4729c..75fd61ae4be 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -804,35 +804,106 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const bool all_parts_on_remote_disks = std::ranges::all_of(global_ctx->future_part->parts, [](const auto & part) { return part->isStoredOnRemoteDisk(); }); ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch; - if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end()) - ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name); +// if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end()) +// ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name); return false; } -Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String & column_name) const +QueryPlan MergeTask::VerticalMergeStage::createPlanForReadingOneColumn(const String & column_name) const { - Pipes pipes; - for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num) + /// Read from all parts + std::vector plans; + for (const auto & part : global_ctx->future_part->parts) { - Pipe pipe = createMergeTreeSequentialSource( + auto plan_for_part = std::make_unique(); + createReadFromPartStep( MergeTreeSequentialSourceType::Merge, + *plan_for_part, *global_ctx->data, global_ctx->storage_snapshot, - global_ctx->future_part->parts[part_num], + part, Names{column_name}, - /*mark_ranges=*/ {}, global_ctx->input_rows_filtered, /*apply_deleted_mask=*/ true, + std::nullopt, ctx->read_with_direct_io, - ctx->use_prefetch); + ctx->use_prefetch, + global_ctx->context, + getLogger("VerticalMergeStage")); - pipes.emplace_back(std::move(pipe)); + plans.emplace_back(std::move(plan_for_part)); } - return Pipe::unitePipes(std::move(pipes)); + QueryPlan merge_parts_query_plan; + + /// Union of all parts streams + { + DataStreams input_streams; + input_streams.reserve(plans.size()); + for (auto & plan : plans) + input_streams.emplace_back(plan->getCurrentDataStream()); + + auto union_step = std::make_unique(std::move(input_streams)); + merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + + return merge_parts_query_plan; } +/// Gathers values from all parts for one column using rows sources temporary file +class ColumnGathererStep : public ITransformingStep +{ +public: + ColumnGathererStep( + const DataStream & input_stream_, + CompressedReadBufferFromFile * rows_sources_read_buf_, + UInt64 merge_block_size_rows_, + UInt64 merge_block_size_bytes_, + bool is_result_sparse_) + : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits? + , rows_sources_read_buf(rows_sources_read_buf_) + , merge_block_size_rows(merge_block_size_rows_) + , merge_block_size_bytes(merge_block_size_bytes_) + , is_result_sparse(is_result_sparse_) + {} + + String getName() const override { return "ColumnGatherer"; } + + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override + { + const auto &header = pipeline.getHeader(); + const auto input_streams_count = pipeline.getNumStreams(); + + rows_sources_read_buf->seek(0, 0); + + auto transform = std::make_unique( + header, + input_streams_count, + *rows_sources_read_buf, + merge_block_size_rows, + merge_block_size_bytes, + is_result_sparse); + + pipeline.addTransform(std::move(transform)); + } + + void updateOutputStream() override + { + output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits()); + + /// TODO: is this correct? + output_stream->sort_scope = DataStream::SortScope::None; + } + +private: + MergeTreeData::MergingParams merging_params{}; + CompressedReadBufferFromFile * rows_sources_read_buf; + const UInt64 merge_block_size_rows; + const UInt64 merge_block_size_bytes; + const bool is_result_sparse; +}; + void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const { const auto & column_name = ctx->it_name_and_type->name; @@ -840,50 +911,64 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed); global_ctx->column_progress = std::make_unique(ctx->progress_before, ctx->column_sizes->columnWeight(column_name)); - Pipe pipe; - if (ctx->prepared_pipe) - { - pipe = std::move(*ctx->prepared_pipe); +// Pipe pipe; +//// if (ctx->prepared_pipe) +//// { +//// pipe = std::move(*ctx->prepared_pipe); +//// +//// auto next_column_it = std::next(ctx->it_name_and_type); +//// if (next_column_it != global_ctx->gathering_columns.end()) +//// ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name); +//// } +//// else +// { +// pipe = createPipeForReadingOneColumn(column_name); +// } - auto next_column_it = std::next(ctx->it_name_and_type); - if (next_column_it != global_ctx->gathering_columns.end()) - ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name); - } - else + auto merge_column_query_plan = createPlanForReadingOneColumn(column_name); + + /// Add column gatherer step { - pipe = createPipeForReadingOneColumn(column_name); +// ctx->rows_sources_read_buf->seek(0, 0); + bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE; + const auto data_settings = global_ctx->data->getSettings(); + auto merge_step = std::make_unique( + merge_column_query_plan.getCurrentDataStream(), + ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name, + data_settings->merge_max_block_size, + data_settings->merge_max_block_size_bytes, + is_result_sparse); + merge_step->setStepDescription("Gather column"); + merge_column_query_plan.addStep(std::move(merge_step)); } - ctx->rows_sources_read_buf->seek(0, 0); - bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE; - - const auto data_settings = global_ctx->data->getSettings(); - auto transform = std::make_unique( - pipe.getHeader(), - pipe.numOutputPorts(), - *ctx->rows_sources_read_buf, - data_settings->merge_max_block_size, - data_settings->merge_max_block_size_bytes, - is_result_sparse); - - pipe.addTransform(std::move(transform)); - + /// Add expression step for indexes MergeTreeIndices indexes_to_recalc; - auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name); - - if (indexes_it != global_ctx->skip_indexes_by_column.end()) + IndicesDescription indexes_to_recalc_description; { - indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second); + auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name); - pipe.addTransform(std::make_shared( - pipe.getHeader(), - indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), - global_ctx->data->getContext()))); + if (indexes_it != global_ctx->skip_indexes_by_column.end()) + { + indexes_to_recalc_description = indexes_it->second; + indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second); - pipe.addTransform(std::make_shared(pipe.getHeader())); + auto indices_expression_dag = indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone(); + auto calculate_indices_expression_step = std::make_unique( + merge_column_query_plan.getCurrentDataStream(), + std::move(indices_expression_dag)); + merge_column_query_plan.addStep(std::move(calculate_indices_expression_step)); + } } - ctx->column_parts_pipeline = QueryPipeline(std::move(pipe)); + { + auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context); + auto builder = merge_column_query_plan.buildQueryPipeline( + QueryPlanOptimizationSettings::fromContext(global_ctx->context), + pipelineSettings); + + ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + } /// Dereference unique_ptr ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback( diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index a5d7851932c..b36f5f832d9 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -333,7 +333,8 @@ private: Float64 progress_before = 0; std::unique_ptr column_to{nullptr}; - std::optional prepared_pipe; +// TODO: is this really needed for prefetch? +// std::optional prepared_pipe; size_t max_delayed_streams = 0; bool use_prefetch = false; std::list> delayed_streams; @@ -378,7 +379,7 @@ private: bool executeVerticalMergeForOneColumn() const; void finalizeVerticalMergeForOneColumn() const; - Pipe createPipeForReadingOneColumn(const String & column_name) const; + QueryPlan createPlanForReadingOneColumn(const String & column_name) const; VerticalMergeRuntimeContextPtr ctx; GlobalRuntimeContextPtr global_ctx;