executed in destructor

This commit is contained in:
Nikita Mikhaylov 2019-11-06 21:54:34 +03:00
parent 5416914862
commit 220ccca282
2 changed files with 5 additions and 4 deletions

View File

@ -8,14 +8,14 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
setThreadName("Segmentator");
try
{
while (!is_cancelled && !is_exception_occured)
while (!is_cancelled && !is_exception_occured && !executed)
{
++segmentator_ticket_number;
const auto current_unit_number = segmentator_ticket_number % max_threads_to_use;
{
std::unique_lock lock(mutex);
segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled; });
segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled || executed; });
}
if (is_exception_occured)
@ -119,12 +119,12 @@ Block ParallelParsingBlockInputStream::readImpl()
std::unique_lock lock(mutex);
const auto current_number = reader_ticket_number % max_threads_to_use;
reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || is_cancelled; });
reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || is_cancelled || executed; });
/// Check for an exception and rethrow it
if (is_exception_occured)
{
LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readImpl()"), "Exception occured. Will cancel the query.");
//LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readImpl()"), "Exception occured. Will cancel the query.");
lock.unlock();
cancel(false);
rethrowFirstException(exceptions);

View File

@ -87,6 +87,7 @@ public:
~ParallelParsingBlockInputStream() override
{
executed = true;
waitForAllThreads();
}