From b664e10a373ffadeb0c1f8f1ee8a8935bf720ada Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 10 Nov 2012 04:42:26 +0000 Subject: [PATCH] dbms: fixed race condition [#CONV-2944]. --- dbms/include/DB/DataStreams/UnionBlockInputStream.h | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index 73925bfbe8e..ab105bbf377 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -91,7 +91,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream public: UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) : max_threads(std::min(inputs_.size(), static_cast(max_threads_))), - output_queue(max_threads), exhausted_inputs(0), finish(false), + output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false), log(&Logger::get("UnionBlockInputStream")) { children.insert(children.end(), inputs_.begin(), inputs_.end()); @@ -134,7 +134,7 @@ public: void cancel() { is_cancelled = true; - + ExceptionPtr exception; for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it) { @@ -175,7 +175,7 @@ protected: Block readImpl() { OutputData res; - if (finish) + if (all_read) return res.block; /// Запускаем потоки, если это ещё не было сделано. @@ -196,6 +196,9 @@ protected: if (res.exception) res.exception->rethrow(); + if (!res.block) + all_read = true; + return res.block; } @@ -356,6 +359,7 @@ private: /// Завершить работу потоков (раньше, чем иссякнут источники). volatile bool finish; + bool all_read; Logger * log; };