From 7e444136bbad7e80f3a1905bbea7fa4c7e9a8337 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Mon, 2 Sep 2024 18:27:24 +0200
Subject: [PATCH 01/14] Use QueryPlan for horizontal part of merge

---
 src/Interpreters/MutationsInterpreter.cpp     |  13 +-
 src/Storages/MergeTree/MergeTask.cpp          | 412 ++++++++++++------
 .../MergeTree/MergeTreeSequentialSource.cpp   |  51 ++-
 .../MergeTree/MergeTreeSequentialSource.h     |   3 +
 4 files changed, 334 insertions(+), 145 deletions(-)

diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp
index 0b93b5989b1..a8d45caeeaf 100644
--- a/src/Interpreters/MutationsInterpreter.cpp
+++ b/src/Interpreters/MutationsInterpreter.cpp
@@ -1217,9 +1217,16 @@ void MutationsInterpreter::Source::read(
         createReadFromPartStep(
             MergeTreeSequentialSourceType::Mutation,
-            plan, *data, storage_snapshot,
-            part, required_columns,
-            apply_deleted_mask_, std::move(filter), context_,
+            plan,
+            *data, storage_snapshot,
+            part,
+            required_columns,
+            nullptr,
+            apply_deleted_mask_,
+            std::move(filter),
+            false,
+            false,
+            context_,
             getLogger("MutationsInterpreter"));
     }
     else
diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index fa86bb31629..3bee2ecb0d9 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -38,6 +38,11 @@
 #include
 #include
 #include
+#include
+#include
+#include
+#include
+#include
 #include
 #include
 #include
@@ -1206,12 +1211,204 @@ bool MergeTask::execute()
 }
 
+
+/// Apply merge strategy (Ordinary, Collapsing, Aggregating, etc) to the stream
+class ApplyMergeStep : public ITransformingStep /// TODO: is this transformation step?
+{
+public:
+    ApplyMergeStep(
+        const DataStream & input_stream_,
+        const SortDescription & sort_description_,
+        const Names partition_key_columns_,
+        const MergeTreeData::MergingParams & merging_params_,
+        WriteBuffer * rows_sources_write_buf_,
+        UInt64 merge_block_size_rows_,
+        UInt64 merge_block_size_bytes_,
+        bool blocks_are_granules_size_,
+        bool cleanup_)
+        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+        , sort_description(sort_description_)
+        , partition_key_columns(partition_key_columns_)
+        , merging_params(merging_params_)
+        , rows_sources_write_buf(rows_sources_write_buf_)
+        , merge_block_size_rows(merge_block_size_rows_)
+        , merge_block_size_bytes(merge_block_size_bytes_)
+        , blocks_are_granules_size(blocks_are_granules_size_)
+        , cleanup(cleanup_)
+    {}
+
+    String getName() const override { return "ApplyMergePolicy"; }
+
+    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
+    {
+        /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
+        /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
+        /// that is going in insertion order.
+        ProcessorPtr merged_transform;
+
+//        /// There is no sense to have the block size bigger than one granule for merge operations.
+//        const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
+//        const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
+
+        const auto &header = pipeline.getHeader();
+        const auto input_streams_count = pipeline.getNumStreams();
+
+        switch (merging_params.mode)
+        {
+            case MergeTreeData::MergingParams::Ordinary:
+                merged_transform = std::make_shared<MergingSortedTransform>(
+                    header,
+                    input_streams_count,
+                    sort_description,
+                    merge_block_size_rows,
+                    merge_block_size_bytes,
+                    SortingQueueStrategy::Default,
+                    /* limit_= */0,
+                    /* always_read_till_end_= */false,
+                    rows_sources_write_buf,
+                    blocks_are_granules_size);
+                break;
+
+            case MergeTreeData::MergingParams::Collapsing:
+                merged_transform = std::make_shared<CollapsingSortedTransform>(
+                    header, input_streams_count, sort_description, merging_params.sign_column, false,
+                    merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
+                break;
+
+            case MergeTreeData::MergingParams::Summing:
+                merged_transform = std::make_shared<SummingSortedTransform>(
+                    header, input_streams_count, sort_description, merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
+                break;
+
+            case MergeTreeData::MergingParams::Aggregating:
+                merged_transform = std::make_shared<AggregatingSortedTransform>(header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes);
+                break;
+
+            case MergeTreeData::MergingParams::Replacing:
+                merged_transform = std::make_shared<ReplacingSortedTransform>(
+                    header, input_streams_count, sort_description, merging_params.is_deleted_column, merging_params.version_column,
+                    merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size,
+                    cleanup);
+                break;
+
+            case MergeTreeData::MergingParams::Graphite:
+                merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
+                    header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes,
+                    merging_params.graphite_params, time_of_merge);
+                break;
+
+            case MergeTreeData::MergingParams::VersionedCollapsing:
+                merged_transform = std::make_shared<VersionedCollapsingTransform>(
+                    header, input_streams_count, sort_description, merging_params.sign_column,
+                    merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
+                break;
+        }
+
+        pipeline.addTransform(std::move(merged_transform));
+
+#ifndef NDEBUG
+        if (!sort_description.empty())
+        {
+            pipeline.addSimpleTransform([&](const Block & header_)
+            {
+                auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
+                return transform;
+            });
+        }
+#endif
+    }
+
+    void updateOutputStream() override
+    {
+        output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
+        output_stream->sort_description = sort_description;
+
+        /// TODO: is this correct?
+//        if (partition_key_columns.empty())
+            output_stream->sort_scope = DataStream::SortScope::Global;
+//        else
+//            output_stream->sort_scope = DataStream::SortScope::Stream;
+    }
+
+private:
+    SortDescription sort_description;
+    Names partition_key_columns;
+    MergeTreeData::MergingParams merging_params{};
+    WriteBuffer * rows_sources_write_buf;
+    const UInt64 merge_block_size_rows;
+    const UInt64 merge_block_size_bytes;
+    bool blocks_are_granules_size;
+    bool cleanup{false};
+    time_t time_of_merge{0};
+};
+
+
+class MaterializingStep : public ITransformingStep /// TODO: is this transformation step?
+{
+public:
+    explicit MaterializingStep(
+        const DataStream & input_stream_)
+        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+    {}
+
+    String getName() const override { return "Materializing"; }
+
+    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
+    {
+        pipeline.addTransform(std::make_shared<MaterializingTransform>(input_streams.front().header));
+    }
+
+    void updateOutputStream() override
+    {
+        /// TODO: can this be simplified?
+        output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
+        output_stream->sort_description = input_streams.front().sort_description;
+    }
+};
+
+
+class TTLStep : public ITransformingStep
+{
+public:
+    TTLStep(
+        const DataStream & input_stream_,
+        const ContextPtr & context_,
+        const MergeTreeData & storage_,
+        const StorageMetadataPtr & metadata_snapshot_,
+        const MergeTreeData::MutableDataPartPtr & data_part_,
+        time_t current_time,
+        bool force_)
+        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+    {
+        transform = std::make_shared<TTLTransform>(context_, input_stream_.header, storage_, metadata_snapshot_, data_part_, current_time, force_);
+        subqueries_for_sets = transform->getSubqueries();
+    }
+
+    String getName() const override { return "Materializing"; }
+
+    PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
+
+    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
+    {
+        pipeline.addTransform(transform);
+    }
+
+    void updateOutputStream() override
+    {
+        // TODO: implement?
+    }
+
+private:
+    std::shared_ptr<TTLTransform> transform;
+    PreparedSets::Subqueries subqueries_for_sets;
+};
+
+
 void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
 {
     /** Read from all parts, merge and write into a new one.
      *  In passing, we calculate expression for sorting.
      */
-    Pipes pipes;
+
     global_ctx->watch_prev_elapsed = 0;
 
     /// We count total amount of bytes in parts
@@ -1238,143 +1435,92 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
     global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
         ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
 
+    auto sorting_key_expression_dag = global_ctx->metadata_snapshot->getSortingKey().expression->getActionsDAG().clone();
+
+    /// Read from all parts
+    std::vector<QueryPlanPtr> plans;
     for (const auto & part : global_ctx->future_part->parts)
     {
-        Pipe pipe = createMergeTreeSequentialSource(
+        /// TODO: this is just for debugging purposes, remove it later
+        if (part->getMarksCount() == 0)
+            LOG_DEBUG(ctx->log, "Part {} is empty", part->name);
+
+        auto plan_for_part = std::make_unique<QueryPlan>();
+        createReadFromPartStep(
             MergeTreeSequentialSourceType::Merge,
+            *plan_for_part,
             *global_ctx->data,
             global_ctx->storage_snapshot,
             part,
             global_ctx->merging_columns.getNames(),
-            /*mark_ranges=*/ {},
             global_ctx->input_rows_filtered,
             /*apply_deleted_mask=*/ true,
+            /*filter=*/ std::nullopt,
             ctx->read_with_direct_io,
-            /*prefetch=*/ false);
+            /*prefetch=*/ false,
+            global_ctx->context,
+            ctx->log);
 
         if (global_ctx->metadata_snapshot->hasSortingKey())
         {
-            pipe.addSimpleTransform([this](const Block & header)
-            {
-                return std::make_shared<ExpressionTransform>(header, global_ctx->metadata_snapshot->getSortingKey().expression);
-            });
+            /// Calculate sorting key expressions so that they are available for merge sorting.
+            auto calculate_sorting_key_expression_step = std::make_unique<ExpressionStep>(
+                plan_for_part->getCurrentDataStream(),
+                sorting_key_expression_dag.clone()); /// TODO: can we avoid cloning here?
+            plan_for_part->addStep(std::move(calculate_sorting_key_expression_step));
+        }
 
-        pipes.emplace_back(std::move(pipe));
+        plans.emplace_back(std::move(plan_for_part));
     }
 
+    QueryPlan merge_parts_query_plan;
 
-    Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();
-    SortDescription sort_description;
-    sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description;
-    sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description;
-
-    size_t sort_columns_size = sort_columns.size();
-    sort_description.reserve(sort_columns_size);
-
-    Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names;
-
-    Block header = pipes.at(0).getHeader();
-    for (size_t i = 0; i < sort_columns_size; ++i)
-        sort_description.emplace_back(sort_columns[i], 1, 1);
-
-#ifndef NDEBUG
-    if (!sort_description.empty())
+    /// Union of all parts streams
     {
-        for (size_t i = 0; i < pipes.size(); ++i)
-        {
-            auto & pipe = pipes[i];
-            pipe.addSimpleTransform([&](const Block & header_)
-            {
-                auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
-                transform->setDescription(global_ctx->future_part->parts[i]->name);
-                return transform;
-            });
-        }
+        DataStreams input_streams;
+        input_streams.reserve(plans.size());
+        for (auto & plan : plans)
+            input_streams.emplace_back(plan->getCurrentDataStream());
+
+        auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
+        merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
     }
-#endif
 
-    /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
-    /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
-    /// that is going in insertion order.
-    ProcessorPtr merged_transform;
-
-    /// If merge is vertical we cannot calculate it
-    ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
-
-    /// There is no sense to have the block size bigger than one granule for merge operations.
-    const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
-    const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
-
-    switch (ctx->merging_params.mode)
+    /// Merge
     {
-        case MergeTreeData::MergingParams::Ordinary:
-            merged_transform = std::make_shared<MergingSortedTransform>(
-                header,
-                pipes.size(),
-                sort_description,
-                merge_block_size_rows,
-                merge_block_size_bytes,
-                SortingQueueStrategy::Default,
-                /* limit_= */0,
-                /* always_read_till_end_= */false,
-                ctx->rows_sources_write_buf.get(),
-                ctx->blocks_are_granules_size);
-            break;
+        Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();
+        SortDescription sort_description;
+        sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description;
+        sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description;
 
-        case MergeTreeData::MergingParams::Collapsing:
-            merged_transform = std::make_shared<CollapsingSortedTransform>(
-                header, pipes.size(), sort_description, ctx->merging_params.sign_column, false,
-                merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
-            break;
+        size_t sort_columns_size = sort_columns.size();
+        sort_description.reserve(sort_columns_size);
 
-        case MergeTreeData::MergingParams::Summing:
-            merged_transform = std::make_shared<SummingSortedTransform>(
-                header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
-            break;
+        Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names;
 
-        case MergeTreeData::MergingParams::Aggregating:
-            merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes);
-            break;
+        for (size_t i = 0; i < sort_columns_size; ++i)
+            sort_description.emplace_back(sort_columns[i], 1, 1);
 
-        case MergeTreeData::MergingParams::Replacing:
-            if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
-                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
+        /// If merge is vertical we cannot calculate it
+        ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
 
-            merged_transform = std::make_shared<ReplacingSortedTransform>(
-                header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
-                merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
-                global_ctx->cleanup);
-            break;
+        if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
+            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
 
-        case MergeTreeData::MergingParams::Graphite:
-            merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
-                header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes,
-                ctx->merging_params.graphite_params, global_ctx->time_of_merge);
-            break;
-
-        case MergeTreeData::MergingParams::VersionedCollapsing:
-            merged_transform = std::make_shared<VersionedCollapsingTransform>(
-                header, pipes.size(), sort_description, ctx->merging_params.sign_column,
-                merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
-            break;
+        auto merge_step = std::make_unique<ApplyMergeStep>(
+            merge_parts_query_plan.getCurrentDataStream(),
+            sort_description,
+            partition_key_columns,
+            ctx->merging_params,
+            ctx->rows_sources_write_buf.get(),
+            data_settings->merge_max_block_size,
+            data_settings->merge_max_block_size_bytes,
+            ctx->blocks_are_granules_size,
+            global_ctx->cleanup);
+        merge_step->setStepDescription("Merge sorted parts");
+        merge_parts_query_plan.addStep(std::move(merge_step));
     }
 
-    auto builder = std::make_unique<QueryPipelineBuilder>();
-    builder->init(Pipe::unitePipes(std::move(pipes)));
-    builder->addTransform(std::move(merged_transform));
-
-#ifndef NDEBUG
-    if (!sort_description.empty())
-    {
-        builder->addSimpleTransform([&](const Block & header_)
-        {
-            auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
-            return transform;
-        });
-    }
-#endif
-
     if (global_ctx->deduplicate)
     {
         const auto & virtuals = *global_ctx->data->getVirtualsPtr();
@@ -1383,44 +1529,56 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
         /// If deduplicate_by_columns is empty, add all columns except virtuals.
         if (global_ctx->deduplicate_by_columns.empty())
         {
-            for (const auto & column : global_ctx->merging_columns)
+            for (const auto & column_name : global_ctx->merging_columns.getNames())
             {
-                if (virtuals.tryGet(column.name, VirtualsKind::Persistent))
+                if (virtuals.tryGet(column_name, VirtualsKind::Persistent))
                     continue;
 
-                global_ctx->deduplicate_by_columns.emplace_back(column.name);
+                global_ctx->deduplicate_by_columns.emplace_back(column_name);
             }
         }
 
-        if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns))
-            builder->addTransform(std::make_shared<DistinctSortedTransform>(
-                builder->getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
-        else
-            builder->addTransform(std::make_shared<DistinctTransform>(
-                builder->getHeader(), SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
+        auto deduplication_step = std::make_unique<DistinctStep>(
+            merge_parts_query_plan.getCurrentDataStream(),
+            SizeLimits(), 0 /*limit_hint*/,
+            global_ctx->deduplicate_by_columns,
+            false,
+            true /*TODO: ??*/);
+        deduplication_step->setStepDescription("Deduplication step");
+        merge_parts_query_plan.addStep(std::move(deduplication_step));
     }
 
     PreparedSets::Subqueries subqueries;
 
+    /// TTL step
     if (ctx->need_remove_expired_values)
     {
-        auto transform = std::make_shared<TTLTransform>(global_ctx->context, builder->getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
-        subqueries = transform->getSubqueries();
-        builder->addTransform(std::move(transform));
+        auto ttl_step = std::make_unique<TTLStep>(
+            merge_parts_query_plan.getCurrentDataStream(), global_ctx->context, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
+        subqueries = ttl_step->getSubqueries();
+        ttl_step->setStepDescription("TTL step");
+        merge_parts_query_plan.addStep(std::move(ttl_step));
     }
 
+    /// Secondary indices expressions
     if (!global_ctx->merging_skip_indexes.empty())
     {
-        builder->addTransform(std::make_shared<ExpressionTransform>(
-            builder->getHeader(),
-            global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(),
-            global_ctx->data->getContext())));
-
-        builder->addTransform(std::make_shared<MaterializingTransform>(builder->getHeader()));
+        auto indices_expression_dag = global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
+        auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
+            merge_parts_query_plan.getCurrentDataStream(),
+            std::move(indices_expression_dag));
+        merge_parts_query_plan.addStep(std::move(calculate_indices_expression_step));
+        /// TODO: what is the purpose of MaterializingTransform in the original code?
+        merge_parts_query_plan.addStep(std::make_unique<MaterializingStep>(merge_parts_query_plan.getCurrentDataStream()));
     }
 
     if (!subqueries.empty())
-        builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), global_ctx->context);
+        addCreatingSetsStep(merge_parts_query_plan, std::move(subqueries), global_ctx->context);
+
+    auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
+    auto builder = merge_parts_query_plan.buildQueryPipeline(
+        QueryPlanOptimizationSettings::fromContext(global_ctx->context),
+        pipelineSettings);
 
     global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
     /// Dereference unique_ptr and pass horizontal_stage_progress by reference
diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp
index 39aa191a3d2..444a59b5590 100644
--- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp
+++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp
@@ -347,8 +347,11 @@ public:
         const StorageSnapshotPtr & storage_snapshot_,
         MergeTreeData::DataPartPtr data_part_,
         Names columns_to_read_,
+        std::shared_ptr<std::atomic<size_t>> filtered_rows_count_,
         bool apply_deleted_mask_,
         std::optional<ActionsDAG> filter_,
+        bool read_with_direct_io_,
+        bool prefetch_,
         ContextPtr context_,
         LoggerPtr log_)
         : ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)})
@@ -357,8 +360,11 @@ public:
         , storage_snapshot(storage_snapshot_)
         , data_part(std::move(data_part_))
         , columns_to_read(std::move(columns_to_read_))
+        , filtered_rows_count(std::move(filtered_rows_count_))
         , apply_deleted_mask(apply_deleted_mask_)
         , filter(std::move(filter_))
+        , read_with_direct_io(read_with_direct_io_)
+        , prefetch(prefetch_)
         , context(std::move(context_))
         , log(log_)
     {
@@ -401,24 +407,27 @@ public:
             data_part,
             columns_to_read,
             std::move(mark_ranges),
-            /*filtered_rows_count=*/ nullptr,
+            filtered_rows_count,
             apply_deleted_mask,
-            /*read_with_direct_io=*/ false,
-            /*prefetch=*/ false);
+            read_with_direct_io,
+            prefetch);
 
         pipeline.init(Pipe(std::move(source)));
     }
 
 private:
-    MergeTreeSequentialSourceType type;
+    const MergeTreeSequentialSourceType type;
     const MergeTreeData & storage;
-    StorageSnapshotPtr storage_snapshot;
-    MergeTreeData::DataPartPtr data_part;
-    Names columns_to_read;
-    bool apply_deleted_mask;
-    std::optional<ActionsDAG> filter;
-    ContextPtr context;
-    LoggerPtr log;
+    const StorageSnapshotPtr storage_snapshot;
+    const MergeTreeData::DataPartPtr data_part;
+    const Names columns_to_read;
+    const std::shared_ptr<std::atomic<size_t>> filtered_rows_count;
+    const bool apply_deleted_mask;
+    const std::optional<ActionsDAG> filter;
+    const bool read_with_direct_io;
+    const bool prefetch;
+    const ContextPtr context;
+    const LoggerPtr log;
 };
 
 void createReadFromPartStep(
@@ -428,15 +437,27 @@ void createReadFromPartStep(
     const StorageSnapshotPtr & storage_snapshot,
     MergeTreeData::DataPartPtr data_part,
     Names columns_to_read,
+    std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
     bool apply_deleted_mask,
     std::optional<ActionsDAG> filter,
+    bool read_with_direct_io,
+    bool prefetch,
     ContextPtr context,
     LoggerPtr log)
 {
-    auto reading = std::make_unique<ReadFromPart>(type,
-        storage, storage_snapshot, std::move(data_part),
-        std::move(columns_to_read), apply_deleted_mask,
-        std::move(filter), std::move(context), log);
+    auto reading = std::make_unique<ReadFromPart>(
+        type,
+        storage,
+        storage_snapshot,
+        std::move(data_part),
+        std::move(columns_to_read),
+        filtered_rows_count,
+        apply_deleted_mask,
+        std::move(filter),
+        read_with_direct_io,
+        prefetch,
+        std::move(context),
+        log);
 
     plan.addStep(std::move(reading));
 }
diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h
index 1b05512b9a3..543d1f60d10 100644
--- a/src/Storages/MergeTree/MergeTreeSequentialSource.h
+++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h
@@ -37,8 +37,11 @@ void createReadFromPartStep(
     const StorageSnapshotPtr & storage_snapshot,
     MergeTreeData::DataPartPtr data_part,
     Names columns_to_read,
+    std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
     bool apply_deleted_mask,
     std::optional<ActionsDAG> filter,
+    bool read_with_direct_io,
+    bool prefetch,
     ContextPtr context,
     LoggerPtr log);

From 13f4eb3fac6c2c0781351ee5db382383193b2af5 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Mon, 2 Sep 2024 22:24:53 +0200
Subject: [PATCH 02/14] Fix for graphite merge mode

---
 src/Storages/MergeTree/MergeTask.cpp | 25 ++++++++++++-------------
 src/Storages/MergeTree/MergeTask.h   |  2 +-
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 3bee2ecb0d9..fb5bbc4729c 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1224,7 +1224,8 @@ public:
         UInt64 merge_block_size_rows_,
         UInt64 merge_block_size_bytes_,
         bool blocks_are_granules_size_,
-        bool cleanup_)
+        bool cleanup_,
+        time_t time_of_merge_)
         : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
         , sort_description(sort_description_)
         , partition_key_columns(partition_key_columns_)
@@ -1234,6 +1235,7 @@ public:
         , merge_block_size_bytes(merge_block_size_bytes_)
         , blocks_are_granules_size(blocks_are_granules_size_)
         , cleanup(cleanup_)
+        , time_of_merge(time_of_merge_)
     {}
 
     String getName() const override { return "ApplyMergePolicy"; }
@@ -1245,10 +1247,6 @@ public:
         /// that is going in insertion order.
         ProcessorPtr merged_transform;
 
-//        /// There is no sense to have the block size bigger than one granule for merge operations.
-//        const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
-//        const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
-
         const auto &header = pipeline.getHeader();
         const auto input_streams_count = pipeline.getNumStreams();
 
@@ -1330,15 +1328,15 @@ public:
     }
 
 private:
-    SortDescription sort_description;
-    Names partition_key_columns;
-    MergeTreeData::MergingParams merging_params{};
+    const SortDescription sort_description;
+    const Names partition_key_columns;
+    const MergeTreeData::MergingParams merging_params{};
     WriteBuffer * rows_sources_write_buf;
     const UInt64 merge_block_size_rows;
     const UInt64 merge_block_size_bytes;
-    bool blocks_are_granules_size;
-    bool cleanup{false};
-    time_t time_of_merge{0};
+    const bool blocks_are_granules_size;
+    const bool cleanup{false};
+    const time_t time_of_merge{0};
 };
 
@@ -1403,7 +1401,7 @@ private:
 };
 
 
-void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
+void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
 {
     /** Read from all parts, merge and write into a new one.
      *  In passing, we calculate expression for sorting.
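[Editor's note — illustration, not part of the patch series. Patch 02 exists because GraphiteRollupSortedTransform needs the merge timestamp, which patch 01's ApplyMergeStep left as a default-initialized member (0). With the constructor change above and the call-site change in the next hunk, the step construction in createMergedStream() reads roughly as follows; all names are taken from the surrounding hunks:]

    auto merge_step = std::make_unique<ApplyMergeStep>(
        merge_parts_query_plan.getCurrentDataStream(),
        sort_description,
        partition_key_columns,
        ctx->merging_params,
        ctx->rows_sources_write_buf.get(),
        data_settings->merge_max_block_size,
        data_settings->merge_max_block_size_bytes,
        ctx->blocks_are_granules_size,
        global_ctx->cleanup,
        global_ctx->time_of_merge); /// new trailing argument; previously time_of_merge stayed 0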
@@ -1516,7 +1514,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
             data_settings->merge_max_block_size,
             data_settings->merge_max_block_size_bytes,
             ctx->blocks_are_granules_size,
-            global_ctx->cleanup);
+            global_ctx->cleanup,
+            global_ctx->time_of_merge);
         merge_step->setStepDescription("Merge sorted parts");
         merge_parts_query_plan.addStep(std::move(merge_step));
     }
diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h
index c80995888d4..a5d7851932c 100644
--- a/src/Storages/MergeTree/MergeTask.h
+++ b/src/Storages/MergeTree/MergeTask.h
@@ -291,7 +291,7 @@ private:
     bool executeMergeProjections();
 
     MergeAlgorithm chooseMergeAlgorithm() const;
-    void createMergedStream();
+    void createMergedStream() const;
 
     void extractMergingAndGatheringColumns() const;
 
     void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override

From 48cacd6f310c107c1dd0239a7639527adb054b69 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Mon, 2 Sep 2024 22:36:42 +0200
Subject: [PATCH 03/14] Use query plan for column vertical merges

---
 src/Storages/MergeTree/MergeTask.cpp | 175 ++++++++++++++++++++-------
 src/Storages/MergeTree/MergeTask.h   |   5 +-
 2 files changed, 133 insertions(+), 47 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index fb5bbc4729c..75fd61ae4be 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -804,35 +804,106 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
     bool all_parts_on_remote_disks = std::ranges::all_of(global_ctx->future_part->parts, [](const auto & part) { return part->isStoredOnRemoteDisk(); });
     ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch;
 
-    if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
-        ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name);
+//    if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
+//        ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name);
 
     return false;
 }
 
-Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String & column_name) const
+QueryPlan MergeTask::VerticalMergeStage::createPlanForReadingOneColumn(const String & column_name) const
 {
-    Pipes pipes;
-    for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num)
+    /// Read from all parts
+    std::vector<QueryPlanPtr> plans;
+    for (const auto & part : global_ctx->future_part->parts)
     {
-        Pipe pipe = createMergeTreeSequentialSource(
+        auto plan_for_part = std::make_unique<QueryPlan>();
+        createReadFromPartStep(
             MergeTreeSequentialSourceType::Merge,
+            *plan_for_part,
             *global_ctx->data,
             global_ctx->storage_snapshot,
-            global_ctx->future_part->parts[part_num],
+            part,
             Names{column_name},
-            /*mark_ranges=*/ {},
             global_ctx->input_rows_filtered,
             /*apply_deleted_mask=*/ true,
+            std::nullopt,
             ctx->read_with_direct_io,
-            ctx->use_prefetch);
+            ctx->use_prefetch,
+            global_ctx->context,
+            getLogger("VerticalMergeStage"));
 
-        pipes.emplace_back(std::move(pipe));
+        plans.emplace_back(std::move(plan_for_part));
     }
 
-    return Pipe::unitePipes(std::move(pipes));
+    QueryPlan merge_parts_query_plan;
+
+    /// Union of all parts streams
+    {
+        DataStreams input_streams;
+        input_streams.reserve(plans.size());
+        for (auto & plan : plans)
+            input_streams.emplace_back(plan->getCurrentDataStream());
+
+        auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
+        merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
+    }
+
+    return merge_parts_query_plan;
 }
 
+/// Gathers values from all parts for one column using rows sources temporary file
+class ColumnGathererStep : public ITransformingStep
+{
+public:
+    ColumnGathererStep(
+        const DataStream & input_stream_,
+        CompressedReadBufferFromFile * rows_sources_read_buf_,
+        UInt64 merge_block_size_rows_,
+        UInt64 merge_block_size_bytes_,
+        bool is_result_sparse_)
+        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+        , rows_sources_read_buf(rows_sources_read_buf_)
+        , merge_block_size_rows(merge_block_size_rows_)
+        , merge_block_size_bytes(merge_block_size_bytes_)
+        , is_result_sparse(is_result_sparse_)
+    {}
+
+    String getName() const override { return "ColumnGatherer"; }
+
+    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
+    {
+        const auto &header = pipeline.getHeader();
+        const auto input_streams_count = pipeline.getNumStreams();
+
+        rows_sources_read_buf->seek(0, 0);
+
+        auto transform = std::make_unique<ColumnGathererTransform>(
+            header,
+            input_streams_count,
+            *rows_sources_read_buf,
+            merge_block_size_rows,
+            merge_block_size_bytes,
+            is_result_sparse);
+
+        pipeline.addTransform(std::move(transform));
+    }
+
+    void updateOutputStream() override
+    {
+        output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
+
+        /// TODO: is this correct?
+        output_stream->sort_scope = DataStream::SortScope::None;
+    }
+
+private:
+    MergeTreeData::MergingParams merging_params{};
+    CompressedReadBufferFromFile * rows_sources_read_buf;
+    const UInt64 merge_block_size_rows;
+    const UInt64 merge_block_size_bytes;
+    const bool is_result_sparse;
+};
+
 void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
 {
     const auto & column_name = ctx->it_name_and_type->name;
@@ -840,50 +911,64 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
     ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
     global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
 
-    Pipe pipe;
-    if (ctx->prepared_pipe)
-    {
-        pipe = std::move(*ctx->prepared_pipe);
+//    Pipe pipe;
+////    if (ctx->prepared_pipe)
+////    {
+////        pipe = std::move(*ctx->prepared_pipe);
+////
+////        auto next_column_it = std::next(ctx->it_name_and_type);
+////        if (next_column_it != global_ctx->gathering_columns.end())
+////            ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name);
+////    }
+////    else
+//    {
+//        pipe = createPipeForReadingOneColumn(column_name);
+//    }
 
-        auto next_column_it = std::next(ctx->it_name_and_type);
-        if (next_column_it != global_ctx->gathering_columns.end())
-            ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name);
-    }
-    else
+    auto merge_column_query_plan = createPlanForReadingOneColumn(column_name);
+
+    /// Add column gatherer step
     {
-        pipe = createPipeForReadingOneColumn(column_name);
+//        ctx->rows_sources_read_buf->seek(0, 0);
+        bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
+        const auto data_settings = global_ctx->data->getSettings();
+        auto merge_step = std::make_unique<ColumnGathererStep>(
+            merge_column_query_plan.getCurrentDataStream(),
+            ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name,
+            data_settings->merge_max_block_size,
+            data_settings->merge_max_block_size_bytes,
+            is_result_sparse);
+        merge_step->setStepDescription("Gather column");
+        merge_column_query_plan.addStep(std::move(merge_step));
     }
 
-    ctx->rows_sources_read_buf->seek(0, 0);
-    bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
-
-    const auto data_settings = global_ctx->data->getSettings();
-    auto transform = std::make_unique<ColumnGathererTransform>(
-        pipe.getHeader(),
-        pipe.numOutputPorts(),
-        *ctx->rows_sources_read_buf,
-        data_settings->merge_max_block_size,
-        data_settings->merge_max_block_size_bytes,
-        is_result_sparse);
-
-    pipe.addTransform(std::move(transform));
-
+    /// Add expression step for indexes
     MergeTreeIndices indexes_to_recalc;
-    auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name);
-
-    if (indexes_it != global_ctx->skip_indexes_by_column.end())
+    IndicesDescription indexes_to_recalc_description;
     {
-        indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second);
+        auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name);
 
-        pipe.addTransform(std::make_shared<ExpressionTransform>(
-            pipe.getHeader(),
-            indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(),
-            global_ctx->data->getContext())));
+        if (indexes_it != global_ctx->skip_indexes_by_column.end())
+        {
+            indexes_to_recalc_description = indexes_it->second;
+            indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second);
 
-        pipe.addTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));
+            auto indices_expression_dag = indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
+            auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
+                merge_column_query_plan.getCurrentDataStream(),
+                std::move(indices_expression_dag));
+            merge_column_query_plan.addStep(std::move(calculate_indices_expression_step));
+        }
     }
 
-    ctx->column_parts_pipeline = QueryPipeline(std::move(pipe));
+    {
+        auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
+        auto builder = merge_column_query_plan.buildQueryPipeline(
+            QueryPlanOptimizationSettings::fromContext(global_ctx->context),
+            pipelineSettings);
+
+        ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
+    }
 
     /// Dereference unique_ptr
     ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback(
diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h
index a5d7851932c..b36f5f832d9 100644
--- a/src/Storages/MergeTree/MergeTask.h
+++ b/src/Storages/MergeTree/MergeTask.h
@@ -333,7 +333,8 @@ private:
     Float64 progress_before = 0;
 
     std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
-    std::optional<Pipe> prepared_pipe;
+// TODO: is this really needed for prefetch?
+//    std::optional<Pipe> prepared_pipe;
     size_t max_delayed_streams = 0;
     bool use_prefetch = false;
     std::list<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams;
@@ -378,7 +379,7 @@ private:
     bool executeVerticalMergeForOneColumn() const;
     void finalizeVerticalMergeForOneColumn() const;
 
-    Pipe createPipeForReadingOneColumn(const String & column_name) const;
+    QueryPlan createPlanForReadingOneColumn(const String & column_name) const;
 
     VerticalMergeRuntimeContextPtr ctx;
     GlobalRuntimeContextPtr global_ctx;

From d28cba981ccd6a58939854a0204d654c6075337d Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 3 Sep 2024 08:59:01 +0200
Subject: [PATCH 04/14] Fix clang_tidy

---
 src/Storages/MergeTree/MergeTask.cpp | 6 +++---
 src/Storages/MergeTree/MergeTask.h   | 8 ++++----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 75fd61ae4be..cafc11fc34d 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -195,7 +195,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
     }
 }
 
-bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
+bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
 {
     ProfileEvents::increment(ProfileEvents::Merge);
 
@@ -657,7 +657,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPart
 }
 
 
-bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT
+bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() const
 {
     /// In case if there are no projections we didn't construct a task
     if (!ctx->merge_projection_parts_task_ptr)
@@ -676,7 +676,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // N
     return true;
 }
 
-bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
+bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const
 {
     Stopwatch watch(CLOCK_MONOTONIC_COARSE);
     UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h
index b36f5f832d9..a30ab4712d5 100644
--- a/src/Storages/MergeTree/MergeTask.h
+++ b/src/Storages/MergeTree/MergeTask.h
@@ -268,12 +268,12 @@ private:
     {
         bool execute() override;
 
-        bool prepare();
-        bool executeImpl();
+        bool prepare() const;
+        bool executeImpl() const;
         void finalize() const;
 
         /// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
-        using ExecuteAndFinalizeHorizontalPartSubtasks = std::array;
+        using ExecuteAndFinalizeHorizontalPartSubtasks = std::array;
 
         const ExecuteAndFinalizeHorizontalPartSubtasks subtasks
         {
@@ -288,7 +288,7 @@ private:
     void calculateProjections(const Block & block) const;
     void finalizeProjections() const;
     void constructTaskForProjectionPartsMerge() const;
-    bool executeMergeProjections();
+    bool executeMergeProjections() const;
 
     MergeAlgorithm chooseMergeAlgorithm() const;
     void createMergedStream() const;

From a1cec53b7c2a6508277280bd8c36f90dfe661560 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 3 Sep 2024 13:09:18 +0200
Subject: [PATCH 05/14] Cleanup

---
 src/Storages/MergeTree/MergeTask.cpp | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index cafc11fc34d..6f5b8301d4a 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1466,7 +1466,7 @@ public:
         subqueries_for_sets = transform->getSubqueries();
     }
 
-    String getName() const override { return "Materializing"; }
+    String getName() const override { return "TTL"; }
 
     PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
 
@@ -1524,9 +1524,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     std::vector<QueryPlanPtr> plans;
     for (const auto & part : global_ctx->future_part->parts)
     {
-        /// TODO: this is just for debugging purposes, remove it later
         if (part->getMarksCount() == 0)
-            LOG_DEBUG(ctx->log, "Part {} is empty", part->name);
+            LOG_TRACE(ctx->log, "Part {} is empty", part->name);
 
         auto plan_for_part = std::make_unique<QueryPlan>();
         createReadFromPartStep(
@@ -1613,12 +1612,12 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
         /// If deduplicate_by_columns is empty, add all columns except virtuals.
         if (global_ctx->deduplicate_by_columns.empty())
         {
-            for (const auto & column_name : global_ctx->merging_columns.getNames())
+            for (const auto & column : global_ctx->merging_columns)
             {
-                if (virtuals.tryGet(column_name, VirtualsKind::Persistent))
+                if (virtuals.tryGet(column.name, VirtualsKind::Persistent))
                     continue;
 
-                global_ctx->deduplicate_by_columns.emplace_back(column_name);
+                global_ctx->deduplicate_by_columns.emplace_back(column.name);
             }
         }
 

From 1bcc4ba823805bed282133fb7035b73598641fc6 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 3 Sep 2024 14:54:05 +0200
Subject: [PATCH 06/14] Fix updateOutputStream and Traits

---
 src/Storages/MergeTree/MergeTask.cpp | 106 ++++++++++++++++++++-------
 1 file changed, 78 insertions(+), 28 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 6f5b8301d4a..1bf1573fc1f 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -861,7 +861,7 @@ public:
         UInt64 merge_block_size_rows_,
         UInt64 merge_block_size_bytes_,
         bool is_result_sparse_)
-        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+        : ITransformingStep(input_stream_, input_stream_.header, getTraits())
         , rows_sources_read_buf(rows_sources_read_buf_)
         , merge_block_size_rows(merge_block_size_rows_)
         , merge_block_size_bytes(merge_block_size_bytes_)
         , is_result_sparse(is_result_sparse_)
@@ -891,12 +891,24 @@ public:
     void updateOutputStream() override
     {
         output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
-
-        /// TODO: is this correct?
-        output_stream->sort_scope = DataStream::SortScope::None;
     }
 
 private:
+    static Traits getTraits()
+    {
+        return ITransformingStep::Traits
+        {
+            {
+                .returns_single_stream = true,
+                .preserves_number_of_streams = true,
+                .preserves_sorting = true,
+            },
+            {
+                .preserves_number_of_rows = false,
+            }
+        };
+    }
+
     MergeTreeData::MergingParams merging_params{};
     CompressedReadBufferFromFile * rows_sources_read_buf;
     const UInt64 merge_block_size_rows;
@@ -962,10 +974,9 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
     }
 
     {
-        auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
-        auto builder = merge_column_query_plan.buildQueryPipeline(
-            QueryPlanOptimizationSettings::fromContext(global_ctx->context),
-            pipelineSettings);
+        auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
+        auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
+        auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
 
         ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
     }
@@ -1297,7 +1308,7 @@ bool MergeTask::execute()
 
 
 /// Apply merge strategy (Ordinary, Collapsing, Aggregating, etc) to the stream
-class ApplyMergeStep : public ITransformingStep /// TODO: is this transformation step?
+class ApplyMergeStep : public ITransformingStep
 {
 public:
     ApplyMergeStep(
@@ -1311,7 +1322,7 @@ public:
         bool blocks_are_granules_size_,
         bool cleanup_,
         time_t time_of_merge_)
-        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+        : ITransformingStep(input_stream_, input_stream_.header, getTraits())
         , sort_description(sort_description_)
         , partition_key_columns(partition_key_columns_)
         , merging_params(merging_params_)
@@ -1403,16 +1414,24 @@ public:
     void updateOutputStream() override
     {
         output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
-        output_stream->sort_description = sort_description;
-
-        /// TODO: is this correct?
-//        if (partition_key_columns.empty())
-            output_stream->sort_scope = DataStream::SortScope::Global;
-//        else
-//            output_stream->sort_scope = DataStream::SortScope::Stream;
     }
 
 private:
+    static Traits getTraits()
+    {
+        return ITransformingStep::Traits
+        {
+            {
+                .returns_single_stream = true,
+                .preserves_number_of_streams = true,
+                .preserves_sorting = true,
+            },
+            {
+                .preserves_number_of_rows = false,
+            }
+        };
+    }
+
     const SortDescription sort_description;
     const Names partition_key_columns;
     const MergeTreeData::MergingParams merging_params{};
@@ -1425,12 +1444,12 @@ private:
 };
 
 
-class MaterializingStep : public ITransformingStep /// TODO: is this transformation step?
+class MaterializingStep : public ITransformingStep
 {
 public:
     explicit MaterializingStep(
         const DataStream & input_stream_)
-        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+        : ITransformingStep(input_stream_, input_stream_.header, getTraits())
     {}
 
     String getName() const override { return "Materializing"; }
@@ -1442,9 +1461,23 @@ public:
 
     void updateOutputStream() override
     {
-        /// TODO: can this be simplified?
         output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
-        output_stream->sort_description = input_streams.front().sort_description;
+    }
+
+private:
+    static Traits getTraits()
+    {
+        return ITransformingStep::Traits
+        {
+            {
+                .returns_single_stream = true,
+                .preserves_number_of_streams = true,
+                .preserves_sorting = true,
+            },
+            {
+                .preserves_number_of_rows = true,
+            }
+        };
     }
 };
 
@@ -1460,7 +1493,7 @@ public:
         const MergeTreeData::MutableDataPartPtr & data_part_,
         time_t current_time,
         bool force_)
-        : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
+        : ITransformingStep(input_stream_, input_stream_.header, getTraits())
     {
         transform = std::make_shared<TTLTransform>(context_, input_stream_.header, storage_, metadata_snapshot_, data_part_, current_time, force_);
         subqueries_for_sets = transform->getSubqueries();
@@ -1477,10 +1510,25 @@ public:
     void updateOutputStream() override
     {
-        // TODO: implement?
+        output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
     }
 
 private:
+    static Traits getTraits()
+    {
+        return ITransformingStep::Traits
+        {
+            {
+                .returns_single_stream = true,
+                .preserves_number_of_streams = true,
+                .preserves_sorting = true,
+            },
+            {
+                .preserves_number_of_rows = false,
+            }
+        };
+    }
+
     std::shared_ptr<TTLTransform> transform;
     PreparedSets::Subqueries subqueries_for_sets;
 };
@@ -1658,12 +1706,14 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     if (!subqueries.empty())
         addCreatingSetsStep(merge_parts_query_plan, std::move(subqueries), global_ctx->context);
 
-    auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
-    auto builder = merge_parts_query_plan.buildQueryPipeline(
-        QueryPlanOptimizationSettings::fromContext(global_ctx->context),
-        pipelineSettings);
+    {
+        auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
+        auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
+        auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
+
+        global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
+    }
 
-    global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
     /// Dereference unique_ptr and pass horizontal_stage_progress by reference
     global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
     /// Is calculated inside MergeProgressCallback.
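[Editor's note — illustration, not part of the patch series. By the end of patch 06 the series has settled on one pattern for both the horizontal merge and the vertical per-column gather: build one QueryPlan per source part, unite the sub-plans, stack transforming steps on top, then lower the plan to a pipeline. A condensed sketch, using only calls that appear in the hunks above (ApplyMergeStep is renamed MergePartsStep later in the series; error handling, settings and progress callbacks omitted):]

    std::vector<QueryPlanPtr> plans;
    DataStreams input_streams;
    for (const auto & part : parts)
    {
        auto plan_for_part = std::make_unique<QueryPlan>();
        createReadFromPartStep(MergeTreeSequentialSourceType::Merge, *plan_for_part, /* ...part, columns, filters... */);
        input_streams.emplace_back(plan_for_part->getCurrentDataStream());
        plans.emplace_back(std::move(plan_for_part));
    }

    QueryPlan plan;
    plan.unitePlans(std::make_unique<UnionStep>(std::move(input_streams)), std::move(plans));
    plan.addStep(std::make_unique<ApplyMergeStep>(plan.getCurrentDataStream(), /* ...merging params... */));

    auto builder = plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(context),
        BuildQueryPipelineSettings::fromContext(context));
    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));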
From 8361724539408d95f9757e00047919d70ea50bbd Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 3 Sep 2024 17:02:25 +0200
Subject: [PATCH 07/14] Build pipeline for next column for prefetching

---
 src/Storages/MergeTree/MergeTask.cpp | 140 +++++++++++++--------------
 src/Storages/MergeTree/MergeTask.h   |  14 ++-
 2 files changed, 81 insertions(+), 73 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 1bf1573fc1f..a4104672de7 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -804,53 +804,12 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
     bool all_parts_on_remote_disks = std::ranges::all_of(global_ctx->future_part->parts, [](const auto & part) { return part->isStoredOnRemoteDisk(); });
     ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch;
 
-//    if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
-//        ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name);
+    if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
+        ctx->prepared_pipeline = createPipelineForReadingOneColumn(ctx->it_name_and_type->name);
 
     return false;
 }
 
-QueryPlan MergeTask::VerticalMergeStage::createPlanForReadingOneColumn(const String & column_name) const
-{
-    /// Read from all parts
-    std::vector<QueryPlanPtr> plans;
-    for (const auto & part : global_ctx->future_part->parts)
-    {
-        auto plan_for_part = std::make_unique<QueryPlan>();
-        createReadFromPartStep(
-            MergeTreeSequentialSourceType::Merge,
-            *plan_for_part,
-            *global_ctx->data,
-            global_ctx->storage_snapshot,
-            part,
-            Names{column_name},
-            global_ctx->input_rows_filtered,
-            /*apply_deleted_mask=*/ true,
-            std::nullopt,
-            ctx->read_with_direct_io,
-            ctx->use_prefetch,
-            global_ctx->context,
-            getLogger("VerticalMergeStage"));
-
-        plans.emplace_back(std::move(plan_for_part));
-    }
-
-    QueryPlan merge_parts_query_plan;
-
-    /// Union of all parts streams
-    {
-        DataStreams input_streams;
-        input_streams.reserve(plans.size());
-        for (auto & plan : plans)
-            input_streams.emplace_back(plan->getCurrentDataStream());
-
-        auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
-        merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
-    }
-
-    return merge_parts_query_plan;
-}
-
 /// Gathers values from all parts for one column using rows sources temporary file
 class ColumnGathererStep : public ITransformingStep
 {
@@ -916,32 +875,46 @@ private:
     const bool is_result_sparse;
 };
 
-void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
+MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::VerticalMergeStage::createPipelineForReadingOneColumn(const String & column_name) const
 {
-    const auto & column_name = ctx->it_name_and_type->name;
+    /// Read from all parts
+    std::vector<QueryPlanPtr> plans;
+    for (const auto & part : global_ctx->future_part->parts)
+    {
+        auto plan_for_part = std::make_unique<QueryPlan>();
+        createReadFromPartStep(
+            MergeTreeSequentialSourceType::Merge,
+            *plan_for_part,
+            *global_ctx->data,
+            global_ctx->storage_snapshot,
+            part,
+            Names{column_name},
+            global_ctx->input_rows_filtered,
+            /*apply_deleted_mask=*/ true,
+            std::nullopt,
+            ctx->read_with_direct_io,
+            ctx->use_prefetch,
+            global_ctx->context,
+            getLogger("VerticalMergeStage"));
 
-    ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
-    global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
+        plans.emplace_back(std::move(plan_for_part));
+    }
 
-//    Pipe pipe;
-////    if (ctx->prepared_pipe)
-////    {
-////        pipe = std::move(*ctx->prepared_pipe);
-////
-////        auto next_column_it = std::next(ctx->it_name_and_type);
-////        if (next_column_it != global_ctx->gathering_columns.end())
-////            ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name);
-////    }
-////    else
-//    {
-//        pipe = createPipeForReadingOneColumn(column_name);
-//    }
+    QueryPlan merge_column_query_plan;
 
-    auto merge_column_query_plan = createPlanForReadingOneColumn(column_name);
+    /// Union of all parts streams
+    {
+        DataStreams input_streams;
+        input_streams.reserve(plans.size());
+        for (auto & plan : plans)
+            input_streams.emplace_back(plan->getCurrentDataStream());
+
+        auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
+        merge_column_query_plan.unitePlans(std::move(union_step), std::move(plans));
+    }
 
     /// Add column gatherer step
     {
-//        ctx->rows_sources_read_buf->seek(0, 0);
         bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
         const auto data_settings = global_ctx->data->getSettings();
         auto merge_step = std::make_unique<ColumnGathererStep>(
             merge_column_query_plan.getCurrentDataStream(),
             ctx->rows_sources_read_buf.get(), //global_ctx->rows_sources_temporary_file_name,
             data_settings->merge_max_block_size,
             data_settings->merge_max_block_size_bytes,
             is_result_sparse);
         merge_step->setStepDescription("Gather column");
         merge_column_query_plan.addStep(std::move(merge_step));
     }
 
-    {
-        auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
-        auto builder = merge_column_query_plan.buildQueryPipeline(
-            QueryPlanOptimizationSettings::fromContext(global_ctx->context),
-            pipeline_settings);
+    auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
+    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
+    auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
 
-        ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
-    }
+    return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
+}
+
+void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
+{
+    const auto & column_name = ctx->it_name_and_type->name;
+
+    ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
+    global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
+
+    VerticalMergeRuntimeContext::PreparedColumnPipeline column_pipepline;
+    if (ctx->prepared_pipeline)
+    {
+        column_pipepline = std::move(*ctx->prepared_pipeline);
+
+        /// Prepare next column pipeline to initiate prefetching
+        auto next_column_it = std::next(ctx->it_name_and_type);
+        if (next_column_it != global_ctx->gathering_columns.end())
+            ctx->prepared_pipeline = createPipelineForReadingOneColumn(next_column_it->name);
+    }
+    else
+    {
+        column_pipepline = createPipelineForReadingOneColumn(column_name);
+    }
+
+    ctx->column_parts_pipeline = std::move(column_pipepline.pipeline);
 
     /// Dereference unique_ptr
     ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback(
@@ -997,12 +993,16 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
         global_ctx->metadata_snapshot,
         columns_list,
         ctx->compression_codec,
-        indexes_to_recalc,
+        column_pipepline.indexes_to_recalc,
         getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
         &global_ctx->written_offset_columns,
         global_ctx->to->getIndexGranularity());
 
     ctx->column_elems_written = 0;
+
+    /// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time.
+    /// This sharing also prevents us from running multiple merges of individual columns in parallel.
+    ctx->rows_sources_read_buf->seek(0, 0);
 }
 
@@ -1673,8 +1673,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
         merge_parts_query_plan.getCurrentDataStream(),
         SizeLimits(), 0 /*limit_hint*/,
         global_ctx->deduplicate_by_columns,
-        false,
-        true /*TODO: ??*/);
+        false /*pre_distinct*/,
+        true /*optimize_distinct_in_order TODO: looks like it shoud be enabled*/);
     deduplication_step->setStepDescription("Deduplication step");
     merge_parts_query_plan.addStep(std::move(deduplication_step));
 }
diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h
index a30ab4712d5..bbe53c34c7e 100644
--- a/src/Storages/MergeTree/MergeTask.h
+++ b/src/Storages/MergeTree/MergeTask.h
@@ -333,8 +333,16 @@ private:
     Float64 progress_before = 0;
 
     std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
-// TODO: is this really needed for prefetch?
-//    std::optional<Pipe> prepared_pipe;
+
+    /// Used for prefetching. Right before starting merge of a column we create a pipeline for the next column
+    /// and it initiates prefetching of the first range of that column.
+    struct PreparedColumnPipeline
+    {
+        QueryPipeline pipeline;
+        MergeTreeIndices indexes_to_recalc;
+    };
+
+    std::optional<PreparedColumnPipeline> prepared_pipeline;
     size_t max_delayed_streams = 0;
     bool use_prefetch = false;
     std::list<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams;
@@ -379,7 +387,7 @@ private:
     bool executeVerticalMergeForOneColumn() const;
     void finalizeVerticalMergeForOneColumn() const;
 
-    QueryPlan createPlanForReadingOneColumn(const String & column_name) const;
+    VerticalMergeRuntimeContext::PreparedColumnPipeline createPipelineForReadingOneColumn(const String & column_name) const;
 
     VerticalMergeRuntimeContextPtr ctx;
     GlobalRuntimeContextPtr global_ctx;

From 472e6eb856e338332fbebb2519066f093c18a15f Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 3 Sep 2024 17:16:43 +0200
Subject: [PATCH 08/14] typo

---
 src/Storages/MergeTree/MergeTask.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index a4104672de7..576ea341877 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1674,7 +1674,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
         SizeLimits(), 0 /*limit_hint*/,
         global_ctx->deduplicate_by_columns,
         false /*pre_distinct*/,
-        true /*optimize_distinct_in_order TODO: looks like it shoud be enabled*/);
+        true /*optimize_distinct_in_order TODO: looks like it should be enabled*/);
     deduplication_step->setStepDescription("Deduplication step");
     merge_parts_query_plan.addStep(std::move(deduplication_step));
 }

From 20eaecc4f39adf73ac402c88d4a54d70f859453c Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Thu, 5 Sep 2024 13:50:26 +0200
Subject: [PATCH 09/14] Fix build

---
 src/Storages/MergeTree/MergeTask.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index e6d7b4656c9..398a9472456 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1585,8 +1585,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     std::vector<QueryPlanPtr> plans;
From 20eaecc4f39adf73ac402c88d4a54d70f859453c Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Thu, 5 Sep 2024 13:50:26 +0200
Subject: [PATCH 09/14] Fix build

---
 src/Storages/MergeTree/MergeTask.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index e6d7b4656c9..398a9472456 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1585,8 +1585,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     std::vector<QueryPlanPtr> plans;
     for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i)
     {
-        if (part->getMarksCount() == 0)
-            LOG_TRACE(ctx->log, "Part {} is empty", part->name);
+        if (global_ctx->future_part->parts[i]->getMarksCount() == 0)
+            LOG_TRACE(ctx->log, "Part {} is empty", global_ctx->future_part->parts[i]->name);
 
         auto plan_for_part = std::make_unique<QueryPlan>();
         createReadFromPartStep(

From 1bcc4ba823805bed282133fb7035b73598641fc6 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Mon, 9 Sep 2024 15:30:19 +0200
Subject: [PATCH 10/14] Renamed ApplyMergeStep into MergePartsStep

---
 src/Storages/MergeTree/MergeTask.cpp | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 398a9472456..3ca909a2d09 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1321,10 +1321,10 @@ bool MergeTask::execute()
 
 
 /// Apply merge strategy (Ordinary, Colapsing, Aggregating, etc) to the stream
-class ApplyMergeStep : public ITransformingStep
+class MergePartsStep : public ITransformingStep
 {
 public:
-    ApplyMergeStep(
+    MergePartsStep(
         const DataStream & input_stream_,
         const SortDescription & sort_description_,
         const Names partition_key_columns_,
@@ -1347,7 +1347,7 @@ public:
         , time_of_merge(time_of_merge_)
     {}
 
-    String getName() const override { return "ApplyMergePolicy"; }
+    String getName() const override { return "MergeParts"; }
 
     void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
     {
@@ -1651,7 +1651,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
         throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
 
-    auto merge_step = std::make_unique<ApplyMergeStep>(
+    auto merge_step = std::make_unique<MergePartsStep>(
         merge_parts_query_plan.getCurrentDataStream(),
         sort_description,
         partition_key_columns,
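
Background for MergePartsStep, renamed above: the merging transforms it selects perform a k-way merge over the sorted part streams, and for vertical merges they additionally record which input part produced each surviving output row, so the per-column gather phase can later replay the same interleaving (this is the log that ctx->rows_sources_read_buf->seek(0, 0) rewinds for every column in an earlier patch of this series). A toy two-source version of that bookkeeping (illustrative, not the real merging transform):

    #include <cstdint>
    #include <iostream>
    #include <queue>
    #include <vector>

    struct Cursor
    {
        const std::vector<int> * rows;
        size_t pos;
        uint8_t source;    /// index of the input part this cursor reads
    };

    int main()
    {
        std::vector<int> part0{1, 4, 6};
        std::vector<int> part1{2, 3, 5};

        auto greater = [](const Cursor & a, const Cursor & b)
        { return (*a.rows)[a.pos] > (*b.rows)[b.pos]; };
        std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> heap(greater);
        heap.push({&part0, 0, 0});
        heap.push({&part1, 0, 1});

        std::vector<uint8_t> row_sources;    /// stands in for the rows-sources log
        while (!heap.empty())
        {
            Cursor c = heap.top();
            heap.pop();
            std::cout << (*c.rows)[c.pos] << ' ';    /// merged rows: 1 2 3 4 5 6
            row_sources.push_back(c.source);         /// sources:     0 1 1 0 1 0
            if (++c.pos < c.rows->size())
                heap.push(c);
        }
        /// The gather phase replays row_sources to pick, for every output row,
        /// the next value from the matching part's column stream.
    }
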
From 8c1f434b1ac2c9fbb83561a43a6ee10f20d81974 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Mon, 9 Sep 2024 15:31:43 +0200
Subject: [PATCH 11/14] Do column materialization using ActionsDAG::addMaterializingOutputActions instead of a special step

---
 src/Storages/MergeTree/MergeTask.cpp | 42 +---
 1 file changed, 1 insertion(+), 41 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 3ca909a2d09..33cdff10b6a 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1456,45 +1456,6 @@ private:
     const time_t time_of_merge{0};
 };
 
-
-class MaterializingStep : public ITransformingStep
-{
-public:
-    explicit MaterializingStep(
-        const DataStream & input_stream_)
-        : ITransformingStep(input_stream_, input_stream_.header, getTraits())
-    {}
-
-    String getName() const override { return "Materializing"; }
-
-    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
-    {
-        pipeline.addTransform(std::make_shared<MaterializingTransform>(input_streams.front().header));
-    }
-
-    void updateOutputStream() override
-    {
-        output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
-    }
-
-private:
-    static Traits getTraits()
-    {
-        return ITransformingStep::Traits
-        {
-            {
-                .returns_single_stream = true,
-                .preserves_number_of_streams = true,
-                .preserves_sorting = true,
-            },
-            {
-                .preserves_number_of_rows = true,
-            }
-        };
-    }
-};
-
-
 class TTLStep : public ITransformingStep
 {
 public:
@@ -1709,12 +1670,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     if (!global_ctx->merging_skip_indexes.empty())
     {
         auto indices_expression_dag = global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
+        indices_expression_dag.addMaterializingOutputActions(); /// Const columns cannot be written without materialization.
         auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
             merge_parts_query_plan.getCurrentDataStream(),
             std::move(indices_expression_dag));
         merge_parts_query_plan.addStep(std::move(calculate_indices_expression_step));
-        /// TODO: what is the purpose of MaterializingTransform in the original code?
-        merge_parts_query_plan.addStep(std::make_unique<MaterializingStep>(merge_parts_query_plan.getCurrentDataStream()));
     }
 
     if (!subqueries.empty())
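
Note on the replacement above: an expression computed for skip indexes can produce constant columns (a single value plus a row count), while the part writer needs fully materialized ones, as the diff's own comment says. Folding addMaterializingOutputActions() into the expression DAG makes the conversion part of the same ExpressionStep, so the dedicated step is no longer needed. A standalone sketch of what materializing a constant means (illustrative types, not ClickHouse's IColumn):

    #include <iostream>
    #include <variant>
    #include <vector>

    /// A column is either fully materialized values or a constant:
    /// one value plus a row count.
    struct ConstColumn
    {
        int value;
        size_t rows;
    };
    using Column = std::variant<std::vector<int>, ConstColumn>;

    /// Expand a constant into a full column; materialized columns pass through.
    std::vector<int> materialize(const Column & column)
    {
        if (const auto * constant = std::get_if<ConstColumn>(&column))
            return std::vector<int>(constant->rows, constant->value);
        return std::get<std::vector<int>>(column);
    }

    int main()
    {
        Column c = ConstColumn{42, 4};
        for (int v : materialize(c))
            std::cout << v << ' ';    // prints: 42 42 42 42
    }
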
From 4da1e10ac66059b47a89e69327ea79d487e2847f Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Mon, 9 Sep 2024 16:01:00 +0200
Subject: [PATCH 12/14] Move sorting key calculation step outside the loop

---
 src/Storages/MergeTree/MergeTask.cpp | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 33cdff10b6a..9a1e749734c 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -1540,8 +1540,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
         ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
 
-    auto sorting_key_expression_dag = global_ctx->metadata_snapshot->getSortingKey().expression->getActionsDAG().clone();
-
     /// Read from all parts
     std::vector<QueryPlanPtr> plans;
     for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i)
@@ -1566,15 +1564,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
             global_ctx->context,
             ctx->log);
 
-        if (global_ctx->metadata_snapshot->hasSortingKey())
-        {
-            /// Calculate sorting key expressions so that they are available for merge sorting.
-            auto calculate_sorting_key_expression_step = std::make_unique<ExpressionStep>(
-                plan_for_part->getCurrentDataStream(),
-                sorting_key_expression_dag.clone()); /// TODO: can we avoid cloning here?
-            plan_for_part->addStep(std::move(calculate_sorting_key_expression_step));
-        }
-
         plans.emplace_back(std::move(plan_for_part));
     }
 
@@ -1591,6 +1580,16 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
         merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
     }
 
+    if (global_ctx->metadata_snapshot->hasSortingKey())
+    {
+        /// Calculate sorting key expressions so that they are available for merge sorting.
+        auto sorting_key_expression_dag = global_ctx->metadata_snapshot->getSortingKey().expression->getActionsDAG().clone();
+        auto calculate_sorting_key_expression_step = std::make_unique<ExpressionStep>(
+            merge_parts_query_plan.getCurrentDataStream(),
+            std::move(sorting_key_expression_dag));
+        merge_parts_query_plan.addStep(std::move(calculate_sorting_key_expression_step));
+    }
+
     /// Merge
     {
         Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();

From 2f15fcd23fb069a69d2f1a0caee36ae33f39fe45 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 10 Sep 2024 20:57:03 +0200
Subject: [PATCH 13/14] Test with sparse serialization, vertical merge and skip indices

---
 .../03175_sparse_and_skip_index.reference     |  4 ++
 .../03175_sparse_and_skip_index.sql           | 45 +++++++++++++++++++
 2 files changed, 49 insertions(+)
 create mode 100644 tests/queries/0_stateless/03175_sparse_and_skip_index.reference
 create mode 100644 tests/queries/0_stateless/03175_sparse_and_skip_index.sql

diff --git a/tests/queries/0_stateless/03175_sparse_and_skip_index.reference b/tests/queries/0_stateless/03175_sparse_and_skip_index.reference
new file mode 100644
index 00000000000..619e98a152a
--- /dev/null
+++ b/tests/queries/0_stateless/03175_sparse_and_skip_index.reference
@@ -0,0 +1,4 @@
+key Sparse
+value Sparse
+1000
+1
diff --git a/tests/queries/0_stateless/03175_sparse_and_skip_index.sql b/tests/queries/0_stateless/03175_sparse_and_skip_index.sql
new file mode 100644
index 00000000000..4de6d1ac6df
--- /dev/null
+++ b/tests/queries/0_stateless/03175_sparse_and_skip_index.sql
@@ -0,0 +1,45 @@
+DROP TABLE IF EXISTS t_bloom_filter;
+CREATE TABLE t_bloom_filter(
+    key UInt64,
+    value UInt64,
+
+    INDEX key_bf key TYPE bloom_filter(0.01) GRANULARITY 2147483648, -- bloom filter on sorting key column
+    INDEX value_bf value TYPE bloom_filter(0.01) GRANULARITY 2147483648 -- bloom filter on non-sorting column
+) ENGINE=MergeTree ORDER BY key
+SETTINGS
+    -- settings to trigger sparse serialization and vertical merge
+    ratio_of_defaults_for_sparse_serialization = 0.0
+    ,vertical_merge_algorithm_min_rows_to_activate = 1
+    ,vertical_merge_algorithm_min_columns_to_activate = 1
+    ,allow_vertical_merges_from_compact_to_wide_parts = 1
+    ,min_bytes_for_wide_part=0
+;
+
+SYSTEM STOP MERGES t_bloom_filter;
+
+-- Create at least one part
+INSERT INTO t_bloom_filter
+SELECT
+    number % 100 as key, -- 100 unique keys
+    rand() % 100 as value -- 100 unique values
+FROM numbers(50_000);
+
+-- And another part
+INSERT INTO t_bloom_filter
+SELECT
+    number % 100 as key, -- 100 unique keys
+    rand() % 100 as value -- 100 unique values
+FROM numbers(50_000, 50_000);
+
+SYSTEM START MERGES t_bloom_filter;
+
+-- Merge everything into a single part
+OPTIMIZE TABLE t_bloom_filter FINAL;
+
+-- Check sparse serialization
+SELECT column, serialization_kind FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_bloom_filter' AND active ORDER BY column;
+
+SELECT COUNT() FROM t_bloom_filter WHERE key = 1;
+
+-- Check bloom filter non-zero size
+SELECT COUNT() FROM system.parts WHERE database = currentDatabase() AND table = 't_bloom_filter' AND secondary_indices_uncompressed_bytes > 200 AND active;
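
Note on the test above: ratio_of_defaults_for_sparse_serialization = 0.0 makes every column qualify for the Sparse serialization kind, which stores only the non-default values together with the row offsets where they occur; the reference output confirms both columns end up Sparse after the merge. A toy encoder showing the shape of that representation (illustrative, not the on-disk format):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct SparseColumn
    {
        std::vector<size_t> offsets;    /// row numbers holding non-default values
        std::vector<int> values;        /// the non-default values themselves
        size_t rows = 0;
    };

    SparseColumn toSparse(const std::vector<int> & full)
    {
        SparseColumn sparse;
        sparse.rows = full.size();
        for (size_t i = 0; i < full.size(); ++i)
        {
            if (full[i] != 0)    /// 0 is the default value for numeric columns
            {
                sparse.offsets.push_back(i);
                sparse.values.push_back(full[i]);
            }
        }
        return sparse;
    }

    int main()
    {
        auto sparse = toSparse({0, 0, 7, 0, 9});
        std::cout << sparse.values.size() << " of " << sparse.rows << " rows stored\n";    // 2 of 5
    }
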
From d4aa06524b4e65e2bca4ff851ad54dd4468e5cc0 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 10 Sep 2024 20:57:55 +0200
Subject: [PATCH 14/14] Add materialization when building indices in vertical merge

---
 src/Storages/MergeTree/MergeTask.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 9a1e749734c..5c993504245 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -952,6 +952,7 @@ MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::Vertic
         indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second);
         auto indices_expression_dag = indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
+        indices_expression_dag.addMaterializingOutputActions(); /// Const columns cannot be written without materialization.
         auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
             merge_column_query_plan.getCurrentDataStream(),
             std::move(indices_expression_dag));
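
Closing note on the assertion secondary_indices_uncompressed_bytes > 200 in the test: it verifies that the bloom filters were actually rebuilt by the vertical merge path patched above. As background, a bloom-filter skip index keeps a small bit array per indexed block and answers only "definitely absent" or "possibly present", which is enough to skip whole granules for point predicates such as key = 1. A minimal standalone bloom filter (illustrative hash scheme, not ClickHouse's):

    #include <bitset>
    #include <cstdint>
    #include <functional>
    #include <iostream>

    /// Toy bloom filter: two hash probes into a 1024-bit array.
    struct BloomFilter
    {
        std::bitset<1024> bits;

        static size_t probe1(uint64_t key) { return std::hash<uint64_t>{}(key) % 1024; }
        static size_t probe2(uint64_t key) { return std::hash<uint64_t>{}(key * 0x9e3779b97f4a7c15ULL) % 1024; }

        void add(uint64_t key)
        {
            bits.set(probe1(key));
            bits.set(probe2(key));
        }

        /// false => the key is certainly absent and the whole granule can be skipped.
        bool maybeContains(uint64_t key) const
        {
            return bits.test(probe1(key)) && bits.test(probe2(key));
        }
    };

    int main()
    {
        BloomFilter filter;
        for (uint64_t key = 0; key < 100; ++key)    /// the test's 100 unique keys
            filter.add(key);
        std::cout << filter.maybeContains(1) << ' ' << filter.maybeContains(1000000) << "\n";
    }

A lookup for a key that was never inserted returns true only with a small false-positive probability, and never returns false for an inserted key; that one-sided guarantee is exactly what granule skipping needs.
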