dbms: fixed race condition [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-11-10 04:42:26 +00:00
parent 75239c6937
commit b664e10a37

View File

@ -91,7 +91,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false),
log(&Logger::get("UnionBlockInputStream"))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
@ -134,7 +134,7 @@ public:
void cancel()
{
is_cancelled = true;
ExceptionPtr exception;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
{
@ -175,7 +175,7 @@ protected:
Block readImpl()
{
OutputData res;
if (finish)
if (all_read)
return res.block;
/// Запускаем потоки, если это ещё не было сделано.
@ -196,6 +196,9 @@ protected:
if (res.exception)
res.exception->rethrow();
if (!res.block)
all_read = true;
return res.block;
}
@ -356,6 +359,7 @@ private:
/// Завершить работу потоков (раньше, чем иссякнут источники).
volatile bool finish;
bool all_read;
Logger * log;
};