From 0bc7b868a1eeef3d4fb38aa1a9469366a3891e32 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Mon, 25 Nov 2013 10:46:25 +0000 Subject: [PATCH] ClickHouse: fixed UnionBlockInputStream calling readSuffixImpl during a call to readImpl. Better invariant for IBlockInputStream::readSuffix(). [#METR-9007] --- .../DB/Common/ConcurrentBoundedQueue.h | 6 ++ .../DB/DataStreams/IBlockInputStream.h | 2 + .../DB/DataStreams/UnionBlockInputStream.h | 67 ++++++++++--------- 3 files changed, 44 insertions(+), 31 deletions(-) diff --git a/dbms/include/DB/Common/ConcurrentBoundedQueue.h b/dbms/include/DB/Common/ConcurrentBoundedQueue.h index 7f733587152..ee771db5f96 100644 --- a/dbms/include/DB/Common/ConcurrentBoundedQueue.h +++ b/dbms/include/DB/Common/ConcurrentBoundedQueue.h @@ -76,6 +76,12 @@ public: return false; } + size_t size() + { + Poco::ScopedLock lock(mutex); + return queue.size(); + } + void clear() { while (fill_count.tryWait(0)) diff --git a/dbms/include/DB/DataStreams/IBlockInputStream.h b/dbms/include/DB/DataStreams/IBlockInputStream.h index ad248c8a671..62fab248a1b 100644 --- a/dbms/include/DB/DataStreams/IBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IBlockInputStream.h @@ -43,6 +43,8 @@ public: /** Прочитать что-нибудь перед началом всех данных или после конца всех данных. * В функции readSuffix можно реализовать финализацию, которая может привести к исключению. + * readPrefix() должна вызываться до первого вызова read(). + * readSuffix() должна вызываться после того, как read() вернула пустой блок, или после вызова cancel(). */ virtual void readPrefix() {} virtual void readSuffix() {} diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index f9abc72bb7a..a08c4d12e6d 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -37,7 +37,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream public: UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) : max_threads(std::min(inputs_.size(), static_cast(max_threads_))), - output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false), finalized(false), + output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false), log(&Logger::get("UnionBlockInputStream")) { children.insert(children.end(), inputs_.begin(), inputs_.end()); @@ -76,7 +76,13 @@ public: { try { - readSuffixImpl(); + if (!all_read) + cancel(); + if (!is_cancelled) + { + for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) + it->thread->join(); + } } catch (...) { @@ -92,6 +98,7 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; + finish = true; for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it) { if (IProfilingBlockInputStream * child = dynamic_cast(&**it)) @@ -106,6 +113,22 @@ public: } } } + + LOG_TRACE(log, "Waiting for threads to finish"); + + /// Вынем всё, что есть в очереди готовых данных. + OutputData res; + while (output_queue.tryPop(res)) + ; + + /** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. + * PS. Может быть, для переменной finish нужен барьер? + */ + + for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) + it->thread->join(); + + LOG_TRACE(log, "Waited for threads to finish"); } protected: @@ -141,29 +164,8 @@ protected: void readSuffixImpl() { - if (finalized) - return; - - finalized = true; - - LOG_TRACE(log, "Waiting for threads to finish"); - - finish = true; - cancel(); - - /// Вынем всё, что есть в очереди готовых данных. - OutputData res; - while (output_queue.tryPop(res)) - ; - - /** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. - * PS. Может быть, для переменной finish нужен барьер? - */ - - for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) - it->thread->join(); - - LOG_TRACE(log, "Waited for threads to finish"); + if (!all_read && !is_cancelled) + throw Exception("readSuffixImpl called before all data is read", ErrorCodes::LOGICAL_ERROR); } private: @@ -262,15 +264,21 @@ private: /// Если все источники иссякли. if (parent.exhausted_inputs == parent.children.size()) { - /// Отдаём в основной поток пустой блок, что означает, что данных больше нет. - parent.output_queue.push(OutputData()); parent.finish = true; - break; } } } } + if (parent.finish) + { + Poco::ScopedLock lock(parent.mutex); + + /// Не будем оставлять очередь пустой на случай, если readImpl ее ждет. + if (parent.output_queue.size() == 0) + /// Отдаём в основной поток пустой блок, что означает, что данных больше нет. + parent.output_queue.push(OutputData()); + } } private: @@ -313,9 +321,6 @@ private: volatile bool finish; bool all_read; - /// Была вызвана функция readSuffixImpl. - bool finalized; - Logger * log; };