Revert "Simplification [#CLICKHOUSE-2]."

This reverts commit 98ad6a5db3.
This commit is contained in:
proller 2017-09-13 21:09:21 +03:00 committed by alexey-milovidov
parent e5cf3ba5b4
commit a43b9ec398

View File

@ -57,7 +57,10 @@ public:
if (started)
{
pool.wait();
if (exception)
std::rethrow_exception(exception);
children.back()->readSuffix();
started = false;
}
}
@ -79,23 +82,18 @@ public:
~AsynchronousBlockInputStream() override
{
try
{
pool.wait(); /// It's ok to call wait even if there is no active threads.
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (started)
pool.wait();
}
protected:
ThreadPool pool{1}; /// Rethrows exceptions automatically on wait.
ThreadPool pool{1};
Poco::Event ready;
bool started = false;
bool first = true;
Block block;
std::exception_ptr exception;
Block readImpl() override
@ -103,12 +101,15 @@ protected:
/// If there were no calculations yet, calculate the first block synchronously
if (!started)
{
started = true;
calculate(current_memory_tracker);
started = true;
}
else /// If the calculations are already in progress - wait for the result
pool.wait();
if (exception)
std::rethrow_exception(exception);
Block res = block;
if (!res)
return res;
@ -133,15 +134,22 @@ protected:
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
if (first)
try
{
first = false;
setThreadName("AsyncBlockInput");
current_memory_tracker = memory_tracker;
children.back()->readPrefix();
}
if (first)
{
first = false;
setThreadName("AsyncBlockInput");
current_memory_tracker = memory_tracker;
children.back()->readPrefix();
}
block = children.back()->read();
block = children.back()->read();
}
catch (...)
{
exception = std::current_exception();
}
ready.set();
}