From 1e180d9a7ece84429dab5df040870e9a2edd86ef Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 17 Jan 2024 09:36:40 +0000 Subject: [PATCH] Atomically test and set is_generate_initialized --- src/Processors/Transforms/AggregatingTransform.cpp | 9 ++++----- src/Processors/Transforms/AggregatingTransform.h | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index d8888773054..2fd9f102159 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -578,7 +579,7 @@ IProcessor::Status AggregatingTransform::prepare() } /// Finish data processing, prepare to generating. - if (is_consume_finished && !is_generate_initialized) + if (is_consume_finished && !is_generate_initialized.test()) { /// Close input port in case max_rows_to_group_by was reached but not all data was read. inputs.front().close(); @@ -586,7 +587,7 @@ IProcessor::Status AggregatingTransform::prepare() return Status::Ready; } - if (is_generate_initialized && !is_pipeline_created && !processors.empty()) + if (is_generate_initialized.test() && !is_pipeline_created && !processors.empty()) return Status::ExpandPipeline; /// Only possible while consuming. @@ -687,11 +688,9 @@ void AggregatingTransform::consume(Chunk chunk) void AggregatingTransform::initGenerate() { - if (is_generate_initialized.load(std::memory_order_acquire)) + if (is_generate_initialized.test_and_set()) return; - is_generate_initialized.store(true, std::memory_order_release); - /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. /// To do this, we pass a block with zero rows to aggregate. if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set) diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index 61a6acd6bc8..91fdf479ffb 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -205,7 +205,7 @@ private: UInt64 src_rows = 0; UInt64 src_bytes = 0; - std::atomic is_generate_initialized = false; + std::atomic_flag is_generate_initialized; bool is_consume_finished = false; bool is_pipeline_created = false;