ClickHouse: fixed UnionBlockInputStream calling readSuffixImpl during a call to readImpl. Better invariant for IBlockInputStream::readSuffix(). [#METR-9007]

This commit is contained in:
Michael Kolupaev 2013-11-25 10:46:25 +00:00
parent d96c682c22
commit 0bc7b868a1
3 changed files with 44 additions and 31 deletions

View File

@ -76,6 +76,12 @@ public:
return false;
}
size_t size()
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
return queue.size();
}
void clear()
{
while (fill_count.tryWait(0))

View File

@ -43,6 +43,8 @@ public:
/** Прочитать что-нибудь перед началом всех данных или после конца всех данных.
* В функции readSuffix можно реализовать финализацию, которая может привести к исключению.
* readPrefix() должна вызываться до первого вызова read().
* readSuffix() должна вызываться после того, как read() вернула пустой блок, или после вызова cancel().
*/
virtual void readPrefix() {}
virtual void readSuffix() {}

View File

@ -37,7 +37,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false), finalized(false),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false),
log(&Logger::get("UnionBlockInputStream"))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
@ -76,7 +76,13 @@ public:
{
try
{
readSuffixImpl();
if (!all_read)
cancel();
if (!is_cancelled)
{
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
}
}
catch (...)
{
@ -92,6 +98,7 @@ public:
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
finish = true;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it))
@ -106,6 +113,22 @@ public:
}
}
}
LOG_TRACE(log, "Waiting for threads to finish");
/// Вынем всё, что есть в очереди готовых данных.
OutputData res;
while (output_queue.tryPop(res))
;
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
*/
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
LOG_TRACE(log, "Waited for threads to finish");
}
protected:
@ -141,29 +164,8 @@ protected:
void readSuffixImpl()
{
if (finalized)
return;
finalized = true;
LOG_TRACE(log, "Waiting for threads to finish");
finish = true;
cancel();
/// Вынем всё, что есть в очереди готовых данных.
OutputData res;
while (output_queue.tryPop(res))
;
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
*/
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
LOG_TRACE(log, "Waited for threads to finish");
if (!all_read && !is_cancelled)
throw Exception("readSuffixImpl called before all data is read", ErrorCodes::LOGICAL_ERROR);
}
private:
@ -262,15 +264,21 @@ private:
/// Если все источники иссякли.
if (parent.exhausted_inputs == parent.children.size())
{
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
parent.output_queue.push(OutputData());
parent.finish = true;
break;
}
}
}
}
if (parent.finish)
{
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
/// Не будем оставлять очередь пустой на случай, если readImpl ее ждет.
if (parent.output_queue.size() == 0)
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
parent.output_queue.push(OutputData());
}
}
private:
@ -313,9 +321,6 @@ private:
volatile bool finish;
bool all_read;
/// Была вызвана функция readSuffixImpl.
bool finalized;
Logger * log;
};