diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index d20cf3bea86..34558e2d260 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -216,7 +216,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, const Settings & settings = context.getSettingsRef(); bool parallel_formatting = settings.output_format_parallel_formatting; - if (parallel_formatting) + if (parallel_formatting && name != "PrettyCompactMonoBlock") { const auto & output_getter = getCreators(name).output_processor_creator; diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 2117bd2ee9b..f5775f80be9 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -54,6 +54,11 @@ public: need_flush = true; } + void doWritePrefix() override + { + addChunk(Chunk{}, ProcessingUnitType::START); + } + protected: void consume(Chunk chunk) override final { @@ -90,6 +95,7 @@ private: enum class ProcessingUnitType { + START, PLAIN, TOTALS, EXTREMES, @@ -232,9 +238,6 @@ private: [&]{ return unit.status == READY_TO_READ; }); } - if (unit.type == ProcessingUnitType::FINALIZE) - break; - if (unit.type == ProcessingUnitType::TOTALS) { } @@ -250,9 +253,14 @@ private: ++collector_unit_number; - std::lock_guard lock(mutex); - unit.status = READY_TO_INSERT; - writer_condvar.notify_all(); + { + std::lock_guard lock(mutex); + unit.status = READY_TO_INSERT; + writer_condvar.notify_all(); + } + + if (unit.type == ProcessingUnitType::FINALIZE) + break; } } catch (...) @@ -283,6 +291,11 @@ private: switch (unit.type) { + case ProcessingUnitType::START : + { + formatter->doWritePrefix(); + break; + } case ProcessingUnitType::PLAIN : { formatter->consume(std::move(unit.chunk)); @@ -300,6 +313,7 @@ private: } case ProcessingUnitType::FINALIZE : { + formatter->doWriteSuffix(); break; } }