From a89d6bc75a1b602f46e84e996444857adaa1c06c Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Tue, 6 Oct 2020 20:49:57 +0300 Subject: [PATCH] comments + fixes for parsing --- .../Impl/ParallelFormattingOutputFormat.h | 85 ++++++++----------- .../Impl/ParallelParsingInputFormat.cpp | 27 ++++-- .../Formats/Impl/ParallelParsingInputFormat.h | 32 ++++--- 3 files changed, 72 insertions(+), 72 deletions(-) diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index d926ba4e125..431ec00e764 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -20,9 +21,10 @@ namespace DB class ParallelFormattingOutputFormat : public IOutputFormat { public: - /* Used to recreate formatter on every new data piece. */ + /// Used to recreate formatter on every new data piece. using InternalFormatterCreator = std::function; + /// Struct to simplify constructor. struct Params { WriteBuffer & out; @@ -37,14 +39,17 @@ public: , pool(params.max_threads_for_parallel_formatting) { + /// Just heuristic. We need one thread for collecting, one thread for receiving chunks + /// and n threads for formatting. processing_units.resize(params.max_threads_for_parallel_formatting + 2); - collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); }); } ~ParallelFormattingOutputFormat() override { flush(); + if (!IOutputFormat::finalized) + finalize(); finishAndWait(); } @@ -80,11 +85,13 @@ protected: { IOutputFormat::finalized = true; addChunk(Chunk{}, ProcessingUnitType::FINALIZE); + collector_finished.wait(); } private: InternalFormatterCreator internal_formatter_creator; + /// Status to synchronize multiple threads. enum ProcessingUnitStatus { READY_TO_INSERT, @@ -92,7 +99,7 @@ private: READY_TO_READ }; - + /// Some information about what methods to call from internal parser. enum class ProcessingUnitType { START, @@ -104,22 +111,22 @@ private: void addChunk(Chunk chunk, ProcessingUnitType type) { - // std::cout << "AddChunk of size " << chunk.getNumRows() << std::endl; const auto current_unit_number = writer_unit_number % processing_units.size(); - auto & unit = processing_units[current_unit_number]; { std::unique_lock lock(mutex); writer_condvar.wait(lock, - [&]{ return unit.status == READY_TO_INSERT || formatting_finished; }); + [&]{ return unit.status == READY_TO_INSERT || emergency_stop; }); } + if (emergency_stop) + return; + assert(unit.status == READY_TO_INSERT); - unit.chunk = std::move(chunk); - - /// Resize memory without deallocate + /// Resize memory without deallocation. + unit.segment.resize(0); unit.status = READY_TO_FORMAT; unit.type = type; @@ -140,10 +147,9 @@ private: Chunk chunk; Memory<> segment; size_t actual_memory_size{0}; - }; - std::promise finalizator{}; + Poco::Event collector_finished{}; std::atomic_bool need_flush{false}; @@ -154,10 +160,11 @@ private: std::exception_ptr background_exception = nullptr; + /// We use deque, because ProcessingUnit doesn't have move or copy constructor. std::deque processing_units; std::mutex mutex; - std::atomic_bool formatting_finished{false}; + std::atomic_bool emergency_stop{false}; std::atomic_size_t collector_unit_number{0}; std::atomic_size_t writer_unit_number{0}; @@ -167,10 +174,7 @@ private: void finishAndWait() { - std::future future_finalizator = finalizator.get_future(); - future_finalizator.get(); - - formatting_finished = true; + emergency_stop = true; { std::unique_lock lock(mutex); @@ -201,7 +205,7 @@ private: { background_exception = std::current_exception(); } - formatting_finished = true; + emergency_stop = true; writer_condvar.notify_all(); collector_condvar.notify_all(); } @@ -211,43 +215,29 @@ private: pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); }); } - void waitForFormattingFinished() - { - ///FIXME - while(hasChunksToWorkWith()) - { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } - - bool hasChunksToWorkWith() - { - return writer_unit_number - collector_unit_number > 0; - } - void collectorThreadFunction() { setThreadName("Collector"); try { - while (!formatting_finished) + while (!emergency_stop) { const auto current_unit_number = collector_unit_number % processing_units.size(); - - // std::cout << "collecting " << current_unit_number << std::endl; - auto & unit = processing_units[current_unit_number]; { std::unique_lock lock(mutex); collector_condvar.wait(lock, - [&]{ return unit.status == READY_TO_READ; }); + [&]{ return unit.status == READY_TO_READ || emergency_stop; }); } - assert(unit.status == READY_TO_READ); - assert(unit.segment.size() > 0); + if (emergency_stop) + return; + assert(unit.status == READY_TO_READ); + + /// Use this copy to after notification to stop the execution. auto copy_if_unit_type = unit.type; /// Do main work here. @@ -257,19 +247,19 @@ private: IOutputFormat::flush(); ++collector_unit_number; - + { + /// Notify other threads. std::lock_guard lock(mutex); unit.status = READY_TO_INSERT; writer_condvar.notify_all(); } - + /// We can exit only after writing last piece of to out buffer. if (copy_if_unit_type == ProcessingUnitType::FINALIZE) { - finalizator.set_value(true); + collector_finished.set(); break; } - } } catch (...) @@ -286,17 +276,14 @@ private: try { auto & unit = processing_units[current_unit_number]; - assert(unit.status = READY_TO_FORMAT); - unit.segment.resize(1); - + unit.segment.resize(0); + unit.actual_memory_size = 0; BufferWithOutsideMemory out_buffer(unit.segment); auto formatter = internal_formatter_creator(out_buffer); - unit.actual_memory_size = 0; - switch (unit.type) { case ProcessingUnitType::START : @@ -325,7 +312,7 @@ private: break; } } - + /// Flush all the data to handmade buffer. formatter->flush(); unit.actual_memory_size = out_buffer.getActualSize(); @@ -334,13 +321,11 @@ private: unit.status = READY_TO_READ; collector_condvar.notify_all(); } - } catch (...) { onBackgroundException(); } - } }; diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp index 898c103cb24..367b1c748f5 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp @@ -1,11 +1,21 @@ #include #include +#include +#include +#include namespace DB { -void ParallelParsingInputFormat::segmentatorThreadFunction() +void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group) { + SCOPE_EXIT( + if (thread_group) + CurrentThread::detachQueryIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachTo(thread_group); + setThreadName("Segmentator"); try { @@ -21,9 +31,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction() } if (parsing_finished) - { break; - } assert(unit.status == READY_TO_INSERT); @@ -38,9 +46,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction() ++segmentator_ticket_number; if (!have_more_data) - { break; - } } } catch (...) @@ -49,8 +55,15 @@ void ParallelParsingInputFormat::segmentatorThreadFunction() } } -void ParallelParsingInputFormat::parserThreadFunction(size_t current_ticket_number) +void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number) { + SCOPE_EXIT( + if (thread_group) + CurrentThread::detachQueryIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachTo(thread_group); + try { setThreadName("ChunkParser"); @@ -58,8 +71,6 @@ void ParallelParsingInputFormat::parserThreadFunction(size_t current_ticket_numb const auto current_unit_number = current_ticket_number % processing_units.size(); auto & unit = processing_units[current_unit_number]; - assert(unit.segment.size() > 0); - /* * This is kind of suspicious -- the input_process_creator contract with * respect to multithreaded use is not clear, but we hope that it is diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h index 35cf7047d3c..f8a85536158 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h @@ -84,7 +84,8 @@ public: initializePrepareEndUpMap(); - segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); }); + segmentator_thread = ThreadFromGlobalPool( + &ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup()); } ~ParallelParsingInputFormat() override @@ -172,14 +173,14 @@ private: }; const InternalParserCreator internal_parser_creator; - // Function to segment the file. Then "parsers" will parse that segments. + /// Function to segment the file. Then "parsers" will parse that segments. FormatFactory::FileSegmentationEngine file_segmentation_engine; const String format_name; const size_t min_chunk_bytes; BlockMissingValues last_block_missing_values; - //Non-atomic because it is used in one thread. + /// Non-atomic because it is used in one thread. std::optional next_block_in_current_unit; size_t segmentator_ticket_number{0}; size_t reader_ticket_number{0}; @@ -190,9 +191,9 @@ private: std::atomic parsing_finished{false}; - // There are multiple "parsers", that's why we use thread pool. + /// There are multiple "parsers", that's why we use thread pool. ThreadPool pool; - // Reading and segmentating the file + /// Reading and segmentating the file ThreadFromGlobalPool segmentator_thread; enum ProcessingUnitStatus @@ -223,14 +224,17 @@ private: std::exception_ptr background_exception = nullptr; - // We use deque instead of vector, because it does not require a move - // constructor, which is absent for atomics that are inside ProcessingUnit. + /// We use deque instead of vector, because it does not require a move + /// constructor, which is absent for atomics that are inside ProcessingUnit. std::deque processing_units; void scheduleParserThreadForUnitWithNumber(size_t ticket_number) { - pool.scheduleOrThrowOnError([this, ticket_number] { parserThreadFunction(ticket_number); }); + pool.scheduleOrThrowOnError([this, ticket_number, group = CurrentThread::getGroup()]() + { + parserThreadFunction(group, ticket_number); + }); } void finishAndWait() @@ -256,13 +260,13 @@ private: } } - void segmentatorThreadFunction(); - void parserThreadFunction(size_t current_ticket_number); + void segmentatorThreadFunction(ThreadGroupStatusPtr thread_group); + void parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number); - // Save/log a background exception, set termination flag, wake up all - // threads. This function is used by segmentator and parsed threads. - // readImpl() is called from the main thread, so the exception handling - // is different. + /// Save/log a background exception, set termination flag, wake up all + /// threads. This function is used by segmentator and parsed threads. + /// readImpl() is called from the main thread, so the exception handling + /// is different. void onBackgroundException(); /// To store objects which will prepare and end up ReadBuffer for each format.