This commit is contained in:
Nikita Mikhaylov 2019-11-11 13:32:25 +03:00
parent 220ccca282
commit 8358b64872
3 changed files with 12 additions and 2 deletions

View File

@ -15,7 +15,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
{ {
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled || executed; }); segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || executed; });
} }
if (is_exception_occured) if (is_exception_occured)
@ -119,7 +119,7 @@ Block ParallelParsingBlockInputStream::readImpl()
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
const auto current_number = reader_ticket_number % max_threads_to_use; const auto current_number = reader_ticket_number % max_threads_to_use;
reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || is_cancelled || executed; }); reader_condvar.wait(lock, [&](){ return status[current_number] == READY_TO_READ || is_exception_occured || executed; });
/// Check for an exception and rethrow it /// Check for an exception and rethrow it
if (is_exception_occured) if (is_exception_occured)

View File

@ -100,6 +100,8 @@ public:
if (!is_cancelled.compare_exchange_strong(old_val, true)) if (!is_cancelled.compare_exchange_strong(old_val, true))
return; return;
executed = true;
for (auto& reader: readers) for (auto& reader: readers)
if (!reader->isCancelled()) if (!reader->isCancelled())
reader->cancel(kill); reader->cancel(kill);

View File

@ -914,6 +914,14 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
/** Returns buffer eof() result. /** Returns buffer eof() result.
* And saves data if there is no pending data in buffer or it was explicitly asked. * And saves data if there is no pending data in buffer or it was explicitly asked.
* Why we have to use this strange function? Consider we have begin_pos in the middle of our buffer
* and the cursor in the end of the buffer. When we call eof() it calls next().
* And this function can fill the buffer with new data, so we will lose the data from previous buffer state.
* @param buf - original buffer to read from.
* memory - where to put data from buf
* used_size - special parameter not to do useless reallocations
* begin_pos - defines from which position we will copy the data.
* forse_saving_buffer_state - allows to explicitly copy all the data from begin_pos to current_position.
*/ */
bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool force_saving_buffer_state = false); bool eofWithSavingBufferState(ReadBuffer & buf, DB::Memory<> & memory, size_t & used_size, char * & begin_pos, bool force_saving_buffer_state = false);