Fix race.

This commit is contained in:
Nikolai Kochetov 2022-11-21 18:26:31 +00:00
parent be1a8054c7
commit c305afd77a

View File

@ -39,16 +39,6 @@ struct MergeTreeSource::AsyncReadingState
struct Control
{
EventFD event;
std::atomic<Stage> stage = Stage::NotStarted;
void finish()
{
stage = Stage::IsFinished;
event.write();
}
};
/// setResult and setException are the only methods
/// which can be called from background thread.
/// Invariant:
@ -57,21 +47,50 @@ struct MergeTreeSource::AsyncReadingState
void setResult(ChunkAndProgress chunk_)
{
assert(control->stage == Stage::InProgress);
chassert(stage == Stage::InProgress);
chunk = std::move(chunk_);
control->finish();
finish();
}
void setException(std::exception_ptr exception_)
{
assert(control->stage == Stage::InProgress);
chassert(stage == Stage::InProgress);
exception = exception_;
control->finish();
finish();
}
private:
EventFD event;
std::atomic<Stage> stage = Stage::NotStarted;
ChunkAndProgress chunk;
std::exception_ptr exception;
void finish()
{
stage = Stage::IsFinished;
event.write();
}
ChunkAndProgress getResult()
{
chassert(stage == Stage::IsFinished);
event.read();
stage = Stage::NotStarted;
if (exception)
std::rethrow_exception(exception);
return std::move(chunk);
}
friend struct AsyncReadingState;
};
std::shared_ptr<Control> start()
{
assert(control->stage == Stage::NotStarted);
chassert(control->stage == Stage::NotStarted);
control->stage = Stage::InProgress;
return control;
}
@ -83,14 +102,7 @@ struct MergeTreeSource::AsyncReadingState
ChunkAndProgress getResult()
{
assert(control->stage == Stage::IsFinished);
control->event.read();
control->stage = Stage::NotStarted;
if (exception)
std::rethrow_exception(exception);
return std::move(chunk);
return control->getResult();
}
Stage getStage() const { return control->stage; }
@ -123,8 +135,6 @@ struct MergeTreeSource::AsyncReadingState
private:
ThreadPoolCallbackRunner<void> callback_runner;
ChunkAndProgress chunk;
std::exception_ptr exception;
std::shared_ptr<Control> control;
};
@ -168,8 +178,7 @@ std::optional<Chunk> MergeTreeSource::tryGenerate()
if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished)
return reportProgress(async_reading_state->getResult());
assert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted);
async_reading_state->start();
chassert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted);
/// It is important to store control into job.
/// Otherwise, race between job and ~MergeTreeBaseSelectProcessor is possible.
@ -179,11 +188,11 @@ std::optional<Chunk> MergeTreeSource::tryGenerate()
try
{
async_reading_state->setResult(algorithm->read());
holder->setResult(algorithm->read());
}
catch (...)
{
async_reading_state->setException(std::current_exception());
holder->setException(std::current_exception());
}
};