diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index 9ed197970da..6d250384f4c 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -37,8 +37,8 @@ class UnionBlockInputStream : public IProfilingBlockInputStream public: UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) : max_threads(std::min(inputs_.size(), static_cast(max_threads_))), - output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false), - log(&Logger::get("UnionBlockInputStream")) + output_queue(max_threads), exhausted_inputs(0), finish(false), + pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream")) { children.insert(children.end(), inputs_.begin(), inputs_.end()); @@ -276,7 +276,11 @@ private: Poco::ScopedLock lock(parent.mutex); /// Отдаём в основной поток пустой блок, что означает, что данных больше нет. - parent.output_queue.push(OutputData()); + if (!parent.pushed_end_of_output_queue) + { + parent.pushed_end_of_output_queue = true; + parent.output_queue.push(OutputData()); + } } } @@ -318,6 +322,8 @@ private: /// Завершить работу потоков (раньше, чем иссякнут источники). volatile bool finish; + /// Положили ли в output_queue пустой блок. + bool pushed_end_of_output_queue; bool all_read; Logger * log;