diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 4a438795c88..a3e549ecda3 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -94,7 +94,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const String local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_"; const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : ""; - if (global_ctx->merges_blocker->isCancelled()) + if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); /// We don't want to perform merge assigned with TTL as normal merge, so @@ -344,7 +344,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() global_ctx->merging_executor.reset(); global_ctx->merged_pipeline.reset(); - if (global_ctx->merges_blocker->isCancelled()) + if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled()) @@ -443,7 +443,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const { Block block; - if (!global_ctx->merges_blocker->isCancelled() && ctx->executor->pull(block)) + if (!global_ctx->merges_blocker->isCancelled() && !global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed) + && ctx->executor->pull(block)) { ctx->column_elems_written += block.rows(); ctx->column_to->write(block); @@ -458,7 +459,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const { const String & column_name = ctx->it_name_and_type->name; - if (global_ctx->merges_blocker->isCancelled()) + if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); ctx->executor.reset();