diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index e6064ec8860..407612257a7 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -60,7 +60,7 @@ #include #include #include -#include +#include #if USE_READLINE #include "Suggest.h" // Y_IGNORE @@ -893,11 +893,12 @@ private: /// Receive description of table structure. Block sample; - if (receiveSampleBlock(sample)) + ColumnsDescription columns_description; + if (receiveSampleBlock(sample, columns_description)) { /// If structure was received (thus, server has not thrown an exception), /// send our data with that structure. - sendData(sample); + sendData(sample, columns_description); receiveEndOfQuery(); } } @@ -935,7 +936,7 @@ private: } - void sendData(Block & sample) + void sendData(Block & sample, const ColumnsDescription & columns_description) { /// If INSERT data must be sent. const ASTInsertQuery * parsed_insert_query = typeid_cast(&*parsed_query); @@ -946,19 +947,19 @@ private: { /// Send data contained in the query. ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data); - sendDataFrom(data_in, sample); + sendDataFrom(data_in, sample, columns_description); } else if (!is_interactive) { /// Send data read from stdin. - sendDataFrom(std_in, sample); + sendDataFrom(std_in, sample, columns_description); } else throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); } - void sendDataFrom(ReadBuffer & buf, Block & sample) + void sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description) { String current_format = insert_format; @@ -970,9 +971,10 @@ private: BlockInputStreamPtr block_input = context.getInputFormat( current_format, buf, sample, insert_format_max_block_size); - auto column_defaults = ColumnDefaultsHelper::extract(sample); + const auto & column_defaults = columns_description.defaults; if (!column_defaults.empty()) block_input = std::make_shared(block_input, column_defaults, context); + BlockInputStreamPtr async_block_input = std::make_shared(block_input); async_block_input->readPrefix(); @@ -1110,7 +1112,7 @@ private: /// Receive the block that serves as an example of the structure of table where data will be inserted. - bool receiveSampleBlock(Block & out) + bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description) { while (true) { @@ -1131,6 +1133,10 @@ private: onLogData(packet.block); break; + case Protocol::Server::TableColumns: + columns_description = ColumnsDescription::parse(packet.multistring_message[1]); + return receiveSampleBlock(out, columns_description); + default: throw NetException("Unexpected packet from server (expected Data, Exception or Log, got " + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index d1ccc3e788f..efd4ffc55d6 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -360,17 +360,16 @@ void TCPHandler::processInsertQuery(const Settings & global_settings) */ state.io.out->writePrefix(); - /// Send block to the client - table structure. - Block block = state.io.out->getHeader(); - - /// attach column defaults to sample block (allow client to attach defaults for ommited source values) + /// Send ColumnsDescription for insertion table if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA) { - auto db_and_table = query_context.getInsertionTable(); - ColumnDefaults column_defaults = ColumnDefaultsHelper::loadFromContext(query_context, db_and_table.first, db_and_table.second); - ColumnDefaultsHelper::attach(column_defaults, block); + const auto & db_and_table = query_context.getInsertionTable(); + if (auto * columns = ColumnsDescription::loadFromContext(query_context, db_and_table.first, db_and_table.second)) + sendTableColumns(*columns); } + /// Send block to the client - table structure. + Block block = state.io.out->getHeader(); sendData(block); readData(global_settings); @@ -853,6 +852,16 @@ void TCPHandler::sendLogData(const Block & block) out->next(); } +void TCPHandler::sendTableColumns(const ColumnsDescription & columns) +{ + writeVarUInt(Protocol::Server::TableColumns, *out); + + /// Send external table name (empty name is the main table) + writeStringBinary("", *out); + writeStringBinary(columns.toString(), *out); + + out->next(); +} void TCPHandler::sendException(const Exception & e, bool with_stack_trace) { diff --git a/dbms/programs/server/TCPHandler.h b/dbms/programs/server/TCPHandler.h index af422921f07..14189da6176 100644 --- a/dbms/programs/server/TCPHandler.h +++ b/dbms/programs/server/TCPHandler.h @@ -144,6 +144,7 @@ private: void sendHello(); void sendData(const Block & block); /// Write a block to the network. void sendLogData(const Block & block); + void sendTableColumns(const ColumnsDescription & columns); void sendException(const Exception & e, bool with_stack_trace); void sendProgress(); void sendLogs(); diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index ce6246fba3a..50c5ca2cebc 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -604,6 +604,10 @@ Connection::Packet Connection::receivePacket() res.block = receiveLogData(); return res; + case Protocol::Server::TableColumns: + res.multistring_message = receiveMultistringMessage(res.type); + return res; + case Protocol::Server::EndOfStream: return res; @@ -713,6 +717,16 @@ std::unique_ptr Connection::receiveException() } +std::vector Connection::receiveMultistringMessage(UInt64 msg_type) +{ + size_t num = Protocol::Server::wordsInMessage(msg_type); + std::vector out(num); + for (size_t i = 0; i < num; ++i) + readStringBinary(out[i], *in); + return out; +} + + Progress Connection::receiveProgress() { //LOG_TRACE(log_wrapper.get(), "Receiving progress"); diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index d8229fc3463..27b7d6bd4d8 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include @@ -96,6 +98,7 @@ public: Block block; std::unique_ptr exception; + std::vector multistring_message; Progress progress; BlockStreamProfileInfo profile_info; @@ -259,6 +262,7 @@ private: Block receiveLogData(); Block receiveDataImpl(BlockInputStreamPtr & stream); + std::vector receiveMultistringMessage(UInt64 msg_type); std::unique_ptr receiveException(); Progress receiveProgress(); BlockStreamProfileInfo receiveProfileInfo(); diff --git a/dbms/src/Core/Protocol.h b/dbms/src/Core/Protocol.h index 27df4341de9..28f60cce901 100644 --- a/dbms/src/Core/Protocol.h +++ b/dbms/src/Core/Protocol.h @@ -69,7 +69,8 @@ namespace Protocol Totals = 7, /// A block with totals (compressed or not). Extremes = 8, /// A block with minimums and maximums (compressed or not). TablesStatusResponse = 9, /// A response to TablesStatus request. - Log = 10 /// System logs of the query execution + Log = 10, /// System logs of the query execution + TableColumns = 11, /// Columns' description for default values calculation }; /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10 @@ -78,11 +79,24 @@ namespace Protocol /// See https://www.securecoding.cert.org/confluence/display/cplusplus/INT36-CPP.+Do+not+use+out-of-range+enumeration+values inline const char * toString(UInt64 packet) { - static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", "Extremes", "TablesStatusResponse", "Log" }; - return packet < 11 + static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", + "Extremes", "TablesStatusResponse", "Log", "TableColumns" }; + return packet < 12 ? data[packet] : "Unknown packet"; } + + inline size_t wordsInMessage(UInt64 msg_type) + { + switch (msg_type) + { + case TableColumns: + return 2; + default: + break; + } + return 0; + } } /// Packet types that client transmits. diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp index d232fee96ce..b78b7a59db6 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include namespace DB { @@ -46,9 +46,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size); - auto column_defaults = ColumnDefaultsHelper::loadFromContext(context, ast_insert_query->database, ast_insert_query->table); - if (!column_defaults.empty()) - res_stream = std::make_shared(res_stream, column_defaults, context); + auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table); + if (columns_description && !columns_description->defaults.empty()) + res_stream = std::make_shared(res_stream, columns_description->defaults, context); } } diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.cpp b/dbms/src/Interpreters/evaluateMissingDefaults.cpp index 050078b7af4..0b330fb00cc 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.cpp +++ b/dbms/src/Interpreters/evaluateMissingDefaults.cpp @@ -37,7 +37,7 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi void evaluateMissingDefaults(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults, - const Context & context, bool save_unneded_columns) + const Context & context, bool save_unneeded_columns) { if (column_defaults.empty()) return; @@ -46,7 +46,7 @@ void evaluateMissingDefaults(Block & block, if (!default_expr_list) return; - if (!save_unneded_columns) + if (!save_unneeded_columns) { auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList()); ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block); diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.h b/dbms/src/Interpreters/evaluateMissingDefaults.h index c65cb1680a2..320fb35c9cb 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.h +++ b/dbms/src/Interpreters/evaluateMissingDefaults.h @@ -12,10 +12,9 @@ class Context; class NamesAndTypesList; struct ColumnDefault; -/// void evaluateMissingDefaults(Block & block, const NamesAndTypesList & required_columns, const std::unordered_map & column_defaults, - const Context & context, bool save_unneded_columns = true); + const Context & context, bool save_unneeded_columns = true); } diff --git a/dbms/src/Storages/ColumnDefault.cpp b/dbms/src/Storages/ColumnDefault.cpp index 2a89cda5caf..46995e307f3 100644 --- a/dbms/src/Storages/ColumnDefault.cpp +++ b/dbms/src/Storages/ColumnDefault.cpp @@ -54,73 +54,4 @@ bool operator==(const ColumnDefault & lhs, const ColumnDefault & rhs) return lhs.kind == rhs.kind && queryToString(lhs.expression) == queryToString(rhs.expression); } -ColumnDefaults ColumnDefaultsHelper::loadFromContext(const Context & context, const String & database, const String & table) -{ - if (context.getSettingsRef().insert_sample_with_metadata) - { - if (!context.isTableExist(database, table)) - return {}; - - StoragePtr storage = context.getTable(database, table); - const ColumnsDescription & table_columns = storage->getColumns(); - return table_columns.defaults; - } - return {}; -} - -void ColumnDefaultsHelper::attach(const ColumnDefaults & column_defaults, Block & sample) -{ - if (column_defaults.empty()) - return; - - for (auto pr : column_defaults) - { - std::stringstream ss; - ss << *pr.second.expression; - - /// Serialize defaults to special columns names. - /// It looks better to send expression as a column data but sample block has 0 rows. - ColumnWithTypeAndName col; - col.type = std::make_shared(); - col.name = Block::mkSpecialColumnName(toString(pr.second.kind) + ' ' + pr.first + ' ' + ss.str()); - col.column = col.type->createColumnConst(sample.rows(), ""); - - sample.insert(std::move(col)); - } -} - -ColumnDefaults ColumnDefaultsHelper::extract(Block & sample) -{ - ParserTernaryOperatorExpression parser; - ColumnDefaults column_defaults; - std::set pos_to_erase; - - for (size_t i = 0; i < sample.columns(); ++i) - { - const ColumnWithTypeAndName & column_wtn = sample.safeGetByPosition(i); - - if (Block::isSpecialColumnName(column_wtn.name, AliasNames::DEFAULT) || - Block::isSpecialColumnName(column_wtn.name, AliasNames::MATERIALIZED) || - Block::isSpecialColumnName(column_wtn.name, AliasNames::ALIAS)) - { - String str_kind, column_name; - std::stringstream ss; - ss << column_wtn.name; - ss >> str_kind >> column_name; - size_t expression_pos = str_kind.size() + column_name.size() + 3; - StringRef expression(&column_wtn.name[expression_pos], column_wtn.name.size() - expression_pos); - - ColumnDefault def; - def.kind = columnDefaultKindFromString(str_kind); - def.expression = parseQuery(parser, expression.data, expression.size); - - column_defaults.emplace(column_name, def); - pos_to_erase.insert(i); - } - } - - sample.erase(pos_to_erase); - return column_defaults; -} - } diff --git a/dbms/src/Storages/ColumnDefault.h b/dbms/src/Storages/ColumnDefault.h index 00693b54ad5..292c0cf7495 100644 --- a/dbms/src/Storages/ColumnDefault.h +++ b/dbms/src/Storages/ColumnDefault.h @@ -36,13 +36,4 @@ bool operator==(const ColumnDefault & lhs, const ColumnDefault & rhs); using ColumnDefaults = std::unordered_map; -/// Static methods to manipulate column defaults -struct ColumnDefaultsHelper -{ - static void attach(const ColumnDefaults & column_defaults, Block & sample); - static ColumnDefaults extract(Block & sample); - - static ColumnDefaults loadFromContext(const Context & context, const String & database, const String & table); -}; - } diff --git a/dbms/src/Storages/ColumnsDescription.cpp b/dbms/src/Storages/ColumnsDescription.cpp index cb67d01a4ea..c37eaa2fc46 100644 --- a/dbms/src/Storages/ColumnsDescription.cpp +++ b/dbms/src/Storages/ColumnsDescription.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include @@ -162,4 +164,20 @@ ColumnsDescription ColumnsDescription::parse(const String & str) return result; } +const ColumnsDescription * ColumnsDescription::loadFromContext(const Context & context, const String & db, const String & table) +{ + if (context.getSettingsRef().insert_sample_with_metadata) + { + auto db_and_table = context.getInsertionTable(); + + if (context.isTableExist(db, table)) + { + StoragePtr storage = context.getTable(db, table); + return &storage->getColumns(); + } + } + + return nullptr; +} + } diff --git a/dbms/src/Storages/ColumnsDescription.h b/dbms/src/Storages/ColumnsDescription.h index 288d2712b3b..f06a9221dfd 100644 --- a/dbms/src/Storages/ColumnsDescription.h +++ b/dbms/src/Storages/ColumnsDescription.h @@ -57,6 +57,7 @@ struct ColumnsDescription String toString() const; static ColumnsDescription parse(const String & str); + static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table); }; }