Update MySQLOutputFormat.

This commit is contained in:
Nikolai Kochetov 2019-08-05 12:35:46 +03:00
parent b0b7ed791d
commit bd8c9733e3
3 changed files with 31 additions and 19 deletions

View File

@ -64,8 +64,8 @@ public:
void write(const Block & block) { consume(Chunk(block.getColumns(), block.rows())); }
void doWritePrefix() {}
void doWriteSuffix() { finalize(); }
virtual void doWritePrefix() {}
virtual void doWriteSuffix() { finalize(); }
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }

View File

@ -23,32 +23,40 @@ MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header, c
packet_sender.max_packet_size = context.mysql.max_packet_size;
}
void MySQLOutputFormat::consume(Chunk chunk)
void MySQLOutputFormat::initialize()
{
if (initialized)
return;
initialized = true;
auto & header = getPort(PortKind::Main).getHeader();
if (!initialized)
if (header.columns())
{
initialized = true;
if (header.columns())
packet_sender.sendPacket(LengthEncodedNumber(header.columns()));
for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName())
{
ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING,
0, 0);
packet_sender.sendPacket(column_definition);
}
packet_sender.sendPacket(LengthEncodedNumber(header.columns()));
for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName())
{
ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING,
0, 0);
packet_sender.sendPacket(column_definition);
}
if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
{
packet_sender.sendPacket(EOF_Packet(0, 0));
}
if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
{
packet_sender.sendPacket(EOF_Packet(0, 0));
}
}
}
void MySQLOutputFormat::consume(Chunk chunk)
{
initialize();
auto & header = getPort(PortKind::Main).getHeader();
size_t rows = chunk.getNumRows();
auto & columns = chunk.getColumns();

View File

@ -26,6 +26,10 @@ public:
void consume(Chunk) override;
void finalize() override;
void flush() override;
void doWritePrefix() override { initialize(); }
void initialize();
private:
bool initialized = false;