From 5231d00616e36fa7a85374be74f5d8d6b06870e1 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Wed, 20 Nov 2024 03:11:36 +0000 Subject: [PATCH] remove unused and add compact projout --- src/Interpreters/MutationsInterpreter.cpp | 8 ------ src/Interpreters/MutationsInterpreter.h | 2 -- .../MergeTree/MergeTreeDataWriter.cpp | 3 +- src/Storages/MergeTree/MutateTask.cpp | 28 +++++++++++++++++++ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 7e075c99521..0f25d5ac21c 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -373,14 +373,6 @@ bool MutationsInterpreter::Source::isCompactPart() const return part && part->getType() == MergeTreeDataPartType::Compact; } -const NamesAndTypesList & MutationsInterpreter::Source::getColumns() const -{ - if (!part) - throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter source part is nullptr. It is a bug."); - - return part->getColumns(); -} - static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage) { auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 145774579da..901cd13cd2f 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -129,8 +129,6 @@ public: bool hasBrokenProjection(const String & name) const; bool isCompactPart() const; - const NamesAndTypesList & getColumns() const; - void read( Stage & first_stage, QueryPlan & plan, diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index ac29a9244b0..5d1ce7bd021 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -749,12 +749,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( TemporaryPart temp_part; const auto & metadata_snapshot = projection.metadata; - MergeTreeDataPartType part_type; /// Size of part would not be greater than block.bytes() + epsilon size_t expected_size = block.bytes(); // just check if there is enough space on parent volume MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage()); - part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; + MergeTreeDataPartType part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build(); auto projection_part_storage = new_data_part->getDataPartStoragePtr(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5e30cafdee4..f5cbf0bbc30 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1017,6 +1017,7 @@ struct MutationContext MergeTreeData::DataPartPtr source_part; MergeTreeData::DataPartPtr projection_part; StorageMetadataPtr metadata_snapshot; + StorageMetadataPtr projection_metadata_snapshot; MutationCommandsConstPtr commands; time_t time_of_mutation; @@ -1662,6 +1663,30 @@ private: ctx->mutating_pipeline.disableProfileEventUpdate(); ctx->mutating_executor = std::make_unique(ctx->mutating_pipeline); + chassert (ctx->projection_mutating_pipeline_builder.initialized()); + builder = std::make_unique(std::move(ctx->projection_mutating_pipeline_builder)); + + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + auto part_name = fmt::format("{}", projections_name_and_part.begin()->first); + auto new_data_part = ctx->new_data_part->getProjectionPartBuilder(part_name, true).withPartType(ctx->projection_part->getType()).build(); + + ctx->projection_out = std::make_shared( + new_data_part, + ctx->projection_metadata_snapshot, + ctx->projection_metadata_snapshot->getColumns().getAllPhysical(), + MergeTreeIndices{}, + ColumnsStatistics{}, + ctx->data->getContext()->chooseCompressionCodec(0, 0), + Tx::PrehistoricTID, + /*reset_columns=*/ false, + /*save_marks_in_cache=*/ false, + /*blocks_are_granules_size=*/ false, + ctx->data->getContext()->getWriteSettings()); + + ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + ctx->projection_mutating_pipeline.disableProfileEventUpdate(); + ctx->projection_mutating_executor = std::make_unique(ctx->projection_mutating_pipeline); + part_merger_writer_task = std::make_unique(ctx); } @@ -2314,6 +2339,7 @@ bool MutateTask::prepare() ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); + ctx->projection_metadata_snapshot = proj_desc.metadata; const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); ctx->projection_part = projections_name_and_part.begin()->second; @@ -2329,6 +2355,8 @@ bool MutateTask::prepare() false, ctx->log); + chassert(!projection_commands.empty()); + auto projection_interpreter = std::make_unique( *ctx->data, ctx->projection_part, alter_conversions, proj_desc.metadata, projection_commands,