Merge pull request #58896 from ClickHouse/fix-data-race-aggregatetransform

Fix data race in `AggregatingTransform`
This commit is contained in:
robot-ch-test-poll 2024-01-18 07:46:49 +01:00 committed by GitHub
commit fba4d3f427
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 5 additions and 6 deletions

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Formats/NativeReader.h>
@ -578,7 +579,7 @@ IProcessor::Status AggregatingTransform::prepare()
}
/// Finish data processing, prepare to generating.
if (is_consume_finished && !is_generate_initialized)
if (is_consume_finished && !is_generate_initialized.test())
{
/// Close input port in case max_rows_to_group_by was reached but not all data was read.
inputs.front().close();
@ -586,7 +587,7 @@ IProcessor::Status AggregatingTransform::prepare()
return Status::Ready;
}
if (is_generate_initialized && !is_pipeline_created && !processors.empty())
if (is_generate_initialized.test() && !is_pipeline_created && !processors.empty())
return Status::ExpandPipeline;
/// Only possible while consuming.
@ -687,11 +688,9 @@ void AggregatingTransform::consume(Chunk chunk)
void AggregatingTransform::initGenerate()
{
if (is_generate_initialized.load(std::memory_order_acquire))
if (is_generate_initialized.test_and_set())
return;
is_generate_initialized.store(true, std::memory_order_release);
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)

View File

@ -205,7 +205,7 @@ private:
UInt64 src_rows = 0;
UInt64 src_bytes = 0;
std::atomic<bool> is_generate_initialized = false;
std::atomic_flag is_generate_initialized;
bool is_consume_finished = false;
bool is_pipeline_created = false;