diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 34558e2d260..25ea5fc9c06 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -216,7 +216,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, const Settings & settings = context.getSettingsRef(); bool parallel_formatting = settings.output_format_parallel_formatting; - if (parallel_formatting && name != "PrettyCompactMonoBlock") + if (parallel_formatting && getCreators(name).supports_parallel_formatting) { const auto & output_getter = getCreators(name).output_processor_creator; @@ -351,6 +351,16 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm target = std::move(file_segmentation_engine); } + +void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & name) +{ + auto & target = dict[name].supports_parallel_formatting; + if (target) + throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting.", ErrorCodes::LOGICAL_ERROR); + target = true; +} + + FormatFactory & FormatFactory::instance() { static FormatFactory ret; diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index d191597fe25..ddf557b9dfc 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -100,6 +100,7 @@ private: InputProcessorCreator input_processor_creator; OutputProcessorCreator output_processor_creator; FileSegmentationEngine file_segmentation_engine; + bool supports_parallel_formatting{false}; }; using FormatsDictionary = std::unordered_map; @@ -140,6 +141,8 @@ public: void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator); void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator); + void markOutputFormatSupportsParallelFormatting(const String & name); + const FormatsDictionary & getAllFormats() const { return dict; diff --git 
a/src/IO/BufferWithOwnMemory.h b/src/IO/BufferWithOwnMemory.h index 8b5bb552525..104c15a4acc 100644 --- a/src/IO/BufferWithOwnMemory.h +++ b/src/IO/BufferWithOwnMemory.h @@ -176,7 +176,7 @@ private: { const size_t prev_size = memory.size(); memory.resize(2 * prev_size + 1); - Base::set(memory.data(), memory.size(), prev_size + 1); + Base::set(memory.data() + prev_size, memory.size() - prev_size, 0); } }; diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp index 2d6a49ccb6f..90fc768d311 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp @@ -82,6 +82,7 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory) { return std::make_shared(buf, sample, with_names, params, format_settings); }); + factory.markOutputFormatSupportsParallelFormatting(with_names ? "CSVWithNames" : "CSV"); } } diff --git a/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp index e12ca966a93..6efecc1b978 100644 --- a/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp @@ -108,6 +108,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) { return std::make_shared(buf, sample, params, format_settings, false, false); }); + factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRow"); factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( WriteBuffer &buf, @@ -117,6 +118,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) { return std::make_shared(buf, sample, params, format_settings, true, false); }); + factory.markOutputFormatSupportsParallelFormatting("JSONCompactEachRowWithNamesAndTypes"); factory.registerOutputFormatProcessor("JSONCompactStringsEachRow", []( WriteBuffer & buf, @@ -126,6 +128,7 
@@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) { return std::make_shared(buf, sample, params, format_settings, false, true); }); + factory.markOutputFormatSupportsParallelFormatting("JSONCompactStringsEachRow"); factory.registerOutputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", []( WriteBuffer &buf, @@ -135,6 +138,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) { return std::make_shared(buf, sample, params, format_settings, true, true); }); + factory.markOutputFormatSupportsParallelFormatting("JSONCompactStringsEachRowWithNamesAndTypes"); } diff --git a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp index 15d8a843f41..30cd0660682 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.cpp @@ -138,6 +138,7 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory) return std::make_shared(buf, sample, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting("JSONEachRow"); factory.registerOutputFormatProcessor("JSONStringsEachRow", []( WriteBuffer & buf, @@ -150,6 +151,7 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory) return std::make_shared(buf, sample, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting("JSONStringsEachRow"); } } diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index f5775f80be9..d926ba4e125 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -82,7 +83,6 @@ protected: } private: - InternalFormatterCreator internal_formatter_creator; enum ProcessingUnitStatus @@ -104,6 +104,7 @@ private: void 
addChunk(Chunk chunk, ProcessingUnitType type) { + // std::cout << "AddChunk of size " << chunk.getNumRows() << std::endl; const auto current_unit_number = writer_unit_number % processing_units.size(); auto & unit = processing_units[current_unit_number]; @@ -119,7 +120,6 @@ private: unit.chunk = std::move(chunk); /// Resize memory without deallocate - unit.segment.resize(0); unit.status = READY_TO_FORMAT; unit.type = type; @@ -143,6 +143,8 @@ private: }; + std::promise finalizator{}; + std::atomic_bool need_flush{false}; // There are multiple "formatters", that's why we use thread pool. @@ -165,6 +167,9 @@ private: void finishAndWait() { + std::future future_finalizator = finalizator.get_future(); + future_finalizator.get(); + formatting_finished = true; { @@ -230,6 +235,8 @@ private: { const auto current_unit_number = collector_unit_number % processing_units.size(); + // std::cout << "collecting " << current_unit_number << std::endl; + auto & unit = processing_units[current_unit_number]; { @@ -238,13 +245,11 @@ private: [&]{ return unit.status == READY_TO_READ; }); } - if (unit.type == ProcessingUnitType::TOTALS) { - - } - assert(unit.status == READY_TO_READ); assert(unit.segment.size() > 0); + auto copy_if_unit_type = unit.type; + /// Do main work here. out.write(unit.segment.data(), unit.actual_memory_size); @@ -259,8 +264,12 @@ private: writer_condvar.notify_all(); } - if (unit.type == ProcessingUnitType::FINALIZE) + if (copy_if_unit_type == ProcessingUnitType::FINALIZE) + { + finalizator.set_value(true); break; + } + } } catch (...) 
@@ -280,9 +289,8 @@ private: - assert(unit.status = READY_TO_FORMAT); + assert(unit.status == READY_TO_FORMAT); - unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE); + unit.segment.resize(1); - /// TODO: Implement proper nextImpl BufferWithOutsideMemory out_buffer(unit.segment); auto formatter = internal_formatter_creator(out_buffer); diff --git a/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp b/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp index d65ce95313e..149ba3f0a2a 100644 --- a/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/TSKVRowOutputFormat.cpp @@ -49,6 +49,7 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory) { return std::make_shared(buf, sample, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting("TSKV"); } } diff --git a/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp b/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp index da8221b11c5..dd3adfa40eb 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp @@ -85,6 +85,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory) { return std::make_shared(buf, sample, false, false, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting(name); } for (const auto * name : {"TabSeparatedRaw", "TSVRaw"}) @@ -97,6 +98,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory) { return std::make_shared(buf, sample, false, false, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting(name); } for (const auto * name : {"TabSeparatedWithNames", "TSVWithNames"}) @@ -109,6 +111,7 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory) { return std::make_shared(buf, sample, true, false, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting(name); } for (const auto * name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"}) @@ -121,6 +124,7 @@ void 
registerOutputFormatProcessorTabSeparated(FormatFactory & factory) { return std::make_shared(buf, sample, true, true, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting(name); } } diff --git a/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp b/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp index 7791e1296e0..7f249200404 100644 --- a/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ValuesRowOutputFormat.cpp @@ -51,6 +51,7 @@ void registerOutputFormatProcessorValues(FormatFactory & factory) { return std::make_shared(buf, sample, params, settings); }); + factory.markOutputFormatSupportsParallelFormatting("Values"); } } diff --git a/tests/queries/0_stateless/01514_parallel_formatting.reference b/tests/queries/0_stateless/01514_parallel_formatting.reference new file mode 100644 index 00000000000..c9bddf62490 --- /dev/null +++ b/tests/queries/0_stateless/01514_parallel_formatting.reference @@ -0,0 +1,6 @@ +10000000 +10000000 +20000000 +20000000 +30000000 +30000000 diff --git a/tests/queries/0_stateless/01514_parallel_formatting.sql b/tests/queries/0_stateless/01514_parallel_formatting.sql new file mode 100644 index 00000000000..90fa9e4b4b8 --- /dev/null +++ b/tests/queries/0_stateless/01514_parallel_formatting.sql @@ -0,0 +1,19 @@ +drop table if exists tsv; +create table tsv(a int, b int default 7) engine File(TSV); + +insert into tsv(a) select number from numbers(10000000); +select '10000000'; +select count() from tsv; + + +insert into tsv(a) select number from numbers(10000000); +select '20000000'; +select count() from tsv; + + +insert into tsv(a) select number from numbers(10000000); +select '30000000'; +select count() from tsv; + + +drop table tsv;