From c305afd77ac29703023e7a5c75909c6dec11915e Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 21 Nov 2022 18:26:31 +0000 Subject: [PATCH] Fix race. --- src/Storages/MergeTree/MergeTreeSource.cpp | 79 ++++++++++++---------- 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeSource.cpp b/src/Storages/MergeTree/MergeTreeSource.cpp index 39c4c803d1a..5d4f047eb22 100644 --- a/src/Storages/MergeTree/MergeTreeSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSource.cpp @@ -39,39 +39,58 @@ struct MergeTreeSource::AsyncReadingState struct Control { + /// setResult and setException are the only methods + /// which can be called from background thread. + /// Invariant: + /// * background thread changes status InProgress -> IsFinished + /// * (status == InProgress) => (MergeTreeBaseSelectProcessor is alive) + + void setResult(ChunkAndProgress chunk_) + { + chassert(stage == Stage::InProgress); + chunk = std::move(chunk_); + finish(); + } + + void setException(std::exception_ptr exception_) + { + chassert(stage == Stage::InProgress); + exception = exception_; + finish(); + } + + private: + EventFD event; std::atomic stage = Stage::NotStarted; + ChunkAndProgress chunk; + std::exception_ptr exception; + void finish() { stage = Stage::IsFinished; event.write(); } + + ChunkAndProgress getResult() + { + chassert(stage == Stage::IsFinished); + event.read(); + stage = Stage::NotStarted; + + if (exception) + std::rethrow_exception(exception); + + return std::move(chunk); + } + + friend struct AsyncReadingState; }; - /// setResult and setException are the only methods - /// which can be called from background thread. - /// Invariant: - /// * background thread changes status InProgress -> IsFinished - /// * (status == InProgress) => (MergeTreeBaseSelectProcessor is alive) - - void setResult(ChunkAndProgress chunk_) - { - assert(control->stage == Stage::InProgress); - chunk = std::move(chunk_); - control->finish(); - } - - void setException(std::exception_ptr exception_) - { - assert(control->stage == Stage::InProgress); - exception = exception_; - control->finish(); - } - std::shared_ptr start() { - assert(control->stage == Stage::NotStarted); + chassert(control->stage == Stage::NotStarted); control->stage = Stage::InProgress; return control; } @@ -83,14 +102,7 @@ struct MergeTreeSource::AsyncReadingState ChunkAndProgress getResult() { - assert(control->stage == Stage::IsFinished); - control->event.read(); - control->stage = Stage::NotStarted; - - if (exception) - std::rethrow_exception(exception); - - return std::move(chunk); + return control->getResult(); } Stage getStage() const { return control->stage; } @@ -123,8 +135,6 @@ struct MergeTreeSource::AsyncReadingState private: ThreadPoolCallbackRunner callback_runner; - ChunkAndProgress chunk; - std::exception_ptr exception; std::shared_ptr control; }; @@ -168,8 +178,7 @@ std::optional MergeTreeSource::tryGenerate() if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished) return reportProgress(async_reading_state->getResult()); - assert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted); - async_reading_state->start(); + chassert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted); /// It is important to store control into job. /// Otherwise, race between job and ~MergeTreeBaseSelectProcessor is possible. @@ -179,11 +188,11 @@ std::optional MergeTreeSource::tryGenerate() try { - async_reading_state->setResult(algorithm->read()); + holder->setResult(algorithm->read()); } catch (...) { - async_reading_state->setException(std::current_exception()); + holder->setException(std::current_exception()); } };