From 220ccca2820658bddf4a57b7a918ef9478211be1 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 6 Nov 2019 21:54:34 +0300 Subject: [PATCH] executed in destructor --- dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp | 8 ++++---- dbms/src/DataStreams/ParallelParsingBlockInputStream.h | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp index 02b0c9d47fa..2f164e5186d 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -8,14 +8,14 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction() setThreadName("Segmentator"); try { - while (!is_cancelled && !is_exception_occured) + while (!is_cancelled && !is_exception_occured && !executed) { ++segmentator_ticket_number; const auto current_unit_number = segmentator_ticket_number % max_threads_to_use; { std::unique_lock lock(mutex); - segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled; }); + segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled || executed; }); } if (is_exception_occured) @@ -119,12 +119,12 @@ Block ParallelParsingBlockInputStream::readImpl() std::unique_lock lock(mutex); const auto current_number = reader_ticket_number % max_threads_to_use; - reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || is_cancelled; }); + reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || is_cancelled || executed; }); /// Check for an exception and rethrow it if (is_exception_occured) { - LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readImpl()"), "Exception occured. Will cancel the query."); + //LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readImpl()"), "Exception occured. Will cancel the query."); lock.unlock(); cancel(false); rethrowFirstException(exceptions); diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h index 96d459aa2fe..961c36b7bb3 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h @@ -87,6 +87,7 @@ public: ~ParallelParsingBlockInputStream() override { + executed = true; waitForAllThreads(); }