UnionBlockInputStream: fixed possible deadlock. [#METR-9007]

This commit is contained in:
Michael Kolupaev 2013-11-29 12:25:54 +00:00
parent aa3ed8bc66
commit 812127899d

View File

@ -37,8 +37,8 @@ class UnionBlockInputStream : public IProfilingBlockInputStream
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false),
log(&Logger::get("UnionBlockInputStream"))
output_queue(max_threads), exhausted_inputs(0), finish(false),
pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream"))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
@ -276,7 +276,11 @@ private:
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
parent.output_queue.push(OutputData());
if (!parent.pushed_end_of_output_queue)
{
parent.pushed_end_of_output_queue = true;
parent.output_queue.push(OutputData());
}
}
}
@ -318,6 +322,8 @@ private:
/// Завершить работу потоков (раньше, чем иссякнут источники).
volatile bool finish;
/// Положили ли в output_queue пустой блок.
bool pushed_end_of_output_queue;
bool all_read;
Logger * log;