Atomically test and set is_generate_initialized

This commit is contained in:
Antonio Andelic 2024-01-17 09:36:40 +00:00
parent f349a7d0c0
commit 1e180d9a7e
2 changed files with 5 additions and 6 deletions

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Formats/NativeReader.h>
@ -578,7 +579,7 @@ IProcessor::Status AggregatingTransform::prepare()
}
/// Finish data processing, prepare to generating.
if (is_consume_finished && !is_generate_initialized)
if (is_consume_finished && !is_generate_initialized.test())
{
/// Close input port in case max_rows_to_group_by was reached but not all data was read.
inputs.front().close();
@ -586,7 +587,7 @@ IProcessor::Status AggregatingTransform::prepare()
return Status::Ready;
}
if (is_generate_initialized && !is_pipeline_created && !processors.empty())
if (is_generate_initialized.test() && !is_pipeline_created && !processors.empty())
return Status::ExpandPipeline;
/// Only possible while consuming.
@ -687,11 +688,9 @@ void AggregatingTransform::consume(Chunk chunk)
void AggregatingTransform::initGenerate()
{
if (is_generate_initialized.load(std::memory_order_acquire))
if (is_generate_initialized.test_and_set())
return;
is_generate_initialized.store(true, std::memory_order_release);
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)

View File

@ -205,7 +205,7 @@ private:
UInt64 src_rows = 0;
UInt64 src_bytes = 0;
std::atomic<bool> is_generate_initialized = false;
std::atomic_flag is_generate_initialized;
bool is_consume_finished = false;
bool is_pipeline_created = false;