diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 712835ede1c..5e30cafdee4 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1015,6 +1015,7 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; + MergeTreeData::DataPartPtr projection_part; StorageMetadataPtr metadata_snapshot; MutationCommandsConstPtr commands; @@ -1027,13 +1028,16 @@ struct MutationContext std::unique_ptr num_mutations; QueryPipelineBuilder mutating_pipeline_builder; - /// source here is projection part instead of table part - QueryPipelineBuilder projection_mutating_pipeline_builder; QueryPipeline mutating_pipeline; // in std::unique_ptr mutating_executor; ProgressCallback progress_callback; Block updated_header; + /// source here is projection part instead of table part + QueryPipelineBuilder projection_mutating_pipeline_builder; + QueryPipeline projection_mutating_pipeline; + std::unique_ptr projection_mutating_executor; + std::unique_ptr interpreter; UInt64 watch_prev_elapsed = 0; std::unique_ptr stage_progress; @@ -1050,6 +1054,9 @@ struct MutationContext MergeTreeData::MutableDataPartPtr new_data_part; IMergedBlockOutputStreamPtr out; + MergeTreeData::MutableDataPartPtr new_projection_part; + IMergedBlockOutputStreamPtr projection_out; + String mrk_extension; /// Used in lightweight delete to bit mask the projection if possible, @@ -1886,6 +1893,26 @@ private: ctx->projections_to_build = std::vector{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()}; + chassert (ctx->projection_mutating_pipeline_builder.initialized()); + builder = std::make_unique(std::move(ctx->projection_mutating_pipeline_builder)); + + // ctx->projection_out = std::make_shared( + // ctx->new_projection_part, + // ctx->metadata_snapshot, + // ctx->updated_header.getNamesAndTypesList(), + // ctx->compression_codec, + // std::vector(), + // std::vector(), + // nullptr, + // /*save_marks_in_cache=*/ false, + // ctx->projection_part->index_granularity, + // &ctx->projection_part->index_granularity_info + // ); + + ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + ctx->projection_mutating_pipeline.disableProfileEventUpdate(); + ctx->projection_mutating_executor = std::make_unique(ctx->projection_mutating_pipeline); + part_merger_writer_task = std::make_unique(ctx); } } @@ -2288,11 +2315,23 @@ bool MutateTask::prepare() const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); - MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + ctx->projection_part = projections_name_and_part.begin()->second; + + MutationCommands projection_commands; + + MutationHelpers::splitAndModifyMutationCommands( + ctx->projection_part, + proj_desc.metadata, + alter_conversions, + ctx->commands_for_part, + projection_commands, + ctx->for_file_renames, + false, + ctx->log); auto projection_interpreter = std::make_unique( - *ctx->data, projection_part, alter_conversions, - proj_desc.metadata, ctx->for_interpreter, + *ctx->data, ctx->projection_part, alter_conversions, + proj_desc.metadata, projection_commands, proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->projection_mutating_pipeline_builder = projection_interpreter->execute(); diff --git a/tests/queries/0_stateless/03261_projection_mask.reference b/tests/queries/0_stateless/03261_projection_mask.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03261_projection_mask.sql b/tests/queries/0_stateless/03261_projection_mask.sql new file mode 100644 index 00000000000..aeba53700dc --- /dev/null +++ b/tests/queries/0_stateless/03261_projection_mask.sql @@ -0,0 +1,29 @@ +-- compact +DROP TABLE IF EXISTS users; + +CREATE TABLE users ( + uid Int16, + name String, + age Int16, + projection p1 (select age, count() group by age), +) ENGINE = MergeTree order by uid +SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 10485760; + +INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34); + +DELETE FROM users WHERE age = 34; + +-- wide +DROP TABLE IF EXISTS users; + +CREATE TABLE users ( + uid Int16, + name String, + age Int16, + projection p1 (select age, count() group by age), +) ENGINE = MergeTree order by uid +SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 0; + +INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34); + +DELETE FROM users WHERE age = 34;