mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #68496 from ClickHouse/backport/24.8/68016
Backport #68016 to 24.8: Try to make bigger steps in execution of merges
This commit is contained in:
commit
2a2c48207a
@ -519,11 +519,20 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
|
||||
|
||||
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
{
|
||||
Block block;
|
||||
if (!ctx->is_cancelled() && (global_ctx->merging_executor->pull(block)))
|
||||
{
|
||||
global_ctx->rows_written += block.rows();
|
||||
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
||||
UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
|
||||
|
||||
do
|
||||
{
|
||||
Block block;
|
||||
|
||||
if (ctx->is_cancelled() || !global_ctx->merging_executor->pull(block))
|
||||
{
|
||||
finalize();
|
||||
return false;
|
||||
}
|
||||
|
||||
global_ctx->rows_written += block.rows();
|
||||
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
|
||||
|
||||
UInt64 result_rows = 0;
|
||||
@ -543,11 +552,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
|
||||
global_ctx->space_reservation->update(static_cast<size_t>((1. - progress) * ctx->initial_reservation));
|
||||
}
|
||||
} while (watch.elapsedMilliseconds() < step_time_ms);
|
||||
|
||||
/// Need execute again
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::finalize() const
|
||||
{
|
||||
global_ctx->merging_executor.reset();
|
||||
global_ctx->merged_pipeline.reset();
|
||||
|
||||
@ -557,14 +569,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled())
|
||||
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with expired TTL");
|
||||
|
||||
const auto data_settings = global_ctx->data->getSettings();
|
||||
const size_t sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed;
|
||||
ctx->need_sync = needSyncPart(ctx->sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *data_settings);
|
||||
|
||||
return false;
|
||||
ctx->need_sync = needSyncPart(ctx->sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *global_ctx->data->getSettings());
|
||||
}
|
||||
|
||||
|
||||
bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
|
||||
{
|
||||
/// No need to execute this part if it is horizontal merge.
|
||||
@ -741,17 +749,24 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
|
||||
|
||||
bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
|
||||
{
|
||||
Block block;
|
||||
if (!global_ctx->merges_blocker->isCancelled() && !global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)
|
||||
&& ctx->executor->pull(block))
|
||||
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
||||
UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
|
||||
|
||||
do
|
||||
{
|
||||
Block block;
|
||||
|
||||
if (global_ctx->merges_blocker->isCancelled()
|
||||
|| global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)
|
||||
|| !ctx->executor->pull(block))
|
||||
return false;
|
||||
|
||||
ctx->column_elems_written += block.rows();
|
||||
ctx->column_to->write(block);
|
||||
} while (watch.elapsedMilliseconds() < step_time_ms);
|
||||
|
||||
/// Need execute again
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
@ -254,6 +254,7 @@ private:
|
||||
|
||||
bool prepare();
|
||||
bool executeImpl();
|
||||
void finalize() const;
|
||||
|
||||
/// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
|
||||
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 2>;
|
||||
|
@ -84,6 +84,7 @@ struct Settings;
|
||||
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
|
||||
M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \
|
||||
M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \
|
||||
M(Milliseconds, background_task_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge or mutation. Can be exceeded if one step takes longer time", 0) \
|
||||
\
|
||||
/** Inserts settings. */ \
|
||||
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
|
||||
|
@ -1257,6 +1257,8 @@ public:
|
||||
private:
|
||||
void prepare();
|
||||
bool mutateOriginalPartAndPrepareProjections();
|
||||
void writeTempProjectionPart(size_t projection_idx, Chunk chunk);
|
||||
void finalizeTempProjections();
|
||||
bool iterateThroughAllProjections();
|
||||
void constructTaskForProjectionPartsMerge();
|
||||
void finalize();
|
||||
@ -1307,10 +1309,22 @@ void PartMergerWriter::prepare()
|
||||
|
||||
bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
|
||||
{
|
||||
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
||||
UInt64 step_time_ms = ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
|
||||
|
||||
do
|
||||
{
|
||||
Block cur_block;
|
||||
Block projection_header;
|
||||
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block))
|
||||
|
||||
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
|
||||
|
||||
if (!ctx->mutating_executor->pull(cur_block))
|
||||
{
|
||||
finalizeTempProjections();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ctx->minmax_idx)
|
||||
ctx->minmax_idx->update(cur_block, MergeTreeData::getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
|
||||
|
||||
@ -1322,46 +1336,56 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
|
||||
|
||||
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
|
||||
{
|
||||
const auto & projection = *ctx->projections_to_build[i];
|
||||
Chunk squashed_chunk;
|
||||
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
|
||||
Block block_to_squash = projection.calculate(cur_block, ctx->context);
|
||||
projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
|
||||
|
||||
Chunk squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
|
||||
if (squashed_chunk)
|
||||
{
|
||||
auto result = projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns());
|
||||
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
|
||||
tmp_part.finalize();
|
||||
tmp_part.part->getDataPartStorage().commitTransaction();
|
||||
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
|
||||
ProfileEventTimeIncrement<Microseconds> projection_watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
|
||||
Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, ctx->context);
|
||||
|
||||
projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
|
||||
squashed_chunk = Squashing::squash(projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
|
||||
}
|
||||
|
||||
if (squashed_chunk)
|
||||
writeTempProjectionPart(i, std::move(squashed_chunk));
|
||||
}
|
||||
|
||||
(*ctx->mutate_entry)->rows_written += cur_block.rows();
|
||||
(*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();
|
||||
} while (watch.elapsedMilliseconds() < step_time_ms);
|
||||
|
||||
/// Need execute again
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void PartMergerWriter::writeTempProjectionPart(size_t projection_idx, Chunk chunk)
|
||||
{
|
||||
const auto & projection = *ctx->projections_to_build[projection_idx];
|
||||
const auto & projection_plan = projection_squashes[projection_idx];
|
||||
|
||||
auto result = projection_plan.getHeader().cloneWithColumns(chunk.detachColumns());
|
||||
|
||||
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data,
|
||||
ctx->log,
|
||||
result,
|
||||
projection,
|
||||
ctx->new_data_part.get(),
|
||||
++block_num);
|
||||
|
||||
tmp_part.finalize();
|
||||
tmp_part.part->getDataPartStorage().commitTransaction();
|
||||
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
|
||||
}
|
||||
|
||||
void PartMergerWriter::finalizeTempProjections()
|
||||
{
|
||||
// Write the last block
|
||||
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
|
||||
{
|
||||
const auto & projection = *ctx->projections_to_build[i];
|
||||
auto & projection_squash_plan = projection_squashes[i];
|
||||
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
|
||||
auto squashed_chunk = Squashing::squash(projection_squashes[i].flush());
|
||||
if (squashed_chunk)
|
||||
{
|
||||
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
|
||||
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
|
||||
temp_part.finalize();
|
||||
temp_part.part->getDataPartStorage().commitTransaction();
|
||||
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
|
||||
}
|
||||
writeTempProjectionPart(i, std::move(squashed_chunk));
|
||||
}
|
||||
|
||||
projection_parts_iterator = std::make_move_iterator(projection_parts.begin());
|
||||
@ -1369,12 +1393,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
|
||||
/// Maybe there are no projections ?
|
||||
if (projection_parts_iterator != std::make_move_iterator(projection_parts.end()))
|
||||
constructTaskForProjectionPartsMerge();
|
||||
|
||||
/// Let's move on to the next stage
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void PartMergerWriter::constructTaskForProjectionPartsMerge()
|
||||
{
|
||||
auto && [name, parts] = *projection_parts_iterator;
|
||||
|
Loading…
Reference in New Issue
Block a user