AggregatingTransform initGenerate race condition fix

This commit is contained in:
Maksim Kita 2023-10-11 14:02:35 +03:00
parent 21e0cb0b18
commit 0859fc8de9
2 changed files with 8 additions and 3 deletions

View File

@ -623,7 +623,9 @@ IProcessor::Status AggregatingTransform::prepare()
void AggregatingTransform::work()
{
if (is_consume_finished)
{
initGenerate();
}
else
{
consume(std::move(current_chunk));
@ -676,11 +678,14 @@ void AggregatingTransform::consume(Chunk chunk)
void AggregatingTransform::initGenerate()
{
if (is_generate_initialized)
if (is_generate_initialized.load(std::memory_order_acquire))
return;
std::lock_guard lock(snapshot_mutex);
is_generate_initialized = true;
if (is_generate_initialized.load(std::memory_order_relaxed))
return;
is_generate_initialized.store(true, std::memory_order_release);
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.

View File

@ -217,7 +217,7 @@ private:
UInt64 src_rows = 0;
UInt64 src_bytes = 0;
bool is_generate_initialized = false;
std::atomic<bool> is_generate_initialized = false;
bool is_consume_finished = false;
bool is_pipeline_created = false;