diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index ee15bc7f711..26cb821f33b 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -9,7 +9,6 @@ #include #include #include -#include "base/types.h" #include #include #include @@ -521,7 +520,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() { Stopwatch watch(CLOCK_MONOTONIC_COARSE); - UInt64 step_time_ms = global_ctx->data->getSettings()->merge_preferred_step_execution_time_ms.totalMilliseconds(); + UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds(); do { @@ -751,7 +750,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const { Stopwatch watch(CLOCK_MONOTONIC_COARSE); - UInt64 step_time_ms = global_ctx->data->getSettings()->merge_preferred_step_execution_time_ms.totalMilliseconds(); + UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds(); do { diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 149f9a3e80b..de1f0f60cfc 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -84,7 +84,7 @@ struct Settings; M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \ M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \ M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \ - M(Milliseconds, merge_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge. Can be exceeded if one step takes longer time", 0) \ + M(Milliseconds, background_task_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge or mutation. Can be exceeded if one step takes longer time", 0) \ \ /** Inserts settings. */ \ M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \ diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 14c274d7f64..40648439887 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1257,6 +1257,8 @@ public: private: void prepare(); bool mutateOriginalPartAndPrepareProjections(); + void writeTempProjectionPart(size_t projection_idx, Chunk chunk); + void finalizeTempProjections(); bool iterateThroughAllProjections(); void constructTaskForProjectionPartsMerge(); void finalize(); @@ -1307,10 +1309,22 @@ void PartMergerWriter::prepare() bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { - Block cur_block; - Block projection_header; - if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block)) + Stopwatch watch(CLOCK_MONOTONIC_COARSE); + UInt64 step_time_ms = ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds(); + + do { + Block cur_block; + Block projection_header; + + MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry); + + if (!ctx->mutating_executor->pull(cur_block)) + { + finalizeTempProjections(); + return false; + } + if (ctx->minmax_idx) ctx->minmax_idx->update(cur_block, MergeTreeData::getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey())); @@ -1322,46 +1336,56 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { - const auto & projection = *ctx->projections_to_build[i]; + Chunk squashed_chunk; - ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); - Block block_to_squash = projection.calculate(cur_block, ctx->context); - projection_squashes[i].setHeader(block_to_squash.cloneEmpty()); - - Chunk squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()})); - if (squashed_chunk) { - auto result = projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns()); - auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num); - tmp_part.finalize(); - tmp_part.part->getDataPartStorage().commitTransaction(); - projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); + ProfileEventTimeIncrement projection_watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); + Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, ctx->context); + + projection_squashes[i].setHeader(block_to_squash.cloneEmpty()); + squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()})); } + + if (squashed_chunk) + writeTempProjectionPart(i, std::move(squashed_chunk)); } (*ctx->mutate_entry)->rows_written += cur_block.rows(); (*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes(); + } while (watch.elapsedMilliseconds() < step_time_ms); - /// Need execute again - return true; - } + /// Need execute again + return true; +} +void PartMergerWriter::writeTempProjectionPart(size_t projection_idx, Chunk chunk) +{ + const auto & projection = *ctx->projections_to_build[projection_idx]; + const auto & projection_plan = projection_squashes[projection_idx]; + + auto result = projection_plan.getHeader().cloneWithColumns(chunk.detachColumns()); + + auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( + *ctx->data, + ctx->log, + result, + projection, + ctx->new_data_part.get(), + ++block_num); + + tmp_part.finalize(); + tmp_part.part->getDataPartStorage().commitTransaction(); + projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); +} + +void PartMergerWriter::finalizeTempProjections() +{ // Write the last block for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { - const auto & projection = *ctx->projections_to_build[i]; - auto & projection_squash_plan = projection_squashes[i]; - auto squashed_chunk = Squashing::squash(projection_squash_plan.flush()); + auto squashed_chunk = Squashing::squash(projection_squashes[i].flush()); if (squashed_chunk) - { - auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); - auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num); - temp_part.finalize(); - temp_part.part->getDataPartStorage().commitTransaction(); - projection_parts[projection.name].emplace_back(std::move(temp_part.part)); - } + writeTempProjectionPart(i, std::move(squashed_chunk)); } projection_parts_iterator = std::make_move_iterator(projection_parts.begin()); @@ -1369,12 +1393,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() /// Maybe there are no projections ? if (projection_parts_iterator != std::make_move_iterator(projection_parts.end())) constructTaskForProjectionPartsMerge(); - - /// Let's move on to the next stage - return false; } - void PartMergerWriter::constructTaskForProjectionPartsMerge() { auto && [name, parts] = *projection_parts_iterator;