diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp index 2f164e5186d..d2b8d65050c 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -15,7 +15,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction() { std::unique_lock lock(mutex); - segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled || executed; }); + segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || executed; }); } if (is_exception_occured) @@ -119,7 +119,7 @@ Block ParallelParsingBlockInputStream::readImpl() std::unique_lock lock(mutex); const auto current_number = reader_ticket_number % max_threads_to_use; - reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || is_cancelled || executed; }); + reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || executed; }); /// Check for an exception and rethrow it if (is_exception_occured) diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h index 961c36b7bb3..cf56c12ed09 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h @@ -100,6 +100,8 @@ public: if (!is_cancelled.compare_exchange_strong(old_val, true)) return; + executed = true; + for (auto& reader: readers) if (!reader->isCancelled()) reader->cancel(kill); diff --git a/dbms/src/IO/ReadHelpers.h b/dbms/src/IO/ReadHelpers.h index 44941d741d7..e6b3e7e91e3 100644 --- a/dbms/src/IO/ReadHelpers.h +++ b/dbms/src/IO/ReadHelpers.h @@ -914,6 +914,14 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf); /** Returns buffer eof() result. 
* And saves data if there is no pending data in buffer or it was explicitly asked. + * Why do we have to use this strange function? Consider that we have begin_pos in the middle of our buffer + * and the cursor at the end of the buffer. When we call eof(), it calls next(). + * And this function can fill the buffer with new data, so we will lose the data from the previous buffer state. + * @param buf - original buffer to read from. + * memory - where to put data from buf + * used_size - special parameter to avoid useless reallocations + * begin_pos - defines from which position we will copy the data. + * force_saving_buffer_state - allows explicitly copying all the data from begin_pos to current_position. */ bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool force_saving_buffer_state = false);