This commit is contained in:
jsc0218 2024-11-19 20:44:19 +00:00
parent 0ee0ab9132
commit 190651cf23
3 changed files with 73 additions and 5 deletions

View File

@ -1015,6 +1015,7 @@ struct MutationContext
FutureMergedMutatedPartPtr future_part;
MergeTreeData::DataPartPtr source_part;
MergeTreeData::DataPartPtr projection_part;
StorageMetadataPtr metadata_snapshot;
MutationCommandsConstPtr commands;
@ -1027,13 +1028,16 @@ struct MutationContext
std::unique_ptr<CurrentMetrics::Increment> num_mutations;
QueryPipelineBuilder mutating_pipeline_builder;
/// source here is projection part instead of table part
QueryPipelineBuilder projection_mutating_pipeline_builder;
QueryPipeline mutating_pipeline; // in
std::unique_ptr<PullingPipelineExecutor> mutating_executor;
ProgressCallback progress_callback;
Block updated_header;
/// source here is projection part instead of table part
QueryPipelineBuilder projection_mutating_pipeline_builder;
QueryPipeline projection_mutating_pipeline;
std::unique_ptr<PullingPipelineExecutor> projection_mutating_executor;
std::unique_ptr<MutationsInterpreter> interpreter;
UInt64 watch_prev_elapsed = 0;
std::unique_ptr<MergeStageProgress> stage_progress;
@ -1050,6 +1054,9 @@ struct MutationContext
MergeTreeData::MutableDataPartPtr new_data_part;
IMergedBlockOutputStreamPtr out;
MergeTreeData::MutableDataPartPtr new_projection_part;
IMergedBlockOutputStreamPtr projection_out;
String mrk_extension;
/// Used in lightweight delete to bit mask the projection if possible,
@ -1886,6 +1893,26 @@ private:
ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()};
chassert (ctx->projection_mutating_pipeline_builder.initialized());
builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->projection_mutating_pipeline_builder));
// ctx->projection_out = std::make_shared<MergedColumnOnlyOutputStream>(
// ctx->new_projection_part,
// ctx->metadata_snapshot,
// ctx->updated_header.getNamesAndTypesList(),
// ctx->compression_codec,
// std::vector<MergeTreeIndexPtr>(),
// std::vector<ColumnStatisticsPartPtr>(),
// nullptr,
// /*save_marks_in_cache=*/ false,
// ctx->projection_part->index_granularity,
// &ctx->projection_part->index_granularity_info
// );
ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->projection_mutating_pipeline.disableProfileEventUpdate();
ctx->projection_mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->projection_mutating_pipeline);
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
}
}
@ -2288,11 +2315,23 @@ bool MutateTask::prepare()
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second;
ctx->projection_part = projections_name_and_part.begin()->second;
MutationCommands projection_commands;
MutationHelpers::splitAndModifyMutationCommands(
ctx->projection_part,
proj_desc.metadata,
alter_conversions,
ctx->commands_for_part,
projection_commands,
ctx->for_file_renames,
false,
ctx->log);
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, projection_part, alter_conversions,
proj_desc.metadata, ctx->for_interpreter,
*ctx->data, ctx->projection_part, alter_conversions,
proj_desc.metadata, projection_commands,
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->projection_mutating_pipeline_builder = projection_interpreter->execute();

View File

@ -0,0 +1,29 @@
-- compact
DROP TABLE IF EXISTS users;
CREATE TABLE users (
uid Int16,
name String,
age Int16,
projection p1 (select age, count() group by age),
) ENGINE = MergeTree order by uid
SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 10485760;
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
DELETE FROM users WHERE age = 34;
-- wide
DROP TABLE IF EXISTS users;
CREATE TABLE users (
uid Int16,
name String,
age Int16,
projection p1 (select age, count() group by age),
) ENGINE = MergeTree order by uid
SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 0;
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
DELETE FROM users WHERE age = 34;