mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-28 20:50:49 +00:00
delete stream parsing
This commit is contained in:
parent
a89d6bc75a
commit
7dbf71cf23
@ -1,306 +0,0 @@
|
|||||||
#include <DataStreams/ParallelParsingBlockInputStream.h>
|
|
||||||
#include <IO/ReadBuffer.h>
|
|
||||||
#include <Common/CurrentThread.h>
|
|
||||||
#include <Common/setThreadName.h>
|
|
||||||
#include <ext/scope_guard.h>
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
// Copies the parsing parameters, sizes the parser pool and the ring of
// processing units, and then launches the segmentator thread.
ParallelParsingBlockInputStream::ParallelParsingBlockInputStream(const Params & params)
    : header(params.input_creator_params.sample)
    , row_input_format_params(params.input_creator_params.row_input_format_params)
    , format_settings(params.input_creator_params.settings)
    , input_processor_creator(params.input_processor_creator)
    , min_chunk_bytes(params.min_chunk_bytes)
    , original_buffer(params.read_buffer)
    // Out of max_threads, one thread is spent on segmentation and one on
    // reading; whatever remains is used for parsing. The pool never gets
    // fewer than two threads -- see the assertion in the constructor body.
    , pool(std::max(2, static_cast<int>(params.max_threads) - 2))
    , file_segmentation_engine(params.file_segmentation_engine)
{
    // With fewer than four threads the pool size computed above is clamped
    // up to two, which would exceed the requested thread budget.
    assert(params.max_threads >= 4);

    // The ring holds one unit per thread (segmentator and reader included)
    // and two spare ones, so that on wraparound the segmentator does not
    // needlessly run into the unit the reader is still working on.
    processing_units.resize(params.max_threads + 2);

    // Started last: from this point on the segmentator uses the members
    // initialized above.
    segmentator_thread = ThreadFromGlobalPool(
        &ParallelParsingBlockInputStream::segmentatorThreadFunction, this, CurrentThread::getGroup());
}
|
|
||||||
|
|
||||||
// Stops the background threads and waits for them to exit; this happens in
// the destructor body, i.e. before any data member is destroyed.
ParallelParsingBlockInputStream::~ParallelParsingBlockInputStream()
{
    finishAndWait();
}
|
|
||||||
|
|
||||||
void ParallelParsingBlockInputStream::cancel(bool kill)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Can be called multiple times, from different threads. Saturate the
|
|
||||||
* the kill flag with OR.
|
|
||||||
*/
|
|
||||||
if (kill)
|
|
||||||
is_killed = true;
|
|
||||||
is_cancelled = true;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The format parsers themselves are not being cancelled here, so we'll
|
|
||||||
* have to wait until they process the current block. Given that the
|
|
||||||
* chunk size is on the order of megabytes, this shouldn't be too long.
|
|
||||||
* We can't call IInputFormat->cancel here, because the parser object is
|
|
||||||
* local to the parser thread, and we don't want to introduce any
|
|
||||||
* synchronization between parser threads and the other threads to get
|
|
||||||
* better performance. An ideal solution would be to add a callback to
|
|
||||||
* IInputFormat that checks whether it was cancelled.
|
|
||||||
*/
|
|
||||||
|
|
||||||
finishAndWait();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ParallelParsingBlockInputStream::scheduleParserThreadForUnitWithNumber(size_t ticket_number)
|
|
||||||
{
|
|
||||||
pool.scheduleOrThrowOnError([this, ticket_number, group = CurrentThread::getGroup()]()
|
|
||||||
{
|
|
||||||
parserThreadFunction(group, ticket_number);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void ParallelParsingBlockInputStream::finishAndWait()
|
|
||||||
{
|
|
||||||
finished = true;
|
|
||||||
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(mutex);
|
|
||||||
segmentator_condvar.notify_all();
|
|
||||||
reader_condvar.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (segmentator_thread.joinable())
|
|
||||||
segmentator_thread.join();
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
pool.wait();
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Body of the segmentator thread: walks the ring of processing units, fills
// each free unit with the next segment of the original input, and schedules
// a parser job for it. Stops at the end of input or once `finished` is set.
// Any exception is routed to onBackgroundException().
void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{
    SCOPE_EXIT(
        if (thread_group)
            CurrentThread::detachQueryIfNotDetached();
    );
    if (thread_group)
        CurrentThread::attachTo(thread_group);

    setThreadName("Segmentator");

    try
    {
        while (!finished)
        {
            auto & unit = processing_units[segmentator_ticket_number % processing_units.size()];

            {
                // Sleep until the reader hands this unit back to us, or
                // until the stream is told to terminate.
                std::unique_lock<std::mutex> lock(mutex);
                while (!finished && unit.status != READY_TO_INSERT)
                    segmentator_condvar.wait(lock);
            }

            if (finished)
                break;

            assert(unit.status == READY_TO_INSERT);

            // Cut the next segment out of the original input. The unit's
            // buffer is emptied first and then reused.
            unit.segment.resize(0);
            const bool input_has_more = file_segmentation_engine(
                original_buffer, unit.segment, min_chunk_bytes);

            unit.is_last = !input_has_more;
            unit.status = READY_TO_PARSE;
            scheduleParserThreadForUnitWithNumber(segmentator_ticket_number);
            ++segmentator_ticket_number;

            // The segment just scheduled was the final one.
            if (!input_has_more)
                break;
        }
    }
    catch (...)
    {
        onBackgroundException();
    }
}
|
|
||||||
|
|
||||||
// Body of a parser job: parses the raw segment stored in the processing unit
// that corresponds to `current_ticket_number` into a sequence of Blocks, and
// then marks the unit as ready for the reader.
//
// `thread_group` is the thread group to attach this pool thread to (may be
// null, in which case no attach/detach is done). Any exception is routed to
// onBackgroundException().
void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
    SCOPE_EXIT(
        if (thread_group)
            CurrentThread::detachQueryIfNotDetached();
    );
    if (thread_group)
        CurrentThread::attachTo(thread_group);

    setThreadName("ChunkParser");

    try
    {
        const auto current_unit_number = current_ticket_number % processing_units.size();
        auto & unit = processing_units[current_unit_number];

        /*
         * This is kind of suspicious -- the input_process_creator contract with
         * respect to multithreaded use is not clear, but we hope that it is
         * just a 'normal' factory class that doesn't have any state, and so we
         * can use it from multiple threads simultaneously.
         */
        ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
        auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings);
        auto parser = std::make_unique<InputStreamFromInputFormat>(std::move(format));

        unit.block_ext.block.clear();
        unit.block_ext.block_missing_values.clear();

        // We don't know how many blocks there will be, so we read until an
        // empty block is returned. The loop also stops early when the stream
        // is being terminated (cancel or a background exception).
        Block block;
        while (!finished && (block = parser->read()) != Block())
        {
            // `block` is reassigned by the next read(), so its contents can
            // be moved into the unit instead of being copied.
            unit.block_ext.block.emplace_back(std::move(block));
            unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues());
        }

        // We suppose we will get at least some blocks for a non-empty buffer,
        // except at the end of file. Also see a matching assert in readImpl().
        // If `finished` was raised, the loop above may have stopped before
        // producing any block, so an empty result is legitimate in that case
        // and must not trip the assertion.
        assert(unit.is_last || !unit.block_ext.block.empty() || finished);

        std::unique_lock<std::mutex> lock(mutex);
        unit.status = READY_TO_READ;
        reader_condvar.notify_all();
    }
    catch (...)
    {
        onBackgroundException();
    }
}
|
|
||||||
|
|
||||||
void ParallelParsingBlockInputStream::onBackgroundException()
|
|
||||||
{
|
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(mutex);
|
|
||||||
if (!background_exception)
|
|
||||||
{
|
|
||||||
background_exception = std::current_exception();
|
|
||||||
}
|
|
||||||
finished = true;
|
|
||||||
reader_condvar.notify_all();
|
|
||||||
segmentator_condvar.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reader routine: returns the next parsed Block in input order, or an empty
// Block when the stream is finished or cancelled. If a background thread has
// failed, the stored exception is rethrown from here.
Block ParallelParsingBlockInputStream::readImpl()
{
    // Common exit path for a terminated stream. Must be entered with `lock`
    // holding `mutex`. If a background exception was recorded, the lock is
    // released, the stream is cancelled and the exception is rethrown;
    // otherwise an empty Block is returned.
    const auto leave_finished_stream = [this](std::unique_lock<std::mutex> & lock) -> Block
    {
        if (background_exception)
        {
            lock.unlock();
            cancel(false);
            std::rethrow_exception(background_exception);
        }

        return Block{};
    };

    if (isCancelledOrThrowIfKilled() || finished)
    {
        std::unique_lock<std::mutex> lock(mutex);
        return leave_finished_stream(lock);
    }

    auto & unit = processing_units[reader_ticket_number % processing_units.size()];

    if (!next_block_in_current_unit.has_value())
    {
        // Every Block of the previous processing unit has been handed out,
        // so wait for the current unit to be parsed.
        std::unique_lock<std::mutex> lock(mutex);
        reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; });

        if (finished)
            return leave_finished_stream(lock);

        assert(unit.status == READY_TO_READ);
        next_block_in_current_unit = 0;
    }

    auto & parsed_blocks = unit.block_ext.block;

    if (parsed_blocks.empty())
    {
        // Can a whole segment yield zero blocks, because the format parser
        // skipped all of its content? Probably not; if it ever does, the
        // waiting code above has to be wrapped in a loop that skips such
        // units. There is a matching assert in the parser thread.
        assert(unit.is_last);
        finished = true;
        return Block{};
    }

    const size_t block_index = *next_block_in_current_unit;
    assert(block_index < parsed_blocks.size());

    Block res = std::move(parsed_blocks.at(block_index));
    last_block_missing_values = std::move(unit.block_ext.block_missing_values[block_index]);

    const size_t following_index = block_index + 1;
    if (following_index != parsed_blocks.size())
    {
        // More Blocks remain in this unit; continue with it on the next call.
        next_block_in_current_unit = following_index;
    }
    else
    {
        // This processing unit is exhausted, advance to the next one.
        next_block_in_current_unit.reset();
        ++reader_ticket_number;

        if (unit.is_last)
        {
            // That was the last unit of the input, so we are done.
            finished = true;
        }
        else
        {
            // Hand the unit back to the segmentator for reuse.
            std::unique_lock<std::mutex> lock(mutex);
            unit.status = READY_TO_INSERT;
            segmentator_condvar.notify_all();
        }
    }

    return res;
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
@ -1,174 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <DataStreams/IBlockInputStream.h>
|
|
||||||
#include <Formats/FormatFactory.h>
|
|
||||||
#include <Common/ThreadPool.h>
|
|
||||||
#include <Processors/Formats/IRowInputFormat.h>
|
|
||||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
class ReadBuffer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ORDER-PRESERVING parallel parsing of data formats.
|
|
||||||
* It splits original data into chunks. Then each chunk is parsed by different thread.
|
|
||||||
* The number of chunks equals the number of parser threads.
|
|
||||||
* The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting.
|
|
||||||
*
|
|
||||||
* This stream has three kinds of threads: one segmentator, multiple parsers,
|
|
||||||
* and one reader thread -- that is, the one from which readImpl() is called.
|
|
||||||
* They operate one after another on parts of data called "processing units".
|
|
||||||
* One unit consists of buffer with raw data from file, filled by segmentator
|
|
||||||
* thread. This raw data is then parsed by a parser thread to form a number of
|
|
||||||
* Blocks. These Blocks are returned to the parent stream from readImpl().
|
|
||||||
* After being read out, a processing unit is reused, to save on allocating
|
|
||||||
* memory for the raw buffer. The processing units are organized into a circular
|
|
||||||
* array to facilitate reuse and to apply backpressure on the segmentator thread
|
|
||||||
* -- after it runs out of processing units, it has to wait for the reader to
|
|
||||||
* read out the previous blocks.
|
|
||||||
* The outline of what the threads do is as follows:
|
|
||||||
* segmentator thread:
|
|
||||||
* 1) wait for the next processing unit to become empty
|
|
||||||
* 2) fill it with a part of input file
|
|
||||||
* 3) start a parser thread
|
|
||||||
* 4) repeat until eof
|
|
||||||
* parser thread:
|
|
||||||
* 1) parse the given raw buffer without any synchronization
|
|
||||||
* 2) signal that the given unit is ready to read
|
|
||||||
* 3) finish
|
|
||||||
* readImpl():
|
|
||||||
* 1) wait for the next processing unit to become ready to read
|
|
||||||
* 2) take the blocks from the processing unit to return them to the caller
|
|
||||||
* 3) signal that the processing unit is empty
|
|
||||||
* 4) repeat until it encounters a unit that is marked as the last one (is_last)
|
|
||||||
* All threads must also check for cancel/eof/exception flags.
|
|
||||||
*/
|
|
||||||
class ParallelParsingBlockInputStream : public IBlockInputStream
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
using ReadCallback = std::function<void()>;
|
|
||||||
|
|
||||||
using InputProcessorCreator = std::function<InputFormatPtr(
|
|
||||||
ReadBuffer & buf,
|
|
||||||
const Block & header,
|
|
||||||
const RowInputFormatParams & params,
|
|
||||||
const FormatSettings & settings)>;
|
|
||||||
public:
|
|
||||||
struct InputCreatorParams
|
|
||||||
{
|
|
||||||
const Block & sample;
|
|
||||||
const RowInputFormatParams & row_input_format_params;
|
|
||||||
const FormatSettings &settings;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Params
|
|
||||||
{
|
|
||||||
ReadBuffer & read_buffer;
|
|
||||||
const InputProcessorCreator & input_processor_creator;
|
|
||||||
const InputCreatorParams & input_creator_params;
|
|
||||||
FormatFactory::FileSegmentationEngine file_segmentation_engine;
|
|
||||||
size_t max_threads;
|
|
||||||
size_t min_chunk_bytes;
|
|
||||||
};
|
|
||||||
|
|
||||||
explicit ParallelParsingBlockInputStream(const Params & params);
|
|
||||||
~ParallelParsingBlockInputStream() override;
|
|
||||||
|
|
||||||
String getName() const override { return "ParallelParsing"; }
|
|
||||||
Block getHeader() const override { return header; }
|
|
||||||
|
|
||||||
void cancel(bool kill) override;
|
|
||||||
|
|
||||||
protected:
|
|
||||||
// Reader routine
|
|
||||||
Block readImpl() override;
|
|
||||||
|
|
||||||
const BlockMissingValues & getMissingValues() const override
|
|
||||||
{
|
|
||||||
return last_block_missing_values;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
const Block header;
|
|
||||||
const RowInputFormatParams row_input_format_params;
|
|
||||||
const FormatSettings format_settings;
|
|
||||||
const InputProcessorCreator input_processor_creator;
|
|
||||||
|
|
||||||
const size_t min_chunk_bytes;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This is declared as atomic to avoid UB, because parser threads access it
|
|
||||||
* without synchronization.
|
|
||||||
*/
|
|
||||||
std::atomic<bool> finished{false};
|
|
||||||
|
|
||||||
BlockMissingValues last_block_missing_values;
|
|
||||||
|
|
||||||
// Original ReadBuffer to read from.
|
|
||||||
ReadBuffer & original_buffer;
|
|
||||||
|
|
||||||
//Non-atomic because it is used in one thread.
|
|
||||||
std::optional<size_t> next_block_in_current_unit;
|
|
||||||
size_t segmentator_ticket_number{0};
|
|
||||||
size_t reader_ticket_number{0};
|
|
||||||
|
|
||||||
std::mutex mutex;
|
|
||||||
std::condition_variable reader_condvar;
|
|
||||||
std::condition_variable segmentator_condvar;
|
|
||||||
|
|
||||||
// There are multiple "parsers", that's why we use thread pool.
|
|
||||||
ThreadPool pool;
|
|
||||||
// Reading and segmentating the file
|
|
||||||
ThreadFromGlobalPool segmentator_thread;
|
|
||||||
|
|
||||||
// Function to segment the file. Then "parsers" will parse that segments.
|
|
||||||
FormatFactory::FileSegmentationEngine file_segmentation_engine;
|
|
||||||
|
|
||||||
enum ProcessingUnitStatus
|
|
||||||
{
|
|
||||||
READY_TO_INSERT,
|
|
||||||
READY_TO_PARSE,
|
|
||||||
READY_TO_READ
|
|
||||||
};
|
|
||||||
|
|
||||||
struct BlockExt
|
|
||||||
{
|
|
||||||
std::vector<Block> block;
|
|
||||||
std::vector<BlockMissingValues> block_missing_values;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct ProcessingUnit
|
|
||||||
{
|
|
||||||
explicit ProcessingUnit()
|
|
||||||
: status(ProcessingUnitStatus::READY_TO_INSERT)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
BlockExt block_ext;
|
|
||||||
Memory<> segment;
|
|
||||||
std::atomic<ProcessingUnitStatus> status;
|
|
||||||
bool is_last{false};
|
|
||||||
};
|
|
||||||
|
|
||||||
std::exception_ptr background_exception = nullptr;
|
|
||||||
|
|
||||||
// We use deque instead of vector, because it does not require a move
|
|
||||||
// constructor, which is absent for atomics that are inside ProcessingUnit.
|
|
||||||
std::deque<ProcessingUnit> processing_units;
|
|
||||||
|
|
||||||
void scheduleParserThreadForUnitWithNumber(size_t ticket_number);
|
|
||||||
void finishAndWait();
|
|
||||||
|
|
||||||
void segmentatorThreadFunction(ThreadGroupStatusPtr thread_group);
|
|
||||||
void parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number);
|
|
||||||
|
|
||||||
// Save/log a background exception, set termination flag, wake up all
|
|
||||||
// threads. This function is used by segmentator and parsed threads.
|
|
||||||
// readImpl() is called from the main thread, so the exception handling
|
|
||||||
// is different.
|
|
||||||
void onBackgroundException();
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user