ClickHouse/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp

134 lines
4.0 KiB
C++
Raw Normal View History

2019-10-01 10:51:17 +00:00
#include <DataStreams/ParallelParsingBlockInputStream.h>
namespace DB
{
void ParallelParsingBlockInputStream::segmentatorThreadFunction()
{
setThreadName("Segmentator");
try
{
while (!is_cancelled && !is_exception_occured)
{
++segmentator_ticket_number;
const auto current_unit_number = segmentator_ticket_number % max_threads_to_use;
{
std::unique_lock lock(mutex);
2019-10-22 18:01:44 +00:00
segmentator_condvar.wait(lock, [&]{ return status[current_unit_number] == READY_TO_INSERT || is_exception_occured || is_cancelled; });
2019-10-01 10:51:17 +00:00
}
if (is_exception_occured)
break;
// Segmentating the original input.
2019-10-22 18:01:44 +00:00
segments[current_unit_number].used_size = 0;
bool has_data = file_segmentation_engine(original_buffer, segments[current_unit_number].memory, segments[current_unit_number].used_size, min_chunk_size);
2019-10-01 10:51:17 +00:00
// Creating buffer from the segment of data.
2019-10-23 10:39:33 +00:00
auto new_buffer = BufferBase::Buffer(segments[current_unit_number].memory.data(),
segments[current_unit_number].memory.data() + segments[current_unit_number].used_size);
2019-10-22 18:01:44 +00:00
buffers[current_unit_number]->buffer().swap(new_buffer);
buffers[current_unit_number]->position() = buffers[current_unit_number]->buffer().begin();
2019-10-01 10:51:17 +00:00
2019-10-02 14:26:15 +00:00
if (!has_data)
2019-10-01 10:51:17 +00:00
{
2019-10-22 18:01:44 +00:00
is_last[current_unit_number] = true;
status[current_unit_number] = READY_TO_PARSE;
2019-10-01 10:51:17 +00:00
scheduleParserThreadForUnitWithNumber(current_unit_number);
break;
}
2019-10-22 18:01:44 +00:00
status[current_unit_number] = READY_TO_PARSE;
2019-10-01 10:51:17 +00:00
scheduleParserThreadForUnitWithNumber(current_unit_number);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2019-10-22 18:01:44 +00:00
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
2019-10-01 10:51:17 +00:00
{
setThreadName("ChunkParser");
if (is_exception_occured && is_cancelled)
return;
try
{
{
std::unique_lock lock(mutex);
2019-10-22 18:01:44 +00:00
if (is_last[current_unit_number] || buffers[current_unit_number]->position() == nullptr)
2019-10-01 10:51:17 +00:00
{
2019-10-22 18:01:44 +00:00
blocks[current_unit_number].block = Block();
status[current_unit_number] = READY_TO_READ;
2019-10-01 10:51:17 +00:00
reader_condvar.notify_all();
return;
}
}
2019-10-22 18:01:44 +00:00
blocks[current_unit_number].block = readers[current_unit_number]->read();
2019-10-01 10:51:17 +00:00
{
std::lock_guard missing_values_lock(missing_values_mutex);
2019-10-22 18:01:44 +00:00
blocks[current_unit_number].block_missing_values = readers[current_unit_number]->getMissingValues();
2019-10-01 10:51:17 +00:00
}
{
std::unique_lock lock(mutex);
2019-10-22 18:01:44 +00:00
status[current_unit_number] = READY_TO_READ;
2019-10-01 10:51:17 +00:00
reader_condvar.notify_all();
}
}
catch (...)
{
std::unique_lock lock(mutex);
2019-10-22 18:01:44 +00:00
exceptions[current_unit_number] = std::current_exception();
2019-10-01 10:51:17 +00:00
is_exception_occured = true;
reader_condvar.notify_all();
}
}
/// Returns the next parsed block, taking units in ticket order
/// (reader_ticket_number modulo max_threads_to_use).
/// Blocks until the selected unit is READY_TO_READ, an exception has been reported, or the
/// stream is cancelled.
///  - If an exception was reported: wakes the segmentator, cancels the stream and rethrows
///    the first stored exception.
///  - If the wait ended without the unit being READY_TO_READ (cancellation): returns an empty block.
///  - Otherwise moves the block and its missing-values info out of the unit. The unit flagged
///    as last marks the stream as cancelled; any other unit is returned to the segmentator
///    as READY_TO_INSERT.
Block ParallelParsingBlockInputStream::readImpl()
{
    Block res;
    if (isCancelledOrThrowIfKilled())
        return res;

    std::unique_lock lock(mutex);
    ++reader_ticket_number;
    const auto unit_number = reader_ticket_number % max_threads_to_use;
    reader_condvar.wait(lock, [&](){ return status[unit_number] == READY_TO_READ || is_exception_occured || is_cancelled; });

    /// Check for an exception and rethrow it
    if (is_exception_occured)
    {
        segmentator_condvar.notify_all();
        lock.unlock();
        cancel(false);
        rethrowFirstException(exceptions);
        /// Reached only if no exception was stored. The mutex is already released here,
        /// so do not fall through to the code that touches shared state.
        return res;
    }

    /// The wait may have ended because of cancellation while the unit is not READY_TO_READ.
    /// Its block must not be taken and its status must not be changed in that case.
    if (status[unit_number] != READY_TO_READ)
        return res;

    res = std::move(blocks[unit_number].block);
    last_block_missing_values = std::move(blocks[unit_number].block_missing_values);

    if (is_last[unit_number])
        is_cancelled = true;
    else
    {
        /// Give the unit back to the segmentator.
        status[unit_number] = READY_TO_INSERT;
        segmentator_condvar.notify_all();
    }
    return res;
}
}