diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 5e78ab49010..f3bd0fb651c 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -756,12 +756,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( TemporaryPart temp_part; const auto & metadata_snapshot = projection.metadata; - MergeTreeDataPartType part_type; /// Size of part would not be greater than block.bytes() + epsilon size_t expected_size = block.bytes(); // just check if there is enough space on parent volume MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage()); - part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; + MergeTreeDataPartType part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build(); auto projection_part_storage = new_data_part->getDataPartStoragePtr(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 0510f51b4ff..72bf733073a 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1008,7 +1008,6 @@ void finalizeMutatedPart( new_data_part->default_codec = codec; } - } struct MutationContext @@ -1023,7 +1022,9 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; + MergeTreeData::DataPartPtr projection_part; StorageMetadataPtr metadata_snapshot; + StorageMetadataPtr projection_metadata_snapshot; MutationCommandsConstPtr commands; time_t time_of_mutation; @@ -1040,6 +1041,11 @@ struct MutationContext ProgressCallback progress_callback; Block updated_header; + /// source here is projection part instead of table part + QueryPipelineBuilder projection_mutating_pipeline_builder; + QueryPipeline projection_mutating_pipeline; + std::unique_ptr projection_mutating_executor; + std::unique_ptr interpreter; UInt64 watch_prev_elapsed = 0; std::unique_ptr stage_progress; @@ -1056,8 +1062,14 @@ struct MutationContext MergeTreeData::MutableDataPartPtr new_data_part; IMergedBlockOutputStreamPtr out; + MergeTreeData::MutableDataPartPtr new_projection_part; + IMergedBlockOutputStreamPtr projection_out; + String mrk_extension; + /// Used in lightweight delete to bit mask the projection if possible, + /// preference is higher than rebuild as more performant. + std::vector projections_to_mask; std::vector projections_to_build; IMergeTreeDataPart::MinMaxIndexPtr minmax_idx; @@ -1151,6 +1163,14 @@ public: if (iterateThroughAllProjections()) return true; + state = State::NEED_MASK_PROJECTION_PARTS; + return true; + } + case State::NEED_MASK_PROJECTION_PARTS: + { + if (iterateThroughAllProjectionsToMask()) + return true; + state = State::SUCCESS; return true; } @@ -1170,6 +1190,7 @@ private: void finalizeTempProjections(); bool iterateThroughAllProjections(); void constructTaskForProjectionPartsMerge(); + bool iterateThroughAllProjectionsToMask(); void finalize(); enum class State : uint8_t @@ -1177,7 +1198,7 @@ private: NEED_PREPARE, NEED_MUTATE_ORIGINAL_PART, NEED_MERGE_PROJECTION_PARTS, - + NEED_MASK_PROJECTION_PARTS, SUCCESS }; @@ -1343,6 +1364,14 @@ bool PartMergerWriter::iterateThroughAllProjections() return true; } +bool PartMergerWriter::iterateThroughAllProjectionsToMask() +{ + for (size_t i = 0, size = ctx->projections_to_mask.size(); i < size; ++i) + { + } + return false; +} + void PartMergerWriter::finalize() { if (ctx->count_lightweight_deleted_rows) @@ -1660,6 +1689,30 @@ private: ctx->mutating_pipeline.disableProfileEventUpdate(); ctx->mutating_executor = std::make_unique(ctx->mutating_pipeline); + chassert (ctx->projection_mutating_pipeline_builder.initialized()); + builder = std::make_unique(std::move(ctx->projection_mutating_pipeline_builder)); + + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + auto part_name = fmt::format("{}", projections_name_and_part.begin()->first); + auto new_data_part = ctx->new_data_part->getProjectionPartBuilder(part_name, true).withPartType(ctx->projection_part->getType()).build(); + + ctx->projection_out = std::make_shared( + new_data_part, + ctx->projection_metadata_snapshot, + ctx->projection_metadata_snapshot->getColumns().getAllPhysical(), + MergeTreeIndices{}, + ColumnsStatistics{}, + ctx->data->getContext()->chooseCompressionCodec(0, 0), + Tx::PrehistoricTID, + /*reset_columns=*/ false, + /*save_marks_in_cache=*/ false, + /*blocks_are_granules_size=*/ false, + ctx->data->getContext()->getWriteSettings()); + + ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + ctx->projection_mutating_pipeline.disableProfileEventUpdate(); + ctx->projection_mutating_executor = std::make_unique(ctx->projection_mutating_pipeline); + part_merger_writer_task = std::make_unique(ctx); } @@ -1895,6 +1948,26 @@ private: ctx->projections_to_build = std::vector{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()}; + chassert (ctx->projection_mutating_pipeline_builder.initialized()); + builder = std::make_unique(std::move(ctx->projection_mutating_pipeline_builder)); + + // ctx->projection_out = std::make_shared( + // ctx->new_projection_part, + // ctx->metadata_snapshot, + // ctx->updated_header.getNamesAndTypesList(), + // ctx->compression_codec, + // std::vector(), + // std::vector(), + // nullptr, + // /*save_marks_in_cache=*/ false, + // ctx->projection_part->index_granularity, + // &ctx->projection_part->index_granularity_info + // ); + + ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + ctx->projection_mutating_pipeline.disableProfileEventUpdate(); + ctx->projection_mutating_executor = std::make_unique(ctx->projection_mutating_pipeline); + part_merger_writer_task = std::make_unique(ctx); } } @@ -2308,6 +2381,32 @@ bool MutateTask::prepare() ctx->updated_header = ctx->interpreter->getUpdatedHeader(); ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); + const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); + ctx->projection_metadata_snapshot = proj_desc.metadata; + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + ctx->projection_part = projections_name_and_part.begin()->second; + + MutationCommands projection_commands; + + MutationHelpers::splitAndModifyMutationCommands( + ctx->projection_part, + proj_desc.metadata, + alter_conversions, + ctx->commands_for_part, + projection_commands, + ctx->for_file_renames, + false, + ctx->log); + + chassert(!projection_commands.empty()); + + auto projection_interpreter = std::make_unique( + *ctx->data, ctx->projection_part, alter_conversions, + proj_desc.metadata, projection_commands, + proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); + + ctx->projection_mutating_pipeline_builder = projection_interpreter->execute(); + lightweight_delete_mode = ctx->updated_header.has(RowExistsColumn::name); /// If under the condition of lightweight delete mode with rebuild option, add projections again here as we can only know /// the condition as early as from here. diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index 26c3238c940..3a4d8637a51 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -334,6 +334,17 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context) return ret; } +void ProjectionDescription::mask(const Block & block[[maybe_unused]], ContextPtr context[[maybe_unused]]) const +{ + // auto mut_context = Context::createCopy(context); + // query_ast_copy = query_ast->clone(); + // auto builder = InterpreterAlterQuery(query_ast_copy, mut_context, + // Pipe(std::make_shared(block))) + // .buildQueryPipeline(); + // auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); + // PullingPipelineExecutor executor(pipeline); + // executor.pull(ret); +} String ProjectionsDescription::toString() const { diff --git a/src/Storages/ProjectionsDescription.h b/src/Storages/ProjectionsDescription.h index 445a4828e31..f4068fdd403 100644 --- a/src/Storages/ProjectionsDescription.h +++ b/src/Storages/ProjectionsDescription.h @@ -93,6 +93,8 @@ struct ProjectionDescription Block calculate(const Block & block, ContextPtr context) const; + void mask(const Block & block, ContextPtr context) const; + String getDirectoryName() const { return name + ".proj"; } }; diff --git a/tests/queries/0_stateless/03261_projection_mask.reference b/tests/queries/0_stateless/03261_projection_mask.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03261_projection_mask.sql b/tests/queries/0_stateless/03261_projection_mask.sql new file mode 100644 index 00000000000..aeba53700dc --- /dev/null +++ b/tests/queries/0_stateless/03261_projection_mask.sql @@ -0,0 +1,29 @@ +-- compact +DROP TABLE IF EXISTS users; + +CREATE TABLE users ( + uid Int16, + name String, + age Int16, + projection p1 (select age, count() group by age), +) ENGINE = MergeTree order by uid +SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 10485760; + +INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34); + +DELETE FROM users WHERE age = 34; + +-- wide +DROP TABLE IF EXISTS users; + +CREATE TABLE users ( + uid Int16, + name String, + age Int16, + projection p1 (select age, count() group by age), +) ENGINE = MergeTree order by uid +SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 0; + +INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34); + +DELETE FROM users WHERE age = 34;