limit task time for mutations

This commit is contained in:
Anton Popov 2024-08-15 17:59:31 +00:00
parent a552747082
commit 75f951dae5
3 changed files with 57 additions and 38 deletions

View File

@ -9,7 +9,6 @@
#include <Common/ActionBlocker.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include "base/types.h"
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Compression/CompressedWriteBuffer.h>
@ -521,7 +520,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
UInt64 step_time_ms = global_ctx->data->getSettings()->merge_preferred_step_execution_time_ms.totalMilliseconds();
UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
do
{
@ -751,7 +750,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
UInt64 step_time_ms = global_ctx->data->getSettings()->merge_preferred_step_execution_time_ms.totalMilliseconds();
UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
do
{

View File

@ -84,7 +84,7 @@ struct Settings;
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \
M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \
M(Milliseconds, merge_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge. Can be exceeded if one step takes longer time", 0) \
M(Milliseconds, background_task_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge or mutation. Can be exceeded if one step takes longer time", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \

View File

@ -1257,6 +1257,8 @@ public:
private:
void prepare();
bool mutateOriginalPartAndPrepareProjections();
void writeTempProjectionPart(size_t projection_idx, Chunk chunk);
void finalizeTempProjections();
bool iterateThroughAllProjections();
void constructTaskForProjectionPartsMerge();
void finalize();
@ -1307,10 +1309,22 @@ void PartMergerWriter::prepare()
bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{
Block cur_block;
Block projection_header;
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block))
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
UInt64 step_time_ms = ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
do
{
Block cur_block;
Block projection_header;
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (!ctx->mutating_executor->pull(cur_block))
{
finalizeTempProjections();
return false;
}
if (ctx->minmax_idx)
ctx->minmax_idx->update(cur_block, MergeTreeData::getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
@ -1322,46 +1336,56 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
Chunk squashed_chunk;
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
Block block_to_squash = projection.calculate(cur_block, ctx->context);
projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
Chunk squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
if (squashed_chunk)
{
auto result = projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
tmp_part.finalize();
tmp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
ProfileEventTimeIncrement<Microseconds> projection_watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, ctx->context);
projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
}
if (squashed_chunk)
writeTempProjectionPart(i, std::move(squashed_chunk));
}
(*ctx->mutate_entry)->rows_written += cur_block.rows();
(*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();
} while (watch.elapsedMilliseconds() < step_time_ms);
/// Need execute again
return true;
}
/// Need execute again
return true;
}
void PartMergerWriter::writeTempProjectionPart(size_t projection_idx, Chunk chunk)
{
const auto & projection = *ctx->projections_to_build[projection_idx];
const auto & projection_plan = projection_squashes[projection_idx];
auto result = projection_plan.getHeader().cloneWithColumns(chunk.detachColumns());
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data,
ctx->log,
result,
projection,
ctx->new_data_part.get(),
++block_num);
tmp_part.finalize();
tmp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
}
void PartMergerWriter::finalizeTempProjections()
{
// Write the last block
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
auto & projection_squash_plan = projection_squashes[i];
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
auto squashed_chunk = Squashing::squash(projection_squashes[i].flush());
if (squashed_chunk)
{
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
temp_part.finalize();
temp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
}
writeTempProjectionPart(i, std::move(squashed_chunk));
}
projection_parts_iterator = std::make_move_iterator(projection_parts.begin());
@ -1369,12 +1393,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
/// Maybe there are no projections ?
if (projection_parts_iterator != std::make_move_iterator(projection_parts.end()))
constructTaskForProjectionPartsMerge();
/// Let's move on to the next stage
return false;
}
void PartMergerWriter::constructTaskForProjectionPartsMerge()
{
auto && [name, parts] = *projection_parts_iterator;