remove unused and add compact projout

This commit is contained in:
jsc0218 2024-11-20 03:11:36 +00:00
parent 190651cf23
commit 5231d00616
4 changed files with 29 additions and 12 deletions

View File

@ -373,14 +373,6 @@ bool MutationsInterpreter::Source::isCompactPart() const
return part && part->getType() == MergeTreeDataPartType::Compact; return part && part->getType() == MergeTreeDataPartType::Compact;
} }
const NamesAndTypesList & MutationsInterpreter::Source::getColumns() const
{
if (!part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter source part is nullptr. It is a bug.");
return part->getColumns();
}
static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage) static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage)
{ {
auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical();

View File

@ -129,8 +129,6 @@ public:
bool hasBrokenProjection(const String & name) const; bool hasBrokenProjection(const String & name) const;
bool isCompactPart() const; bool isCompactPart() const;
const NamesAndTypesList & getColumns() const;
void read( void read(
Stage & first_stage, Stage & first_stage,
QueryPlan & plan, QueryPlan & plan,

View File

@ -749,12 +749,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
TemporaryPart temp_part; TemporaryPart temp_part;
const auto & metadata_snapshot = projection.metadata; const auto & metadata_snapshot = projection.metadata;
MergeTreeDataPartType part_type;
/// Size of part would not be greater than block.bytes() + epsilon /// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes(); size_t expected_size = block.bytes();
// just check if there is enough space on parent volume // just check if there is enough space on parent volume
MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage()); MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; MergeTreeDataPartType part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type;
auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build(); auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build();
auto projection_part_storage = new_data_part->getDataPartStoragePtr(); auto projection_part_storage = new_data_part->getDataPartStoragePtr();

View File

@ -1017,6 +1017,7 @@ struct MutationContext
MergeTreeData::DataPartPtr source_part; MergeTreeData::DataPartPtr source_part;
MergeTreeData::DataPartPtr projection_part; MergeTreeData::DataPartPtr projection_part;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
StorageMetadataPtr projection_metadata_snapshot;
MutationCommandsConstPtr commands; MutationCommandsConstPtr commands;
time_t time_of_mutation; time_t time_of_mutation;
@ -1662,6 +1663,30 @@ private:
ctx->mutating_pipeline.disableProfileEventUpdate(); ctx->mutating_pipeline.disableProfileEventUpdate();
ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline); ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
chassert (ctx->projection_mutating_pipeline_builder.initialized());
builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->projection_mutating_pipeline_builder));
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
auto part_name = fmt::format("{}", projections_name_and_part.begin()->first);
auto new_data_part = ctx->new_data_part->getProjectionPartBuilder(part_name, true).withPartType(ctx->projection_part->getType()).build();
ctx->projection_out = std::make_shared<MergedBlockOutputStream>(
new_data_part,
ctx->projection_metadata_snapshot,
ctx->projection_metadata_snapshot->getColumns().getAllPhysical(),
MergeTreeIndices{},
ColumnsStatistics{},
ctx->data->getContext()->chooseCompressionCodec(0, 0),
Tx::PrehistoricTID,
/*reset_columns=*/ false,
/*save_marks_in_cache=*/ false,
/*blocks_are_granules_size=*/ false,
ctx->data->getContext()->getWriteSettings());
ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->projection_mutating_pipeline.disableProfileEventUpdate();
ctx->projection_mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->projection_mutating_pipeline);
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx); part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
} }
@ -2314,6 +2339,7 @@ bool MutateTask::prepare()
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
ctx->projection_metadata_snapshot = proj_desc.metadata;
const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
ctx->projection_part = projections_name_and_part.begin()->second; ctx->projection_part = projections_name_and_part.begin()->second;
@ -2329,6 +2355,8 @@ bool MutateTask::prepare()
false, false,
ctx->log); ctx->log);
chassert(!projection_commands.empty());
auto projection_interpreter = std::make_unique<MutationsInterpreter>( auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, ctx->projection_part, alter_conversions, *ctx->data, ctx->projection_part, alter_conversions,
proj_desc.metadata, projection_commands, proj_desc.metadata, projection_commands,