ClickHouse/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp

201 lines
5.5 KiB
C++
Raw Normal View History

2019-10-01 10:51:17 +00:00
#include <DataStreams/ParallelParsingBlockInputStream.h>
2019-11-11 11:20:11 +00:00
#include "ParallelParsingBlockInputStream.h"
2019-10-01 10:51:17 +00:00
namespace DB
{
void ParallelParsingBlockInputStream::segmentatorThreadFunction()
{
setThreadName("Segmentator");
try
{
2019-11-14 15:53:20 +00:00
while (!finished)
2019-10-01 10:51:17 +00:00
{
2019-11-14 15:53:20 +00:00
const auto current_unit_number = segmentator_ticket_number % processing_units.size();
2019-11-11 11:20:11 +00:00
auto & unit = processing_units[current_unit_number];
2019-10-01 10:51:17 +00:00
{
std::unique_lock lock(mutex);
2019-11-14 15:53:20 +00:00
segmentator_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || finished; });
2019-10-01 10:51:17 +00:00
}
2019-11-14 15:53:20 +00:00
if (finished)
{
2019-10-01 10:51:17 +00:00
break;
2019-11-14 15:53:20 +00:00
}
assert(unit.status == READY_TO_INSERT);
2019-10-01 10:51:17 +00:00
2019-10-30 15:49:10 +00:00
// Segmentating the original input.
2019-11-18 13:10:14 +00:00
unit.segment.resize(0);
2019-10-30 15:49:10 +00:00
2019-11-14 15:53:20 +00:00
const bool have_more_data = file_segmentation_engine(original_buffer,
2019-11-18 13:10:14 +00:00
unit.segment, min_chunk_size);
2019-10-30 15:49:10 +00:00
2019-10-01 10:51:17 +00:00
// Creating buffer from the segment of data.
2019-11-18 13:10:14 +00:00
auto new_buffer = BufferBase::Buffer(unit.segment.data(),
unit.segment.data() + unit.segment.size());
2019-10-23 10:39:33 +00:00
2019-11-11 11:20:11 +00:00
unit.readbuffer->buffer().swap(new_buffer);
unit.readbuffer->position() = unit.readbuffer->buffer().begin();
2019-10-01 10:51:17 +00:00
2019-11-11 11:20:11 +00:00
unit.parser = std::make_unique<InputStreamFromInputFormat>(
input_processor_creator(*unit.readbuffer, header, context, row_input_format_params, format_settings)
2019-10-24 16:52:55 +00:00
);
2019-10-01 10:51:17 +00:00
if (!have_more_data)
{
unit.is_last = true;
}
2019-11-11 11:20:11 +00:00
unit.status = READY_TO_PARSE;
2019-10-01 10:51:17 +00:00
scheduleParserThreadForUnitWithNumber(current_unit_number);
2019-11-14 15:53:20 +00:00
++segmentator_ticket_number;
if (!have_more_data)
{
break;
}
2019-10-01 10:51:17 +00:00
}
}
catch (...)
{
2019-11-14 15:53:20 +00:00
onBackgroundException();
2019-10-01 10:51:17 +00:00
}
}
2019-10-22 18:01:44 +00:00
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
2019-10-01 10:51:17 +00:00
{
try
{
2019-11-14 15:53:20 +00:00
setThreadName("ChunkParser");
2019-10-23 13:15:03 +00:00
2019-11-14 15:53:20 +00:00
auto & unit = processing_units[current_unit_number];
2019-10-01 10:51:17 +00:00
2019-11-14 15:53:20 +00:00
unit.block_ext.block.clear();
unit.block_ext.block_missing_values.clear();
2019-10-01 10:51:17 +00:00
2019-11-14 15:53:20 +00:00
// We don't know how many blocks will be. So we have to read them all
// until an empty block occured.
Block block;
while (!finished && (block = unit.parser->read()) != Block())
2019-10-01 10:51:17 +00:00
{
2019-11-11 11:20:11 +00:00
unit.block_ext.block.emplace_back(block);
unit.block_ext.block_missing_values.emplace_back(unit.parser->getMissingValues());
2019-10-01 10:51:17 +00:00
}
2019-11-14 15:53:20 +00:00
if (!finished)
2019-10-01 10:51:17 +00:00
{
std::unique_lock lock(mutex);
2019-11-11 11:20:11 +00:00
unit.status = READY_TO_READ;
2019-10-01 10:51:17 +00:00
reader_condvar.notify_all();
}
}
catch (...)
{
2019-11-14 15:53:20 +00:00
onBackgroundException();
2019-10-01 10:51:17 +00:00
}
}
2019-11-14 15:53:20 +00:00
void ParallelParsingBlockInputStream::onBackgroundException()
2019-10-01 10:51:17 +00:00
{
2019-11-14 15:53:20 +00:00
tryLogCurrentException(__PRETTY_FUNCTION__);
2019-10-01 10:51:17 +00:00
std::unique_lock lock(mutex);
2019-11-14 15:53:20 +00:00
if (!background_exception)
{
background_exception = std::current_exception();
}
finished = true;
reader_condvar.notify_all();
segmentator_condvar.notify_all();
}
2019-10-01 10:51:17 +00:00
2019-11-14 15:53:20 +00:00
/// Returns the next parsed block in input order. Returns an empty Block once
/// the stream is finished or cancelled.
///
/// Processing units are consumed round-robin by `reader_ticket_number`. The
/// reader waits until the current unit is READY_TO_READ, then returns its
/// blocks one per call, publishing the matching missing-values mask in
/// `last_block_missing_values`. Once a unit is used up, it either ends the
/// stream (if the unit is flagged `is_last`) or is handed back to the
/// segmentator as READY_TO_INSERT.
///
/// A non-last unit that produced no blocks at all is skipped rather than
/// treated as end of stream.
///
/// If a background thread recorded an exception, it is rethrown here after
/// calling cancel(false).
Block ParallelParsingBlockInputStream::readImpl()
{
    // Rethrows the recorded background exception, if any. `lock` must hold
    // `mutex`. It is released before cancel(false), presumably so that cancel()
    // can acquire the mutex itself.
    auto rethrow_background_exception = [this](auto & lock)
    {
        if (background_exception)
        {
            lock.unlock();
            cancel(false);
            std::rethrow_exception(background_exception);
        }
    };

    // Called when `unit` has no more blocks to return. Advances to the next
    // ticket, then either finishes the stream (last unit) or passes the unit
    // back to the segmentator.
    auto release_unit = [this](auto & unit)
    {
        next_block_in_current_unit.reset();
        ++reader_ticket_number;
        if (unit.is_last)
        {
            finished = true;
        }
        else
        {
            std::unique_lock lock(mutex);
            unit.status = READY_TO_INSERT;
            segmentator_condvar.notify_all();
        }
    };

    // The body normally runs once. It repeats only to skip a unit whose segment
    // produced no blocks. Previously such a non-last unit tripped an assert, or,
    // in release builds, ended the whole stream and silently dropped the rest
    // of the input.
    while (true)
    {
        if (isCancelledOrThrowIfKilled() || finished)
        {
            // Check for a background exception and rethrow it before returning.
            std::unique_lock lock(mutex);
            rethrow_background_exception(lock);
            return Block{};
        }

        const auto current_unit_number = reader_ticket_number % processing_units.size();
        auto & unit = processing_units[current_unit_number];

        if (!next_block_in_current_unit.has_value())
        {
            // All blocks of the previous unit are consumed; wait for the current one.
            std::unique_lock lock(mutex);
            reader_condvar.wait(lock, [&] { return unit.status == READY_TO_READ || finished; });
            if (finished)
            {
                rethrow_background_exception(lock);
                return Block{};
            }
            assert(unit.status == READY_TO_READ);
            next_block_in_current_unit = 0;
        }

        auto & blocks = unit.block_ext.block;
        if (blocks.empty())
        {
            // Nothing to return from this unit. If it was the last one,
            // release_unit() sets `finished` and the loop head returns an empty Block.
            release_unit(unit);
            continue;
        }

        const auto index = *next_block_in_current_unit;
        assert(index < blocks.size());

        Block res = std::move(blocks.at(index));
        last_block_missing_values = std::move(unit.block_ext.block_missing_values[index]);
        next_block_in_current_unit = index + 1;

        // Finished reading this unit: move on to the next one.
        if (*next_block_in_current_unit == blocks.size())
            release_unit(unit);

        return res;
    }
}
}