Cancel vertical merges (#31057)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Amos Bird 2021-11-15 19:32:53 +08:00 committed by GitHub
parent af820f85d3
commit 30b06a969b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -94,7 +94,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
const String local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_";
const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : "";
if (global_ctx->merges_blocker->isCancelled())
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
/// We don't want to perform merge assigned with TTL as normal merge, so
@ -344,7 +344,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
global_ctx->merging_executor.reset();
global_ctx->merged_pipeline.reset();
if (global_ctx->merges_blocker->isCancelled())
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled())
@ -443,7 +443,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
{
Block block;
if (!global_ctx->merges_blocker->isCancelled() && ctx->executor->pull(block))
if (!global_ctx->merges_blocker->isCancelled() && !global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)
&& ctx->executor->pull(block))
{
ctx->column_elems_written += block.rows();
ctx->column_to->write(block);
@ -458,7 +459,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
{
const String & column_name = ctx->it_name_and_type->name;
if (global_ctx->merges_blocker->isCancelled())
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
ctx->executor.reset();