Merge pull request #57352 from kitaisreal/mutations-reuse-source-part-granularity

MergeTree mutations reuse source part index granularity
This commit is contained in:
Anton Popov 2023-12-01 16:45:41 +01:00 committed by GitHub
commit 61c517bc02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 4 deletions

View File

@ -24,7 +24,8 @@ MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeTransactionPtr & txn,
bool reset_columns_,
bool blocks_are_granules_size,
const WriteSettings & write_settings_)
const WriteSettings & write_settings_,
const MergeTreeIndexGranularity & computed_index_granularity)
: IMergedBlockOutputStream(data_part, metadata_snapshot_, columns_list_, reset_columns_)
, columns_list(columns_list_)
, default_codec(default_codec_)
@ -48,7 +49,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
data_part->version.setCreationTID(tid, nullptr);
data_part->storeVersionMetadata();
writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, statistics, default_codec, writer_settings, {});
writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, statistics, default_codec, writer_settings, computed_index_granularity);
}
/// If data is pre-sorted.

View File

@ -25,7 +25,8 @@ public:
const MergeTreeTransactionPtr & txn,
bool reset_columns_ = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {});
const WriteSettings & write_settings = {},
const MergeTreeIndexGranularity & computed_index_granularity = {});
Block getHeader() const { return metadata_snapshot->getSampleBlock(); }

View File

@ -1525,6 +1525,22 @@ private:
ctx->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
MergeTreeIndexGranularity computed_granularity;
bool has_delete = false;
for (auto & command_for_interpreter : ctx->for_interpreter)
{
if (command_for_interpreter.type == MutationCommand::DELETE)
{
has_delete = true;
break;
}
}
/// Reuse source part granularity if mutation does not change number of rows
if (!has_delete && ctx->execute_ttl_type == ExecuteTTLType::NONE)
computed_granularity = ctx->source_part->index_granularity;
ctx->out = std::make_shared<MergedBlockOutputStream>(
ctx->new_data_part,
ctx->metadata_snapshot,
@ -1535,7 +1551,8 @@ private:
ctx->txn,
/*reset_columns=*/ true,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings());
ctx->context->getWriteSettings(),
computed_granularity);
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);