From bd8c9733e30119c7f0f43034a167561c86891d78 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 5 Aug 2019 12:35:46 +0300 Subject: [PATCH] Update MySQLOutputFormat. --- dbms/src/Processors/Formats/IOutputFormat.h | 4 +- .../Formats/Impl/MySQLOutputFormat.cpp | 42 +++++++++++-------- .../Formats/Impl/MySQLOutputFormat.h | 4 ++ 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/dbms/src/Processors/Formats/IOutputFormat.h b/dbms/src/Processors/Formats/IOutputFormat.h index 3242ab9da9a..53e5b9e2158 100644 --- a/dbms/src/Processors/Formats/IOutputFormat.h +++ b/dbms/src/Processors/Formats/IOutputFormat.h @@ -64,8 +64,8 @@ public: void write(const Block & block) { consume(Chunk(block.getColumns(), block.rows())); } - void doWritePrefix() {} - void doWriteSuffix() { finalize(); } + virtual void doWritePrefix() {} + virtual void doWriteSuffix() { finalize(); } void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); } void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); } diff --git a/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.cpp b/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.cpp index 0487d6334b1..2e48d0643e9 100644 --- a/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.cpp @@ -23,32 +23,40 @@ MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header, c packet_sender.max_packet_size = context.mysql.max_packet_size; } -void MySQLOutputFormat::consume(Chunk chunk) +void MySQLOutputFormat::initialize() { + if (initialized) + return; + + initialized = true; auto & header = getPort(PortKind::Main).getHeader(); - if (!initialized) + + if (header.columns()) { - initialized = true; - if (header.columns()) + packet_sender.sendPacket(LengthEncodedNumber(header.columns())); + + for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName()) { + ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING, + 0, 0); + packet_sender.sendPacket(column_definition); + } - packet_sender.sendPacket(LengthEncodedNumber(header.columns())); - - for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName()) - { - ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING, - 0, 0); - packet_sender.sendPacket(column_definition); - } - - if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF)) - { - packet_sender.sendPacket(EOF_Packet(0, 0)); - } + if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF)) + { + packet_sender.sendPacket(EOF_Packet(0, 0)); } } +} + + +void MySQLOutputFormat::consume(Chunk chunk) +{ + initialize(); + + auto & header = getPort(PortKind::Main).getHeader(); size_t rows = chunk.getNumRows(); auto & columns = chunk.getColumns(); diff --git a/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.h b/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.h index b9457a6369d..e6b319f659a 100644 --- a/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.h +++ b/dbms/src/Processors/Formats/Impl/MySQLOutputFormat.h @@ -26,6 +26,10 @@ public: void consume(Chunk) override; void finalize() override; void flush() override; + void doWritePrefix() override { initialize(); } + + void initialize(); + private: bool initialized = false;