diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 0b4aa65a37b..5502c692865 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -121,6 +121,7 @@ class IColumn; \ M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \ M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \ + M(Bool, output_format_parallel_formatting, false, "Enable parallel formatting for all data formats.", 0) \ \ M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \ M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 07103ae1ed3..a7eb4d9df4e 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -234,7 +234,8 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, } - bool parallel_formatting = true; + const Settings & settings = context.getSettingsRef(); + bool parallel_formatting = settings.output_format_parallel_formatting; if (parallel_formatting) { @@ -242,7 +243,6 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, if (!output_getter) throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); - const Settings & settings = context.getSettingsRef(); FormatSettings format_settings = getOutputFormatSetting(settings, context); /** TODO: Materialization is needed, because formats can use the functions `IDataType`, diff --git a/src/IO/BufferWithOwnMemory.h b/src/IO/BufferWithOwnMemory.h index b21c8cb5d17..396f8a36791 100644 --- a/src/IO/BufferWithOwnMemory.h +++ b/src/IO/BufferWithOwnMemory.h @@ -159,18 +159,25 @@ protected: Memory<> & memory; public: - BufferWithOutsideMemory(Memory<> & memory_) + explicit BufferWithOutsideMemory(Memory<> & memory_) : Base(nullptr, 0), memory(memory_) { Base::set(memory.data(), memory.size()); Base::padded = false; } + + size_t getActualSize() + { + return Base::count(); + } + private: void nextImpl() override final { + std::cout << "nextImpl" << std::endl; const size_t prev_size = memory.size(); memory.resize(2 * prev_size); - Base::set(memory.data(), memory.size()); + Base::set(memory.data() + prev_size, memory.size() - prev_size); } }; diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 43c822ebee2..68109f181b5 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -17,7 +17,7 @@ namespace DB { const size_t min_chunk_bytes_for_parallel_formatting = 1024; -const size_t max_threads_for_parallel_formatting = 4; +const size_t max_threads_for_parallel_formatting = 6; class ParallelFormattingOutputFormat : public IOutputFormat { @@ -38,15 +38,14 @@ public: , pool(max_threads_for_parallel_formatting) { -// std::cout << "ParallelFormattingOutputFormat::constructor" << std::endl; processing_units.resize(max_threads_for_parallel_formatting + 2); - collector_thread = ThreadFromGlobalPool([this] { collectorThreadFunction(); }); + collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); }); } ~ParallelFormattingOutputFormat() override { - std::cout << "ParallelFormattingOutputFormat::destructor" << std::endl; + flush(); finishAndWait(); } @@ -54,45 +53,23 @@ public: void flush() override final { - std::cout << "flush()" << std::endl; - ///FIXME - while(writer_unit_number - collector_unit_number > 1) - { - std::cout << writer_unit_number << ' ' << collector_unit_number << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - + waitForFormattingFinished(); IOutputFormat::flush(); } protected: void consume(Chunk chunk) override final { -// std::cout << StackTrace().toString() << std::endl; - - std::cout << chunk.dumpStructure() << std::endl; - - if (chunk.empty()) - { -// std::cout << "Empty chunk" << std::endl; - formatting_finished = true; - } - - const auto current_unit_number = writer_unit_number % processing_units.size(); auto & unit = processing_units[current_unit_number]; -// std::cout << "Consume " << current_unit_number << " before wait" << std::endl; - { std::unique_lock lock(mutex); writer_condvar.wait(lock, [&]{ return unit.status == READY_TO_INSERT || formatting_finished; }); } -// std::cout << "Consume " << current_unit_number << " after wait" << std::endl; - assert(unit.status == READY_TO_INSERT); unit.chunk = std::move(chunk); @@ -127,6 +104,7 @@ private: std::atomic status; Chunk chunk; Memory<> segment; + size_t actual_memory_size{0}; }; // There are multiple "formatters", that's why we use thread pool. @@ -149,9 +127,6 @@ private: void finishAndWait() { - -// std::cout << "finishAndWait()" << std::endl; -// std::cout << StackTrace().toString() << std::endl; formatting_finished = true; { @@ -176,7 +151,6 @@ private: void onBackgroundException() { -// std::cout << "onBackgroundException" << std::endl; tryLogCurrentException(__PRETTY_FUNCTION__); std::unique_lock lock(mutex); @@ -194,6 +168,21 @@ private: pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); }); } + void waitForFormattingFinished() + { + ///FIXME + while(hasChunksToWorkWith()) + { + std::cout << writer_unit_number << ' ' << collector_unit_number << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + + bool hasChunksToWorkWith() + { + return writer_unit_number - collector_unit_number > 0; + } + void collectorThreadFunction() { setThreadName("Collector"); @@ -206,36 +195,21 @@ private: auto &unit = processing_units[current_unit_number]; -// std::cout << "Collector " << current_unit_number << " before wait" << std::endl; - { std::unique_lock lock(mutex); collector_condvar.wait(lock, [&]{ return unit.status == READY_TO_READ || formatting_finished; }); } -// std::cout << "Collector " << current_unit_number << " after wait" << std::endl; - if (formatting_finished) - { - std::cout << "formatting finished" << std::endl; break; - } assert(unit.status == READY_TO_READ); assert(unit.segment.size() > 0); -// for (size_t i=0; i < unit.segment.size(); ++i) -// { -// std::cout << *(unit.segment.data() + i); -// } -// std::cout << std::endl; - - /// Do main work here. - out.write(unit.segment.data(), unit.segment.size()); - - flush(); + out.write(unit.segment.data(), unit.actual_memory_size); + out.sync(); ++collector_unit_number; @@ -257,13 +231,11 @@ private: try { -// std::cout << "Formatter " << current_unit_number << std::endl; - auto & unit = processing_units[current_unit_number]; assert(unit.status = READY_TO_FORMAT); - unit.segment.resize(10 * DBMS_DEFAULT_BUFFER_SIZE); + unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE); /// TODO: Implement proper nextImpl BufferWithOutsideMemory out_buffer(unit.segment); @@ -271,8 +243,9 @@ private: auto formatter = internal_formatter_creator(out_buffer); formatter->consume(std::move(unit.chunk)); + formatter->flush(); -// std::cout << "Formatter " << current_unit_number << " before notify" << std::endl; + unit.actual_memory_size = out_buffer.getActualSize(); { std::lock_guard lock(mutex); @@ -280,7 +253,6 @@ private: collector_condvar.notify_all(); } -// std::cout << "Formatter " << current_unit_number << " after notify" << std::endl; } catch (...) {