// Source: ClickHouse/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp
// (recovered from a web blame view: 158 lines, 4.8 KiB, C++; last shown revision 2019-10-01 10:51:17 +00:00)
#include <DataStreams/ParallelParsingBlockInputStream.h>
// 2019-11-11 11:20:11 +00:00
#include "ParallelParsingBlockInputStream.h"
// 2019-10-01 10:51:17 +00:00
namespace DB
{
void ParallelParsingBlockInputStream::segmentatorThreadFunction()
{
setThreadName("Segmentator");
try
{
while (!is_cancelled && !is_exception_occured && !finished)
2019-10-01 10:51:17 +00:00
{
++segmentator_ticket_number;
const auto current_unit_number = segmentator_ticket_number % max_threads_to_use;
2019-11-11 11:20:11 +00:00
auto & unit = processing_units[current_unit_number];
2019-10-01 10:51:17 +00:00
{
std::unique_lock lock(mutex);
segmentator_condvar.wait(lock, [&]{ return unit.status == READY_TO_INSERT || is_exception_occured || finished; });
2019-10-01 10:51:17 +00:00
}
if (is_exception_occured)
break;
2019-10-30 15:49:10 +00:00
// Segmentating the original input.
2019-11-11 11:20:11 +00:00
unit.segment.used_size = 0;
2019-10-30 15:49:10 +00:00
//It returns bool, but it is useless
2019-11-11 11:20:11 +00:00
const auto res = file_segmentation_engine(original_buffer, unit.segment.memory, unit.segment.used_size, min_chunk_size);
2019-10-30 15:49:10 +00:00
if (!res)
2019-10-24 14:00:51 +00:00
{
2019-11-11 11:20:11 +00:00
unit.is_last = true;
unit.status = READY_TO_PARSE;
2019-10-24 14:00:51 +00:00
scheduleParserThreadForUnitWithNumber(current_unit_number);
break;
}
2019-10-01 10:51:17 +00:00
// Creating buffer from the segment of data.
2019-11-11 11:20:11 +00:00
auto new_buffer = BufferBase::Buffer(unit.segment.memory.data(),
unit.segment.memory.data() + unit.segment.used_size);
2019-10-23 10:39:33 +00:00
2019-11-11 11:20:11 +00:00
unit.readbuffer->buffer().swap(new_buffer);
unit.readbuffer->position() = unit.readbuffer->buffer().begin();
2019-10-01 10:51:17 +00:00
2019-11-11 11:20:11 +00:00
unit.parser = std::make_unique<InputStreamFromInputFormat>(
input_processor_creator(*unit.readbuffer, header, context, row_input_format_params, format_settings)
2019-10-24 16:52:55 +00:00
);
2019-10-01 10:51:17 +00:00
2019-11-11 11:20:11 +00:00
unit.status = READY_TO_PARSE;
2019-10-01 10:51:17 +00:00
scheduleParserThreadForUnitWithNumber(current_unit_number);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
// 2019-10-22 18:01:44 +00:00
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
2019-10-01 10:51:17 +00:00
{
setThreadName("ChunkParser");
if (is_exception_occured && is_cancelled)
return;
2019-11-11 11:20:11 +00:00
auto & unit = processing_units[current_unit_number];
2019-10-01 10:51:17 +00:00
try
{
{
std::unique_lock lock(mutex);
2019-10-22 18:01:44 +00:00
2019-11-11 11:20:11 +00:00
unit.block_ext.block.clear();
unit.block_ext.block_missing_values.clear();
2019-10-23 13:15:03 +00:00
2019-11-11 11:20:11 +00:00
if (unit.is_last || unit.readbuffer->position() == nullptr)
2019-10-01 10:51:17 +00:00
{
2019-11-11 11:20:11 +00:00
unit.block_ext.block.emplace_back(Block());
unit.block_ext.block_missing_values.emplace_back(BlockMissingValues());
unit.status = READY_TO_READ;
2019-10-01 10:51:17 +00:00
reader_condvar.notify_all();
return;
}
}
2019-10-24 16:52:55 +00:00
//We don't know how many blocks will be. So we have to read them all until an empty block occured.
2019-10-23 13:15:03 +00:00
while (true)
2019-10-01 10:51:17 +00:00
{
2019-11-11 11:20:11 +00:00
auto block = unit.parser->read();
2019-10-23 13:15:03 +00:00
if (block == Block())
break;
2019-10-24 14:00:51 +00:00
2019-11-11 11:20:11 +00:00
unit.block_ext.block.emplace_back(block);
unit.block_ext.block_missing_values.emplace_back(unit.parser->getMissingValues());
2019-10-01 10:51:17 +00:00
}
{
std::unique_lock lock(mutex);
2019-11-11 11:20:11 +00:00
unit.status = READY_TO_READ;
2019-10-01 10:51:17 +00:00
reader_condvar.notify_all();
}
}
catch (...)
{
std::unique_lock lock(mutex);
2019-10-24 16:52:55 +00:00
tryLogCurrentException(__PRETTY_FUNCTION__);
2019-10-22 18:01:44 +00:00
exceptions[current_unit_number] = std::current_exception();
2019-10-01 10:51:17 +00:00
is_exception_occured = true;
reader_condvar.notify_all();
}
}
Block ParallelParsingBlockInputStream::readImpl()
{
Block res;
if (isCancelledOrThrowIfKilled() || finished)
2019-10-01 10:51:17 +00:00
return res;
std::unique_lock lock(mutex);
2019-11-11 11:20:11 +00:00
const auto current_unit_number = reader_ticket_number % max_threads_to_use;
auto & unit = processing_units[current_unit_number];
reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || is_exception_occured || finished; });
2019-10-01 10:51:17 +00:00
/// Check for an exception and rethrow it
if (is_exception_occured)
{
lock.unlock();
cancel(false);
rethrowFirstException(exceptions);
}
2019-11-11 11:20:11 +00:00
res = std::move(unit.block_ext.block.at(internal_block_iter));
last_block_missing_values = std::move(unit.block_ext.block_missing_values[internal_block_iter]);
2019-10-01 10:51:17 +00:00
2019-11-11 11:20:11 +00:00
if (++internal_block_iter == unit.block_ext.block.size())
2019-10-01 10:51:17 +00:00
{
2019-11-11 11:20:11 +00:00
if (unit.is_last)
2019-11-01 12:31:48 +00:00
{
2019-11-01 18:59:54 +00:00
//In case that all data was read we don't need to cancel.
finished = true;
2019-11-01 12:31:48 +00:00
return res;
}
2019-10-23 13:15:03 +00:00
internal_block_iter = 0;
++reader_ticket_number;
2019-11-11 11:20:11 +00:00
unit.status = READY_TO_INSERT;
2019-10-01 10:51:17 +00:00
segmentator_condvar.notify_all();
}
2019-10-23 13:15:03 +00:00
2019-10-01 10:51:17 +00:00
return res;
}
}