send defaults via serialized ColumnsDescription CLICKHOUSE-3578

This commit is contained in:
chertus 2018-12-04 23:03:04 +03:00
parent 933c055104
commit a71d03737c
13 changed files with 93 additions and 105 deletions

View File

@ -60,7 +60,7 @@
#include <Common/InterruptListener.h> #include <Common/InterruptListener.h>
#include <Functions/registerFunctions.h> #include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h> #include <AggregateFunctions/registerAggregateFunctions.h>
#include <Storages/ColumnDefault.h> #include <Storages/ColumnsDescription.h>
#if USE_READLINE #if USE_READLINE
#include "Suggest.h" // Y_IGNORE #include "Suggest.h" // Y_IGNORE
@ -893,11 +893,12 @@ private:
/// Receive description of table structure. /// Receive description of table structure.
Block sample; Block sample;
if (receiveSampleBlock(sample)) ColumnsDescription columns_description;
if (receiveSampleBlock(sample, columns_description))
{ {
/// If structure was received (thus, server has not thrown an exception), /// If structure was received (thus, server has not thrown an exception),
/// send our data with that structure. /// send our data with that structure.
sendData(sample); sendData(sample, columns_description);
receiveEndOfQuery(); receiveEndOfQuery();
} }
} }
@ -935,7 +936,7 @@ private:
} }
void sendData(Block & sample) void sendData(Block & sample, const ColumnsDescription & columns_description)
{ {
/// If INSERT data must be sent. /// If INSERT data must be sent.
const ASTInsertQuery * parsed_insert_query = typeid_cast<const ASTInsertQuery *>(&*parsed_query); const ASTInsertQuery * parsed_insert_query = typeid_cast<const ASTInsertQuery *>(&*parsed_query);
@ -946,19 +947,19 @@ private:
{ {
/// Send data contained in the query. /// Send data contained in the query.
ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data); ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);
sendDataFrom(data_in, sample); sendDataFrom(data_in, sample, columns_description);
} }
else if (!is_interactive) else if (!is_interactive)
{ {
/// Send data read from stdin. /// Send data read from stdin.
sendDataFrom(std_in, sample); sendDataFrom(std_in, sample, columns_description);
} }
else else
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
} }
void sendDataFrom(ReadBuffer & buf, Block & sample) void sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description)
{ {
String current_format = insert_format; String current_format = insert_format;
@ -970,9 +971,10 @@ private:
BlockInputStreamPtr block_input = context.getInputFormat( BlockInputStreamPtr block_input = context.getInputFormat(
current_format, buf, sample, insert_format_max_block_size); current_format, buf, sample, insert_format_max_block_size);
auto column_defaults = ColumnDefaultsHelper::extract(sample); const auto & column_defaults = columns_description.defaults;
if (!column_defaults.empty()) if (!column_defaults.empty())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context); block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input); BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
async_block_input->readPrefix(); async_block_input->readPrefix();
@ -1110,7 +1112,7 @@ private:
/// Receive the block that serves as an example of the structure of table where data will be inserted. /// Receive the block that serves as an example of the structure of table where data will be inserted.
bool receiveSampleBlock(Block & out) bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description)
{ {
while (true) while (true)
{ {
@ -1131,6 +1133,10 @@ private:
onLogData(packet.block); onLogData(packet.block);
break; break;
case Protocol::Server::TableColumns:
columns_description = ColumnsDescription::parse(packet.multistring_message[1]);
return receiveSampleBlock(out, columns_description);
default: default:
throw NetException("Unexpected packet from server (expected Data, Exception or Log, got " throw NetException("Unexpected packet from server (expected Data, Exception or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);

View File

@ -360,17 +360,16 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
*/ */
state.io.out->writePrefix(); state.io.out->writePrefix();
/// Send block to the client - table structure. /// Send ColumnsDescription for insertion table
Block block = state.io.out->getHeader();
/// attach column defaults to sample block (allow client to attach defaults for ommited source values)
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA) if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{ {
auto db_and_table = query_context.getInsertionTable(); const auto & db_and_table = query_context.getInsertionTable();
ColumnDefaults column_defaults = ColumnDefaultsHelper::loadFromContext(query_context, db_and_table.first, db_and_table.second); if (auto * columns = ColumnsDescription::loadFromContext(query_context, db_and_table.first, db_and_table.second))
ColumnDefaultsHelper::attach(column_defaults, block); sendTableColumns(*columns);
} }
/// Send block to the client - table structure.
Block block = state.io.out->getHeader();
sendData(block); sendData(block);
readData(global_settings); readData(global_settings);
@ -853,6 +852,16 @@ void TCPHandler::sendLogData(const Block & block)
out->next(); out->next();
} }
void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
{
writeVarUInt(Protocol::Server::TableColumns, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
writeStringBinary(columns.toString(), *out);
out->next();
}
void TCPHandler::sendException(const Exception & e, bool with_stack_trace) void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{ {

View File

@ -144,6 +144,7 @@ private:
void sendHello(); void sendHello();
void sendData(const Block & block); /// Write a block to the network. void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block); void sendLogData(const Block & block);
void sendTableColumns(const ColumnsDescription & columns);
void sendException(const Exception & e, bool with_stack_trace); void sendException(const Exception & e, bool with_stack_trace);
void sendProgress(); void sendProgress();
void sendLogs(); void sendLogs();

View File

@ -604,6 +604,10 @@ Connection::Packet Connection::receivePacket()
res.block = receiveLogData(); res.block = receiveLogData();
return res; return res;
case Protocol::Server::TableColumns:
res.multistring_message = receiveMultistringMessage(res.type);
return res;
case Protocol::Server::EndOfStream: case Protocol::Server::EndOfStream:
return res; return res;
@ -713,6 +717,16 @@ std::unique_ptr<Exception> Connection::receiveException()
} }
std::vector<String> Connection::receiveMultistringMessage(UInt64 msg_type)
{
size_t num = Protocol::Server::wordsInMessage(msg_type);
std::vector<String> out(num);
for (size_t i = 0; i < num; ++i)
readStringBinary(out[i], *in);
return out;
}
Progress Connection::receiveProgress() Progress Connection::receiveProgress()
{ {
//LOG_TRACE(log_wrapper.get(), "Receiving progress"); //LOG_TRACE(log_wrapper.get(), "Receiving progress");

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <optional>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Poco/Net/StreamSocket.h> #include <Poco/Net/StreamSocket.h>
@ -96,6 +98,7 @@ public:
Block block; Block block;
std::unique_ptr<Exception> exception; std::unique_ptr<Exception> exception;
std::vector<String> multistring_message;
Progress progress; Progress progress;
BlockStreamProfileInfo profile_info; BlockStreamProfileInfo profile_info;
@ -259,6 +262,7 @@ private:
Block receiveLogData(); Block receiveLogData();
Block receiveDataImpl(BlockInputStreamPtr & stream); Block receiveDataImpl(BlockInputStreamPtr & stream);
std::vector<String> receiveMultistringMessage(UInt64 msg_type);
std::unique_ptr<Exception> receiveException(); std::unique_ptr<Exception> receiveException();
Progress receiveProgress(); Progress receiveProgress();
BlockStreamProfileInfo receiveProfileInfo(); BlockStreamProfileInfo receiveProfileInfo();

View File

@ -69,7 +69,8 @@ namespace Protocol
Totals = 7, /// A block with totals (compressed or not). Totals = 7, /// A block with totals (compressed or not).
Extremes = 8, /// A block with minimums and maximums (compressed or not). Extremes = 8, /// A block with minimums and maximums (compressed or not).
TablesStatusResponse = 9, /// A response to TablesStatus request. TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10 /// System logs of the query execution Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
}; };
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10 /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@ -78,11 +79,24 @@ namespace Protocol
/// See https://www.securecoding.cert.org/confluence/display/cplusplus/INT36-CPP.+Do+not+use+out-of-range+enumeration+values /// See https://www.securecoding.cert.org/confluence/display/cplusplus/INT36-CPP.+Do+not+use+out-of-range+enumeration+values
inline const char * toString(UInt64 packet) inline const char * toString(UInt64 packet)
{ {
static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", "Extremes", "TablesStatusResponse", "Log" }; static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals",
return packet < 11 "Extremes", "TablesStatusResponse", "Log", "TableColumns" };
return packet < 12
? data[packet] ? data[packet]
: "Unknown packet"; : "Unknown packet";
} }
inline size_t wordsInMessage(UInt64 msg_type)
{
switch (msg_type)
{
case TableColumns:
return 2;
default:
break;
}
return 0;
}
} }
/// Packet types that client transmits. /// Packet types that client transmits.

View File

@ -5,7 +5,7 @@
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h> #include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h> #include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Storages/ColumnDefault.h> #include <Storages/ColumnsDescription.h>
namespace DB namespace DB
{ {
@ -46,9 +46,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size); res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
auto column_defaults = ColumnDefaultsHelper::loadFromContext(context, ast_insert_query->database, ast_insert_query->table); auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table);
if (!column_defaults.empty()) if (columns_description && !columns_description->defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context); res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, columns_description->defaults, context);
} }
} }

View File

@ -37,7 +37,7 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi
void evaluateMissingDefaults(Block & block, void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns, const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults, const ColumnDefaults & column_defaults,
const Context & context, bool save_unneded_columns) const Context & context, bool save_unneeded_columns)
{ {
if (column_defaults.empty()) if (column_defaults.empty())
return; return;
@ -46,7 +46,7 @@ void evaluateMissingDefaults(Block & block,
if (!default_expr_list) if (!default_expr_list)
return; return;
if (!save_unneded_columns) if (!save_unneeded_columns)
{ {
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList()); auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block); ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block);

View File

@ -12,10 +12,9 @@ class Context;
class NamesAndTypesList; class NamesAndTypesList;
struct ColumnDefault; struct ColumnDefault;
///
void evaluateMissingDefaults(Block & block, void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns, const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults, const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const Context & context, bool save_unneded_columns = true); const Context & context, bool save_unneeded_columns = true);
} }

View File

@ -54,73 +54,4 @@ bool operator==(const ColumnDefault & lhs, const ColumnDefault & rhs)
return lhs.kind == rhs.kind && queryToString(lhs.expression) == queryToString(rhs.expression); return lhs.kind == rhs.kind && queryToString(lhs.expression) == queryToString(rhs.expression);
} }
ColumnDefaults ColumnDefaultsHelper::loadFromContext(const Context & context, const String & database, const String & table)
{
if (context.getSettingsRef().insert_sample_with_metadata)
{
if (!context.isTableExist(database, table))
return {};
StoragePtr storage = context.getTable(database, table);
const ColumnsDescription & table_columns = storage->getColumns();
return table_columns.defaults;
}
return {};
}
void ColumnDefaultsHelper::attach(const ColumnDefaults & column_defaults, Block & sample)
{
if (column_defaults.empty())
return;
for (auto pr : column_defaults)
{
std::stringstream ss;
ss << *pr.second.expression;
/// Serialize defaults to special columns names.
/// It looks better to send expression as a column data but sample block has 0 rows.
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeString>();
col.name = Block::mkSpecialColumnName(toString(pr.second.kind) + ' ' + pr.first + ' ' + ss.str());
col.column = col.type->createColumnConst(sample.rows(), "");
sample.insert(std::move(col));
}
}
ColumnDefaults ColumnDefaultsHelper::extract(Block & sample)
{
ParserTernaryOperatorExpression parser;
ColumnDefaults column_defaults;
std::set<size_t> pos_to_erase;
for (size_t i = 0; i < sample.columns(); ++i)
{
const ColumnWithTypeAndName & column_wtn = sample.safeGetByPosition(i);
if (Block::isSpecialColumnName(column_wtn.name, AliasNames::DEFAULT) ||
Block::isSpecialColumnName(column_wtn.name, AliasNames::MATERIALIZED) ||
Block::isSpecialColumnName(column_wtn.name, AliasNames::ALIAS))
{
String str_kind, column_name;
std::stringstream ss;
ss << column_wtn.name;
ss >> str_kind >> column_name;
size_t expression_pos = str_kind.size() + column_name.size() + 3;
StringRef expression(&column_wtn.name[expression_pos], column_wtn.name.size() - expression_pos);
ColumnDefault def;
def.kind = columnDefaultKindFromString(str_kind);
def.expression = parseQuery(parser, expression.data, expression.size);
column_defaults.emplace(column_name, def);
pos_to_erase.insert(i);
}
}
sample.erase(pos_to_erase);
return column_defaults;
}
} }

View File

@ -36,13 +36,4 @@ bool operator==(const ColumnDefault & lhs, const ColumnDefault & rhs);
using ColumnDefaults = std::unordered_map<std::string, ColumnDefault>; using ColumnDefaults = std::unordered_map<std::string, ColumnDefault>;
/// Static methods to manipulate column defaults
struct ColumnDefaultsHelper
{
static void attach(const ColumnDefaults & column_defaults, Block & sample);
static ColumnDefaults extract(Block & sample);
static ColumnDefaults loadFromContext(const Context & context, const String & database, const String & table);
};
} }

View File

@ -10,6 +10,8 @@
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include <ext/collection_cast.h> #include <ext/collection_cast.h>
#include <ext/map.h> #include <ext/map.h>
@ -162,4 +164,20 @@ ColumnsDescription ColumnsDescription::parse(const String & str)
return result; return result;
} }
const ColumnsDescription * ColumnsDescription::loadFromContext(const Context & context, const String & db, const String & table)
{
if (context.getSettingsRef().insert_sample_with_metadata)
{
auto db_and_table = context.getInsertionTable();
if (context.isTableExist(db, table))
{
StoragePtr storage = context.getTable(db, table);
return &storage->getColumns();
}
}
return nullptr;
}
} }

View File

@ -57,6 +57,7 @@ struct ColumnsDescription
String toString() const; String toString() const;
static ColumnsDescription parse(const String & str); static ColumnsDescription parse(const String & str);
static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table);
}; };
} }