From 7dbf71cf2307eca7547b4f02c003bd36fc905259 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Tue, 6 Oct 2020 20:50:50 +0300 Subject: [PATCH] delete stream parsing --- .../ParallelParsingBlockInputStream.cpp | 306 ------------------ .../ParallelParsingBlockInputStream.h | 174 ---------- 2 files changed, 480 deletions(-) delete mode 100644 src/DataStreams/ParallelParsingBlockInputStream.cpp delete mode 100644 src/DataStreams/ParallelParsingBlockInputStream.h diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp deleted file mode 100644 index 8ec1f2bc499..00000000000 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ /dev/null @@ -1,306 +0,0 @@ -#include -#include -#include -#include -#include - -namespace DB -{ - -ParallelParsingBlockInputStream::ParallelParsingBlockInputStream(const Params & params) - : header(params.input_creator_params.sample), - row_input_format_params(params.input_creator_params.row_input_format_params), - format_settings(params.input_creator_params.settings), - input_processor_creator(params.input_processor_creator), - min_chunk_bytes(params.min_chunk_bytes), - original_buffer(params.read_buffer), - // Subtract one thread that we use for segmentation and one for - // reading. After that, must have at least two threads left for - // parsing. See the assertion below. - pool(std::max(2, static_cast(params.max_threads) - 2)), - file_segmentation_engine(params.file_segmentation_engine) -{ - // See comment above. - assert(params.max_threads >= 4); - - // One unit for each thread, including segmentator and reader, plus a - // couple more units so that the segmentation thread doesn't spuriously - // bump into reader thread on wraparound. - processing_units.resize(params.max_threads + 2); - - segmentator_thread = ThreadFromGlobalPool( - &ParallelParsingBlockInputStream::segmentatorThreadFunction, this, CurrentThread::getGroup()); -} - -ParallelParsingBlockInputStream::~ParallelParsingBlockInputStream() -{ - finishAndWait(); -} - -void ParallelParsingBlockInputStream::cancel(bool kill) -{ - /** - * Can be called multiple times, from different threads. Saturate the - * the kill flag with OR. - */ - if (kill) - is_killed = true; - is_cancelled = true; - - /* - * The format parsers themselves are not being cancelled here, so we'll - * have to wait until they process the current block. Given that the - * chunk size is on the order of megabytes, this shouldn't be too long. - * We can't call IInputFormat->cancel here, because the parser object is - * local to the parser thread, and we don't want to introduce any - * synchronization between parser threads and the other threads to get - * better performance. An ideal solution would be to add a callback to - * IInputFormat that checks whether it was cancelled. - */ - - finishAndWait(); -} - -void ParallelParsingBlockInputStream::scheduleParserThreadForUnitWithNumber(size_t ticket_number) -{ - pool.scheduleOrThrowOnError([this, ticket_number, group = CurrentThread::getGroup()]() - { - parserThreadFunction(group, ticket_number); - }); -} - -void ParallelParsingBlockInputStream::finishAndWait() -{ - finished = true; - - { - std::unique_lock lock(mutex); - segmentator_condvar.notify_all(); - reader_condvar.notify_all(); - } - - if (segmentator_thread.joinable()) - segmentator_thread.join(); - - try - { - pool.wait(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -} - -void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group) -{ - SCOPE_EXIT( - if (thread_group) - CurrentThread::detachQueryIfNotDetached(); - ); - if (thread_group) - CurrentThread::attachTo(thread_group); - - setThreadName("Segmentator"); - - try - { - while (!finished) - { - const auto current_unit_number = segmentator_ticket_number % processing_units.size(); - auto & unit = processing_units[current_unit_number]; - - { - std::unique_lock lock(mutex); - segmentator_condvar.wait(lock, - [&]{ return unit.status == READY_TO_INSERT || finished; }); - } - - if (finished) - { - break; - } - - assert(unit.status == READY_TO_INSERT); - - // Segmentating the original input. - unit.segment.resize(0); - - const bool have_more_data = file_segmentation_engine(original_buffer, - unit.segment, min_chunk_bytes); - - unit.is_last = !have_more_data; - unit.status = READY_TO_PARSE; - scheduleParserThreadForUnitWithNumber(segmentator_ticket_number); - ++segmentator_ticket_number; - - if (!have_more_data) - { - break; - } - } - } - catch (...) - { - onBackgroundException(); - } -} - -void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number) -{ - SCOPE_EXIT( - if (thread_group) - CurrentThread::detachQueryIfNotDetached(); - ); - if (thread_group) - CurrentThread::attachTo(thread_group); - - setThreadName("ChunkParser"); - - try - { - const auto current_unit_number = current_ticket_number % processing_units.size(); - auto & unit = processing_units[current_unit_number]; - - /* - * This is kind of suspicious -- the input_process_creator contract with - * respect to multithreaded use is not clear, but we hope that it is - * just a 'normal' factory class that doesn't have any state, and so we - * can use it from multiple threads simultaneously. - */ - ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0); - auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings); - auto parser = std::make_unique(std::move(format)); - - unit.block_ext.block.clear(); - unit.block_ext.block_missing_values.clear(); - - // We don't know how many blocks will be. So we have to read them all - // until an empty block occurred. - Block block; - while (!finished && (block = parser->read()) != Block()) - { - unit.block_ext.block.emplace_back(block); - unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues()); - } - - // We suppose we will get at least some blocks for a non-empty buffer, - // except at the end of file. Also see a matching assert in readImpl(). - assert(unit.is_last || !unit.block_ext.block.empty()); - - std::unique_lock lock(mutex); - unit.status = READY_TO_READ; - reader_condvar.notify_all(); - } - catch (...) - { - onBackgroundException(); - } -} - -void ParallelParsingBlockInputStream::onBackgroundException() -{ - tryLogCurrentException(__PRETTY_FUNCTION__); - - std::unique_lock lock(mutex); - if (!background_exception) - { - background_exception = std::current_exception(); - } - finished = true; - reader_condvar.notify_all(); - segmentator_condvar.notify_all(); -} - -Block ParallelParsingBlockInputStream::readImpl() -{ - if (isCancelledOrThrowIfKilled() || finished) - { - /** - * Check for background exception and rethrow it before we return. - */ - std::unique_lock lock(mutex); - if (background_exception) - { - lock.unlock(); - cancel(false); - std::rethrow_exception(background_exception); - } - - return Block{}; - } - - const auto current_unit_number = reader_ticket_number % processing_units.size(); - auto & unit = processing_units[current_unit_number]; - - if (!next_block_in_current_unit.has_value()) - { - // We have read out all the Blocks from the previous Processing Unit, - // wait for the current one to become ready. - std::unique_lock lock(mutex); - reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; }); - - if (finished) - { - /** - * Check for background exception and rethrow it before we return. - */ - if (background_exception) - { - lock.unlock(); - cancel(false); - std::rethrow_exception(background_exception); - } - - return Block{}; - } - - assert(unit.status == READY_TO_READ); - next_block_in_current_unit = 0; - } - - if (unit.block_ext.block.empty()) - { - /* - * Can we get zero blocks for an entire segment, when the format parser - * skips it entire content and does not create any blocks? Probably not, - * but if we ever do, we should add a loop around the above if, to skip - * these. Also see a matching assert in the parser thread. - */ - assert(unit.is_last); - finished = true; - return Block{}; - } - - assert(next_block_in_current_unit.value() < unit.block_ext.block.size()); - - Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit)); - last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]); - - next_block_in_current_unit.value() += 1; - - if (*next_block_in_current_unit == unit.block_ext.block.size()) - { - // Finished reading this Processing Unit, move to the next one. - next_block_in_current_unit.reset(); - ++reader_ticket_number; - - if (unit.is_last) - { - // It it was the last unit, we're finished. - finished = true; - } - else - { - // Pass the unit back to the segmentator. - std::unique_lock lock(mutex); - unit.status = READY_TO_INSERT; - segmentator_condvar.notify_all(); - } - } - - return res; -} - - -} diff --git a/src/DataStreams/ParallelParsingBlockInputStream.h b/src/DataStreams/ParallelParsingBlockInputStream.h deleted file mode 100644 index 147c86a4cf7..00000000000 --- a/src/DataStreams/ParallelParsingBlockInputStream.h +++ /dev/null @@ -1,174 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -namespace DB -{ - -class ReadBuffer; - -/** - * ORDER-PRESERVING parallel parsing of data formats. - * It splits original data into chunks. Then each chunk is parsed by different thread. - * The number of chunks equals to the number or parser threads. - * The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting. - * - * This stream has three kinds of threads: one segmentator, multiple parsers, - * and one reader thread -- that is, the one from which readImpl() is called. - * They operate one after another on parts of data called "processing units". - * One unit consists of buffer with raw data from file, filled by segmentator - * thread. This raw data is then parsed by a parser thread to form a number of - * Blocks. These Blocks are returned to the parent stream from readImpl(). - * After being read out, a processing unit is reused, to save on allocating - * memory for the raw buffer. The processing units are organized into a circular - * array to facilitate reuse and to apply backpressure on the segmentator thread - * -- after it runs out of processing units, it has to wait for the reader to - * read out the previous blocks. - * The outline of what the threads do is as follows: - * segmentator thread: - * 1) wait for the next processing unit to become empty - * 2) fill it with a part of input file - * 3) start a parser thread - * 4) repeat until eof - * parser thread: - * 1) parse the given raw buffer without any synchronization - * 2) signal that the given unit is ready to read - * 3) finish - * readImpl(): - * 1) wait for the next processing unit to become ready to read - * 2) take the blocks from the processing unit to return them to the caller - * 3) signal that the processing unit is empty - * 4) repeat until it encounters unit that is marked as "past_the_end" - * All threads must also check for cancel/eof/exception flags. - */ -class ParallelParsingBlockInputStream : public IBlockInputStream -{ -private: - using ReadCallback = std::function; - - using InputProcessorCreator = std::function; -public: - struct InputCreatorParams - { - const Block & sample; - const RowInputFormatParams & row_input_format_params; - const FormatSettings &settings; - }; - - struct Params - { - ReadBuffer & read_buffer; - const InputProcessorCreator & input_processor_creator; - const InputCreatorParams & input_creator_params; - FormatFactory::FileSegmentationEngine file_segmentation_engine; - size_t max_threads; - size_t min_chunk_bytes; - }; - - explicit ParallelParsingBlockInputStream(const Params & params); - ~ParallelParsingBlockInputStream() override; - - String getName() const override { return "ParallelParsing"; } - Block getHeader() const override { return header; } - - void cancel(bool kill) override; - -protected: - // Reader routine - Block readImpl() override; - - const BlockMissingValues & getMissingValues() const override - { - return last_block_missing_values; - } - -private: - const Block header; - const RowInputFormatParams row_input_format_params; - const FormatSettings format_settings; - const InputProcessorCreator input_processor_creator; - - const size_t min_chunk_bytes; - - /* - * This is declared as atomic to avoid UB, because parser threads access it - * without synchronization. - */ - std::atomic finished{false}; - - BlockMissingValues last_block_missing_values; - - // Original ReadBuffer to read from. - ReadBuffer & original_buffer; - - //Non-atomic because it is used in one thread. - std::optional next_block_in_current_unit; - size_t segmentator_ticket_number{0}; - size_t reader_ticket_number{0}; - - std::mutex mutex; - std::condition_variable reader_condvar; - std::condition_variable segmentator_condvar; - - // There are multiple "parsers", that's why we use thread pool. - ThreadPool pool; - // Reading and segmentating the file - ThreadFromGlobalPool segmentator_thread; - - // Function to segment the file. Then "parsers" will parse that segments. - FormatFactory::FileSegmentationEngine file_segmentation_engine; - - enum ProcessingUnitStatus - { - READY_TO_INSERT, - READY_TO_PARSE, - READY_TO_READ - }; - - struct BlockExt - { - std::vector block; - std::vector block_missing_values; - }; - - struct ProcessingUnit - { - explicit ProcessingUnit() - : status(ProcessingUnitStatus::READY_TO_INSERT) - { - } - - BlockExt block_ext; - Memory<> segment; - std::atomic status; - bool is_last{false}; - }; - - std::exception_ptr background_exception = nullptr; - - // We use deque instead of vector, because it does not require a move - // constructor, which is absent for atomics that are inside ProcessingUnit. - std::deque processing_units; - - void scheduleParserThreadForUnitWithNumber(size_t ticket_number); - void finishAndWait(); - - void segmentatorThreadFunction(ThreadGroupStatusPtr thread_group); - void parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number); - - // Save/log a background exception, set termination flag, wake up all - // threads. This function is used by segmentator and parsed threads. - // readImpl() is called from the main thread, so the exception handling - // is different. - void onBackgroundException(); -}; - -}