diff --git a/src/Common/Arena.h b/src/Common/Arena.h index 2a05a257381..92de4e98ca5 100644 --- a/src/Common/Arena.h +++ b/src/Common/Arena.h @@ -15,7 +15,7 @@ namespace ProfileEvents { - extern const Event ArenaAllocMemoryChunks; + extern const Event ArenaAllocChunks; extern const Event ArenaAllocBytes; } @@ -45,18 +45,16 @@ private: char * end; /// does not include padding. MemoryChunk * prev; - MemoryChunk * next; MemoryChunk(size_t size_, MemoryChunk * prev_) { - ProfileEvents::increment(ProfileEvents::ArenaAllocMemoryChunks); + ProfileEvents::increment(ProfileEvents::ArenaAllocChunks); ProfileEvents::increment(ProfileEvents::ArenaAllocBytes, size_); begin = reinterpret_cast(Allocator::alloc(size_)); pos = begin; end = begin + size_ - pad_right; prev = prev_; - prev->next = this; ASAN_POISON_MEMORY_REGION(begin, size_); } diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 6fde6bee430..450f0463d6c 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -17,6 +16,7 @@ #include #include #include +#include #include #if !defined(ARCADIA_BUILD) @@ -233,10 +233,37 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, sample); } - auto format = getOutputFormat(name, buf, sample, context, std::move(callback), - format_settings); - return std::make_shared( - std::make_shared(format), sample); + + bool parallel_formatting = true; + + if (parallel_formatting && name == "CSV") + { + const auto & output_getter = getCreators(name).output_processor_creator; + if (!output_getter) + throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); + + const Settings & settings = context.getSettingsRef(); + FormatSettings format_settings = getOutputFormatSetting(settings, context); + + /** TODO: Materialization is needed, because formats can use the functions `IDataType`, + * which only work with full columns. + */ + auto formatter_creator = [output_getter, sample, callback, format_settings] + (WriteBuffer & output) -> OutputFormatPtr + { return output_getter(output, sample, std::move(callback), format_settings);}; + + /// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query. +// if (format_settings.enable_streaming) +// format->setAutoFlush(); + + ParallelFormattingOutputFormat::Params params{buf, sample, formatter_creator}; + auto format = std::make_shared(params); + return std::make_shared(std::make_shared(format), sample); + } + + + auto format = getOutputFormat(name, buf, sample, context, std::move(callback)); + return std::make_shared(std::make_shared(format), sample); } diff --git a/src/IO/BufferWithOwnMemory.h b/src/IO/BufferWithOwnMemory.h index 713c9b12cd1..b21c8cb5d17 100644 --- a/src/IO/BufferWithOwnMemory.h +++ b/src/IO/BufferWithOwnMemory.h @@ -149,4 +149,29 @@ public: }; +/** Buffer that could write data to external memory which came from outside + * Template parameter: ReadBuffer or WriteBuffer + */ +template +class BufferWithOutsideMemory : public Base +{ +protected: + Memory<> & memory; +public: + + BufferWithOutsideMemory(Memory<> & memory_) + : Base(nullptr, 0), memory(memory_) + { + Base::set(memory.data(), memory.size()); + Base::padded = false; + } +private: + void nextImpl() override final + { + const size_t prev_size = memory.size(); + memory.resize(2 * prev_size); + Base::set(memory.data(), memory.size()); + } +}; + } diff --git a/src/IO/WriteHelpers.h b/src/IO/WriteHelpers.h index 4d070577e84..e5c6f5b4ab8 100644 --- a/src/IO/WriteHelpers.h +++ b/src/IO/WriteHelpers.h @@ -557,7 +557,6 @@ void writeProbablyBackQuotedStringMySQL(const StringRef & s, WriteBuffer & buf); template void writeCSVString(const char * begin, const char * end, WriteBuffer & buf) { - std::cout << StackTrace().toString() << std::endl; writeChar(quote, buf); const char * pos = begin; diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 67c307df2aa..8f5d137f08f 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -39,12 +39,15 @@ protected: RowsBeforeLimitCounterPtr rows_before_limit_counter; + friend class ParallelFormattingOutputFormat; + virtual void consume(Chunk) = 0; virtual void consumeTotals(Chunk) {} virtual void consumeExtremes(Chunk) {} virtual void finalize() {} public: + IOutputFormat(const Block & header_, WriteBuffer & out_); Status prepare() override; diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp index 4546f83d6cc..2d6a49ccb6f 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp @@ -11,7 +11,6 @@ namespace DB CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_) : IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_) { - std::cout << StackTrace().toString() << std::endl; const auto & sample = getPort(PortKind::Main).getHeader(); size_t columns = sample.columns(); data_types.resize(columns); diff --git a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp index 6a87feab315..15d8a843f41 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp @@ -16,7 +16,6 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat( : IRowOutputFormat(header_, out_, params_), settings(settings_) { - std::cout << StackTrace().toString() << std::endl; const auto & sample = getPort(PortKind::Main).getHeader(); size_t columns = sample.columns(); fields.resize(columns); diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 52b5475420d..0a8fe511471 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -3,32 +3,50 @@ #include #include #include +#include #include #include #include +#include +#include +#include namespace DB { -const size_t min_chunk_bytes_for_parallel_formatting = 1024; +const size_t min_chunk_bytes_for_parallel_formatting_ = 1024; +const size_t max_threads_for_parallel_formatting_ = 4; class ParallelFormattingOutputFormat : public IOutputFormat { public: + /* Used to recreate formatter on every new data piece. */ + using InternalFormatterCreator = std::function; struct Params { - const Block & header; - WriteBuffer & out; + WriteBuffer & out_; + const Block & header_; + InternalFormatterCreator internal_formatter_creator_; }; explicit ParallelFormattingOutputFormat(Params params) - : IOutputFormat(params.header, params.out) - , original_write_buffer(params.out) - { + : IOutputFormat(params.header_, params.out_) + , internal_formatter_creator(params.internal_formatter_creator_) + , pool(max_threads_for_parallel_formatting_) + { +// std::cout << "ParallelFormattingOutputFormat::constructor" << std::endl; + processing_units.resize(max_threads_for_parallel_formatting_ + 2); + + collector_thread = ThreadFromGlobalPool([this] { сollectorThreadFunction(); }); + } + + ~ParallelFormattingOutputFormat() override + { + finishAndWait(); } String getName() const override final { return "ParallelFormattingOutputFormat"; } @@ -36,31 +54,46 @@ public: protected: void consume(Chunk chunk) override final { + + std::cout << StackTrace().toString() << std::endl; + if (chunk.empty()) + { +// std::cout << "Empty chunk" << std::endl; formatting_finished = true; + } + const auto current_unit_number = writer_unit_number % processing_units.size(); auto & unit = processing_units[current_unit_number]; +// std::cout << "Consume " << current_unit_number << " before wait" << std::endl; + { std::unique_lock lock(mutex); writer_condvar.wait(lock, [&]{ return unit.status == READY_TO_INSERT || formatting_finished; }); } +// std::cout << "Consume " << current_unit_number << " after wait" << std::endl; + assert(unit.status == READY_TO_INSERT); unit.chunk = std::move(chunk); - /// TODO: Allocate new arena for current chunk + /// Resize memory without deallocate + unit.segment.resize(0); + unit.status = READY_TO_FORMAT; - /// TODO: Submit task to ThreadPool + scheduleFormatterThreadForUnitWithNumber(current_unit_number); + + ++writer_unit_number; } private: - WriteBuffer & original_write_buffer; + InternalFormatterCreator internal_formatter_creator; enum ProcessingUnitStatus { @@ -71,63 +104,134 @@ private: struct ProcessingUnit { + explicit ProcessingUnit() + : status(ProcessingUnitStatus::READY_TO_INSERT) + { + } + + std::atomic status; Chunk chunk; - Arena arena; - ProcessingUnitStatus status; + Memory<> segment; }; + // There are multiple "formatters", that's why we use thread pool. + ThreadPool pool; + // Collecting all memory to original ReadBuffer + ThreadFromGlobalPool collector_thread; + std::exception_ptr background_exception = nullptr; std::deque processing_units; std::mutex mutex; std::atomic_bool formatting_finished{false}; - std::atomic_size_t collector_unit_number{0}; std::atomic_size_t writer_unit_number{0}; std::condition_variable collector_condvar; std::condition_variable writer_condvar; + void finishAndWait() + { + + std::cout << "finishAndWait()" << std::endl; + formatting_finished = true; + + { + std::unique_lock lock(mutex); + collector_condvar.notify_all(); + writer_condvar.notify_all(); + } + + if (collector_thread.joinable()) + collector_thread.join(); + + try + { + pool.wait(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + + + void onBackgroundException() + { +// std::cout << "onBackgroundException" << std::endl; + tryLogCurrentException(__PRETTY_FUNCTION__); + + std::unique_lock lock(mutex); + if (!background_exception) + { + background_exception = std::current_exception(); + } + formatting_finished = true; + writer_condvar.notify_all(); + collector_condvar.notify_all(); + } + + void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number) + { + pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); }); + } + void сollectorThreadFunction() { setThreadName("Collector"); - while (!formatting_finished) + try { - const auto current_unit_number = collector_unit_number % processing_units.size(); - - auto & unit = processing_units[current_unit_number]; - + while (!formatting_finished) { - std::unique_lock lock(mutex); - collector_condvar.wait(lock, - [&]{ return unit.status == READY_TO_READ || formatting_finished; }); + const auto current_unit_number = collector_unit_number % processing_units.size(); + + auto &unit = processing_units[current_unit_number]; + +// std::cout << "Collector " << current_unit_number << " before wait" << std::endl; + + { + std::unique_lock lock(mutex); + collector_condvar.wait(lock, + [&]{ return unit.status == READY_TO_READ || formatting_finished; }); + } + +// std::cout << "Collector " << current_unit_number << " after wait" << std::endl; + + if (formatting_finished) + { +// std::cout << "formatting finished" << std::endl; + break; + } + + assert(unit.status == READY_TO_READ); + assert(unit.segment.size() > 0); + + for (size_t i=0; i < unit.segment.size(); ++i) + { + std::cout << *(unit.segment.data() + i); + } + std::cout << std::endl; + + + /// Do main work here. + out.write(unit.segment.data(), unit.segment.size()); + + flush(); + + ++collector_unit_number; + + std::lock_guard lock(mutex); + unit.status = READY_TO_INSERT; + writer_condvar.notify_all(); } - - if (formatting_finished) - { - break; - } - - assert(unit.status == READY_TO_READ); - - /// TODO: Arena is singly linked list, and it is simply a list of chunks. - /// We have to write them all into original WriteBuffer. - char * arena_begin = nullptr; - size_t arena_size = 0; - /// Do main work here. - original_write_buffer.write(arena_begin, arena_size); - - /// How to drop this arena? - - - std::lock_guard lock(mutex); - unit.status = READY_TO_INSERT; - writer_condvar.notify_all(); } - + catch (...) + { + onBackgroundException(); + } } @@ -135,12 +239,38 @@ private: { setThreadName("Formatter"); - auto & unit = processing_units[current_unit_number]; + try + { +// std::cout << "Formatter " << current_unit_number << std::endl; - const char * arena_begin = nullptr; - WriteBufferFromArena out(unit.arena, arena_begin); + auto & unit = processing_units[current_unit_number]; + + assert(unit.status = READY_TO_FORMAT); + + unit.segment.resize(10 * DBMS_DEFAULT_BUFFER_SIZE); + + /// TODO: Implement proper nextImpl + BufferWithOutsideMemory out(unit.segment); + + auto formatter = internal_formatter_creator(out); + + formatter->consume(std::move(unit.chunk)); + +// std::cout << "Formatter " << current_unit_number << " before notify" << std::endl; + + { + std::lock_guard lock(mutex); + unit.status = READY_TO_READ; + collector_condvar.notify_all(); + } + +// std::cout << "Formatter " << current_unit_number << " after notify" << std::endl; + } + catch (...) + { + onBackgroundException(); + } - /// TODO: create parser and parse current chunk to arena } };