restore lost commits [CLICKHOUSE-3578]

This commit is contained in:
chertus 2018-11-12 17:52:30 +03:00
commit 2fe3cdb3f5
36 changed files with 393 additions and 158 deletions

View File

@ -25,6 +25,7 @@ if (ENABLE_CAPNP)
if (NOT USE_INTERNAL_CAPNP_LIBRARY)
set (CAPNP_PATHS "/usr/local/opt/capnp/lib")
set (CAPNP_BIN_PATH "/usr/bin:/usr/local/bin")
set (CAPNP_INCLUDE_PATHS "/usr/local/opt/capnp/include")
find_library (CAPNP capnp PATHS ${CAPNP_PATHS})
find_library (CAPNPC capnpc PATHS ${CAPNP_PATHS})
@ -40,6 +41,8 @@ if (ENABLE_CAPNP)
set (CAPNP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/capnproto/c++/src")
set (CAPNP_LIBRARY capnpc)
set (USE_CAPNP 1)
set (CAPNP_BIN_PATH ${ClickHouse_BINARY_DIR}/contrib/capnproto/c++/src/capnp)
set (CAPNP_BIN_TARGETS capnp_tool capnpc_cpp capnpc_capnp)
endif ()
endif ()

View File

@ -240,6 +240,7 @@ if (USE_CAPNP)
if (NOT USE_INTERNAL_CAPNP_LIBRARY)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${CAPNP_INCLUDE_DIR})
endif ()
set (PROTO_LIB clickhouse_proto)
endif ()
if (USE_RDKAFKA)

View File

@ -1,5 +1,5 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54409 CACHE STRING "")
set(VERSION_REVISION 54410 CACHE STRING "") # changed manually for tests
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 14 CACHE STRING "")
set(VERSION_PATCH 9 CACHE STRING "")

View File

@ -1,5 +1,6 @@
add_library (clickhouse-client-lib ${LINK_MODE} Client.cpp)
target_link_libraries (clickhouse-client-lib clickhouse_common_io clickhouse_functions clickhouse_aggregate_functions ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_link_libraries (clickhouse-client-lib clickhouse_common_io clickhouse_functions clickhouse_aggregate_functions
${PROTO_LIB} ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (READLINE_INCLUDE_DIR)
target_include_directories (clickhouse-client-lib SYSTEM PRIVATE ${READLINE_INCLUDE_DIR})
endif ()

View File

@ -62,6 +62,7 @@
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Proto/protoHelpers.h>
#include <Storages/TableMetadata.h>
#if USE_READLINE
#include "Suggest.h" // Y_IGNORE
@ -894,11 +895,12 @@ private:
/// Receive description of table structure.
Block sample;
if (receiveSampleBlock(sample))
TableMetadata table_meta(parsed_insert_query.database, parsed_insert_query.table);
if (receiveSampleBlock(sample, table_meta))
{
/// If structure was received (thus, server has not thrown an exception),
/// send our data with that structure.
sendData(sample);
sendData(sample, table_meta);
receiveEndOfQuery();
}
}
@ -936,7 +938,7 @@ private:
}
void sendData(Block & sample)
void sendData(Block & sample, const TableMetadata & table_meta)
{
/// If INSERT data must be sent.
const ASTInsertQuery * parsed_insert_query = typeid_cast<const ASTInsertQuery *>(&*parsed_query);
@ -947,42 +949,34 @@ private:
{
/// Send data contained in the query.
ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);
sendDataFrom(data_in, sample);
sendDataFrom(data_in, sample, table_meta);
}
else if (!is_interactive)
{
/// Send data read from stdin.
sendDataFrom(std_in, sample);
sendDataFrom(std_in, sample, table_meta);
}
else
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
}
void sendDataFrom(ReadBuffer & buf, Block & sample)
void sendDataFrom(ReadBuffer & buf, Block & sample, const TableMetadata & table_meta)
{
String current_format = insert_format;
ColumnDefaults column_defaults;
/// Data format can be specified in the INSERT query.
if (ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*parsed_query))
{
if (!insert->format.empty())
current_format = insert->format;
if (context.isTableExist(insert->database, insert->table))
{
StoragePtr table = context.getTable(insert->database, insert->table);
if (table)
column_defaults = table->getColumns().defaults;
}
}
BlockInputStreamPtr block_input = context.getInputFormat(
current_format, buf, sample, insert_format_max_block_size);
BlockInputStreamPtr defs_block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(defs_block_input);
const ColumnDefaults & column_defaults = table_meta.column_defaults;
if (!column_defaults.empty())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
async_block_input->readPrefix();
@ -1119,7 +1113,7 @@ private:
/// Receive the block that serves as an example of the structure of table where data will be inserted.
bool receiveSampleBlock(Block & out)
bool receiveSampleBlock(Block & out, TableMetadata & table_meta)
{
while (true)
{
@ -1131,6 +1125,12 @@ private:
out = packet.block;
return true;
case Protocol::Server::CapnProto:
#if USE_CAPNP
loadTableMetadata(packet.block, table_meta);
#endif
return receiveSampleBlock(packet.block, table_meta);
case Protocol::Server::Exception:
onException(*packet.exception);
last_exception = std::move(packet.exception);
@ -1170,10 +1170,6 @@ private:
onLogData(packet.block);
break;
case Protocol::Server::CapnProto:
loadContext(packet.block.getColumnsWithTypeAndName()[0], context);
return receiveSampleBlock(packet.block);
default:
throw NetException("Unexpected packet from server (expected Exception, EndOfStream or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);

View File

@ -31,6 +31,8 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Core/ExternalTable.h>
#include <Proto/protoHelpers.h>
#include "TCPHandler.h"
@ -359,6 +361,20 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
*/
state.io.out->writePrefix();
#if USE_CAPNP
/// Send table metadata (column defaults)
if (client_revision >= DBMS_MIN_REVISION_WITH_PROTO_METADATA &&
query_context.getSettingsRef().insert_sample_with_metadata)
{
TableMetadata table_meta(query_context.getCurrentDatabase(), query_context.getCurrentTable());
if (table_meta.loadFromContext(query_context) && table_meta.hasDefaults())
{
Block meta_block = storeTableMetadata(table_meta);
sendMetadata(meta_block);
}
}
#endif
/// Send block to the client - table structure.
Block block = state.io.out->getHeader();
sendData(block);
@ -844,6 +860,19 @@ void TCPHandler::sendLogData(const Block & block)
}
/// Sends `block` to the client as a Protocol::Server::CapnProto packet.
/// The statements below fix the byte order on the wire: packet type, an empty string, then the block itself.
void TCPHandler::sendMetadata(const Block & block)
{
/// NOTE(review): presumably prepares state.block_out for writing this block — confirm in initBlockOutput().
initBlockOutput(block);
writeVarUInt(Protocol::Server::CapnProto, *out);
/// NOTE(review): the empty string looks like the name field that precedes a block in Data packets — confirm against sendData().
writeStringBinary("", *out);
state.block_out->write(block);
/// Advance the intermediate buffer first, then the outgoing buffer, so the packet is not left sitting in either.
state.maybe_compressed_out->next();
out->next();
}
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{
writeVarUInt(Protocol::Server::Exception, *out);

View File

@ -144,6 +144,7 @@ private:
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendMetadata(const Block & block);
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();

View File

@ -575,6 +575,7 @@ Connection::Packet Connection::receivePacket()
switch (res.type)
{
case Protocol::Server::Data:
case Protocol::Server::CapnProto:
res.block = receiveData();
return res;

View File

@ -35,6 +35,8 @@ private:
public:
BlockInfo info;
/// An input stream can use delayed_defaults to record additional info: the rows in which it has inserted default values.
/// Such values are replaced later by column defaults in AddingDefaultsBlockInputStream (if any).
BlockDelayedDefaults delayed_defaults;
Block() = default;

View File

@ -60,14 +60,14 @@ void BlockInfo::read(ReadBuffer & in)
void BlockDelayedDefaults::setBit(size_t column_idx, size_t row_idx)
{
BitMask & mask = columns_defaults[column_idx];
RowsBitMask & mask = columns_defaults[column_idx];
mask.resize(row_idx + 1);
mask[row_idx] = true;
}
const BlockDelayedDefaults::BitMask & BlockDelayedDefaults::getColumnBitmask(size_t column_idx) const
const BlockDelayedDefaults::RowsBitMask & BlockDelayedDefaults::getDefaultsBitmask(size_t column_idx) const
{
static BitMask none;
static RowsBitMask none;
auto it = columns_defaults.find(column_idx);
if (it != columns_defaults.end())
return it->second;

View File

@ -45,22 +45,24 @@ struct BlockInfo
void read(ReadBuffer & in);
};
/// Block extention to support delayed defaults.
/// It's expected that it would be lots unset defaults or none.
/// NOTE It's possible to make better solution for sparse values.
/// Block extension to support delayed defaults. Used in AddingDefaultsBlockInputStream to replace type defaults set by RowInputStream
/// with column defaults.
class BlockDelayedDefaults
{
public:
using BitMask = std::vector<bool>;
using MaskById = std::unordered_map<size_t, BitMask>;
using RowsBitMask = std::vector<bool>; /// a bit per row for a column
const BitMask & getColumnBitmask(size_t column_idx) const;
const RowsBitMask & getDefaultsBitmask(size_t column_idx) const;
void setBit(size_t column_idx, size_t row_idx);
bool empty() const { return columns_defaults.empty(); }
size_t size() const { return columns_defaults.size(); }
private:
MaskById columns_defaults;
using RowsMaskByColumnId = std::unordered_map<size_t, RowsBitMask>;
/// If columns_defaults[column_id][row_id] is true related value in Block should be replaced with column default.
/// It could contain less columns and rows then related block.
RowsMaskByColumnId columns_defaults;
};
}

View File

@ -51,6 +51,7 @@
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54408
#define DBMS_MIN_REVISION_WITH_PROTO_METADATA 54410
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226

View File

@ -6,7 +6,6 @@
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Core/Block.h>
#include <Interpreters/evaluateMissingDefaults.h>
namespace DB

View File

@ -43,12 +43,15 @@ Block AddingDefaultsBlockInputStream::readImpl()
Block evaluate_block{res};
for (const auto & column : column_defaults)
evaluate_block.erase(column.first);
{
/// column_defaults contain aliases that could be ommited in evaluate_block
if (evaluate_block.has(column.first))
evaluate_block.erase(column.first);
}
evaluateMissingDefaultsUnsafe(evaluate_block, header.getNamesAndTypesList(), column_defaults, context);
ColumnsWithTypeAndName mixed_columns;
mixed_columns.reserve(std::min(column_defaults.size(), delayed_defaults.size()));
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
for (const ColumnWithTypeAndName & column_def : evaluate_block)
{
@ -63,26 +66,39 @@ Block AddingDefaultsBlockInputStream::readImpl()
if (column_read.column->size() != column_def.column->size())
throw Exception("Mismach column sizes while adding defaults", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
const BlockDelayedDefaults::BitMask & mask = delayed_defaults.getColumnBitmask(block_column_position);
MutableColumnPtr column_mixed = column_read.column->cloneEmpty();
for (size_t row_idx = 0; row_idx < column_read.column->size(); ++row_idx)
const auto & defaults_mask = delayed_defaults.getDefaultsBitmask(block_column_position);
if (!defaults_mask.empty())
{
if (mask[row_idx])
column_mixed->insertFrom(*column_def.column, row_idx);
else
column_mixed->insertFrom(*column_read.column, row_idx);
}
MutableColumnPtr column_mixed = column_read.column->cloneEmpty();
ColumnWithTypeAndName mix = column_read.cloneEmpty();
mix.column = std::move(column_mixed);
mixed_columns.emplace_back(std::move(mix));
for (size_t row_idx = 0; row_idx < column_read.column->size(); ++row_idx)
{
if (row_idx < defaults_mask.size() && defaults_mask[row_idx])
{
if (column_def.column->isColumnConst())
column_mixed->insert((*column_def.column)[row_idx]);
else
column_mixed->insertFrom(*column_def.column, row_idx);
}
else
column_mixed->insertFrom(*column_read.column, row_idx);
}
mixed_columns.emplace(std::make_pair(block_column_position, std::move(column_mixed)));
}
}
for (auto & column : mixed_columns)
if (!mixed_columns.empty())
{
res.erase(column.name);
res.insert(std::move(column));
/// replace columns saving block structure
MutableColumns mutation = res.mutateColumns();
for (size_t position = 0; position < mutation.size(); ++position)
{
auto it = mixed_columns.find(position);
if (it != mixed_columns.end())
mutation[position] = std::move(it->second);
}
res.setColumns(std::move(mutation));
}
return res;

View File

@ -4,7 +4,8 @@
#include <IO/ReadBufferFromMemory.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Storages/TableMetadata.h>
namespace DB
{
@ -44,6 +45,12 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
TableMetadata table_meta(ast_insert_query->database, ast_insert_query->table);
table_meta.loadFromContext(context);
if (!table_meta.column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, table_meta.column_defaults, context);
}
}

View File

@ -19,7 +19,6 @@ namespace ErrorCodes
extern const int TOO_LARGE_STRING_SIZE;
extern const int CANNOT_READ_ALL_DATA;
extern const int INCORRECT_DATA;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
@ -53,7 +52,6 @@ Block BlockInputStreamFromRowInputStream::readImpl()
{
size_t num_columns = sample.columns();
MutableColumns columns = sample.cloneEmptyColumns();
BlockDelayedDefaults delayed_defaults;
try
{
@ -62,19 +60,8 @@ Block BlockInputStreamFromRowInputStream::readImpl()
try
{
++total_rows;
RowReadExtention info;
if (!row_input->extendedRead(columns, info))
if (!row_input->read(columns))
break;
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
{
if (!info.read_columns[column_idx]) {
size_t column_size = columns[column_idx]->size();
if (column_size == 0)
throw Exception("Unexpected empty column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
delayed_defaults.setBit(column_idx, column_size - 1);
}
}
}
catch (Exception & e)
{
@ -143,10 +130,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
if (columns.empty() || columns[0]->empty())
return {};
auto out_block = sample.cloneWithColumns(std::move(columns));
if (!delayed_defaults.empty())
out_block.delayed_defaults = std::move(delayed_defaults);
return out_block;
return sample.cloneWithColumns(std::move(columns));
}

View File

@ -10,11 +10,12 @@
namespace DB
{
/// A way to set some extentions to read and return extra information too.
/// A way to set some extentions to read and return extra information too. IRowInputStream.extendedRead() output.
struct RowReadExtention
{
/// IRowInputStream.extendedRead() output value.
/// Contains true for columns that actually read from the source and false for defaults
/// Contains one bit per column in the most recently read row. IRowInputStream may leave it empty or only partially set.
/// It should contain true for columns that were actually read from the source and false for defaults.
std::vector<UInt8> read_columns;
};

View File

@ -1014,6 +1014,12 @@ String Context::getCurrentDatabase() const
}
/// Returns a copy of the table name stored in current_table.
/// The member is read directly, without taking the context lock.
String Context::getCurrentTable() const
{
return current_table;
}
String Context::getCurrentQueryId() const
{
return client_info.current_query_id;
@ -1028,6 +1034,15 @@ void Context::setCurrentDatabase(const String & name)
}
/// Records `database`.`table` as the current table of this context.
/// Note that this also overwrites current_database with `database`.
void Context::setCurrentTable(const String & database, const String & table)
{
/// The lock is held for the existence check and both assignments.
auto lock = getLock();
/// NOTE(review): presumably throws when the table does not exist, leaving both members untouched — confirm in assertTableExists().
assertTableExists(database, table);
current_database = database;
current_table = table;
}
void Context::setCurrentQueryId(const String & query_id)
{
if (!client_info.current_query_id.empty())

View File

@ -118,6 +118,7 @@ private:
std::shared_ptr<QuotaForIntervals> quota; /// Current quota. By default - empty quota, that have no limits.
String current_database;
String current_table;
Settings settings; /// Setting for query execution.
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
@ -228,8 +229,10 @@ public:
std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table) const;
String getCurrentDatabase() const;
String getCurrentTable() const;
String getCurrentQueryId() const;
void setCurrentDatabase(const String & name);
void setCurrentTable(const String & database, const String & table);
void setCurrentQueryId(const String & query_id);
String getDefaultFormat() const; /// If default_format is not specified, some global default format is returned.

View File

@ -159,4 +159,11 @@ void InterpreterInsertQuery::checkAccess(const ASTInsertQuery & query)
throw Exception("Cannot insert into table in readonly mode", ErrorCodes::READONLY);
}
/// Copies the database and table names of the INSERT query held in query_ptr into the output arguments.
void InterpreterInsertQuery::getDatabaseTable(String & database, String & table) const
{
    auto & insert_ast = typeid_cast<ASTInsertQuery &>(*query_ptr);

    database = insert_ast.database;
    table = insert_ast.table;
}
}

View File

@ -24,6 +24,8 @@ public:
*/
BlockIO execute() override;
void getDatabaseTable(String & database, String & table) const;
private:
StoragePtr getTable(const ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table);

View File

@ -125,6 +125,7 @@ struct Settings
M(SettingUInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.") \
\
M(SettingBool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be preformed") \
M(SettingBool, insert_sample_with_metadata, true, "For INSERT queries, specifies that the server need to send metadata about column defaults to the client. This will be used to calculate default expressions.") \
\
M(SettingUInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.") \
M(SettingMilliseconds, insert_quorum_timeout, 600000, "") \

View File

@ -204,6 +204,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
if (InterpreterInsertQuery * insertInterpreter = typeid_cast<InterpreterInsertQuery *>(&*interpreter))
{
String database;
String table_name;
insertInterpreter->getDatabaseTable(database, table_name);
if (!database.empty())
context.setCurrentTable(database, table_name);
}
if (process_list_entry)
{

View File

@ -1,10 +1,7 @@
set (CAPNP_PATH ${CMAKE_BINARY_DIR}/contrib/capnproto/c++/src/capnp)
set (CAPNP_BIN ${CAPNP_PATH}/capnp)
add_custom_command (OUTPUT ServerMessage.capnp.c++ ServerMessage.capnp.h
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/ServerMessage.capnp ${CMAKE_CURRENT_BINARY_DIR}/ServerMessage.capnp
COMMAND ${CMAKE_COMMAND} -E env PATH=${CAPNP_PATH} ${CAPNP_BIN} compile -I ${CAPNP_INCLUDE_DIR} -oc++ ServerMessage.capnp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/ServerMessage.capnp)
COMMAND ${CMAKE_COMMAND} -E env PATH=${CAPNP_BIN_PATH} capnp compile -I ${CAPNP_INCLUDE_DIR} -oc++ ServerMessage.capnp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/ServerMessage.capnp ${CAPNP_BIN_TARGETS})
add_library (clickhouse_proto ServerMessage.capnp.c++ protoHelpers.cpp)
target_link_libraries (clickhouse_proto clickhouse_common_io ${CAPNP_LIBRARY})

View File

@ -3,96 +3,166 @@
#include <Interpreters/Context.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/TableMetadata.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <ServerMessage.capnp.h>
#include <capnp/serialize.h>
#include <boost/algorithm/string.hpp>
#include <boost/range/join.hpp>
#include <common/logger_useful.h>
#include <sstream>
/// @sa https://capnproto.org/cxx.html
namespace DB
{
ColumnWithTypeAndName storeContext(Context & context)
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Flattens a Cap'n Proto message and copies the resulting bytes into a newly created UInt8 column.
static MutableColumnPtr serializeProto(capnp::MessageBuilder & message)
{
    MutableColumnPtr column = DataTypeUInt8().createColumn();

    kj::Array<capnp::word> flat_words = messageToFlatArray(message);
    kj::ArrayPtr<const char> raw = flat_words.asChars();

    column->reserve(raw.size());

    /// One insertData() call of length 1 per serialized byte.
    for (const char & byte : raw)
        column->insertData(&byte, 1);

    return column;
}
///
/// Wraps a flat byte buffer as a Cap'n Proto message whose root has type T.
/// The buffer is only viewed, not copied, so it must outlive this object and every reader obtained from it.
template <typename T>
class ProtoDeserializer
{
public:
/// data_size is in bytes; it is divided by sizeof(capnp::word), so trailing bytes that do not fill a whole word are ignored.
/// NOTE(review): the reinterpret_cast assumes `data` is suitably aligned for capnp::word — confirm for the column buffers passed in.
ProtoDeserializer(const char * data, size_t data_size)
: serialized(kj::arrayPtr(reinterpret_cast<const capnp::word *>(data), data_size / sizeof(capnp::word))),
reader(serialized)
{}
/// Returns a reader for the message root.
typename T::Reader getReader() { return reader.getRoot<T>(); }
private:
/// Declaration order matters: `serialized` is initialised before `reader`, which is constructed from it.
kj::ArrayPtr<const capnp::word> serialized;
capnp::FlatArrayMessageReader reader;
};
static MutableColumnPtr storeTableMeta(const TableMetadata & meta)
{
if (meta.database.empty() || meta.table.empty())
throw Exception("storeTableMeta: table is not set", ErrorCodes::LOGICAL_ERROR);
capnp::MallocMessageBuilder message;
Proto::Context::Builder proto_context = message.initRoot<Proto::Context>();
Databases dbs = context.getDatabases();
auto proto_databases = proto_context.initDatabases(dbs.size());
auto proto_databases = proto_context.initDatabases(1);
auto proto_db = proto_databases[0];
proto_db.setName(meta.database);
size_t db_nomber = 0;
for (auto & pr_db : dbs)
auto proto_db_tables = proto_db.initTables(1);
auto proto_table = proto_db_tables[0];
proto_table.setName(meta.table);
auto proto_columns = proto_table.initColumns(meta.column_defaults.size());
size_t column_no = 0;
for (const auto & pr_column : meta.column_defaults)
{
const String& db_name = pr_db.first;
IDatabase& db = *pr_db.second;
const String & column_name = pr_column.first;
const ColumnDefault & def = pr_column.second;
std::stringstream ss;
ss << def.expression;
auto proto_db = proto_databases[db_nomber];
proto_db.setName(db_name);
auto current_column = proto_columns[column_no];
current_column.setName(column_name);
current_column.getDefault().setKind(static_cast<UInt16>(def.kind));
current_column.getDefault().setExpression(ss.str());
std::unordered_map<String, StoragePtr> tables;
DatabaseIteratorPtr it_tables = db.getIterator(context);
while (it_tables->isValid())
{
tables[it_tables->name()] = it_tables->table();
it_tables->next();
}
auto proto_tables = proto_db.initTables(tables.size());
size_t table_no = 0;
for (const auto & pr_table : tables)
{
auto current_table = proto_tables[table_no];
current_table.setName(pr_table.first);
const ColumnsDescription & columns = pr_table.second->getColumns();
auto proto_columns = current_table.initColumns(columns.defaults.size());
size_t column_no = 0;
for (const auto& pr_column : columns.defaults)
{
const String & column_name = pr_column.first;
const ColumnDefault & def = pr_column.second;
std::stringstream ss;
ss << def.expression;
auto current_column = proto_columns[column_no];
current_column.setName(column_name);
current_column.getDefault().setKind(static_cast<UInt16>(def.kind));
current_column.getDefault().setExpression(ss.str());
++column_no;
}
++table_no;
}
++db_nomber;
++column_no;
}
ColumnWithTypeAndName proto_column;
proto_column.name = "context";
proto_column.type = std::make_shared<DataTypeUInt64>();
MutableColumnPtr data = proto_column.type->createColumn();
kj::Array<capnp::word> serialized = messageToFlatArray(message);
data->insertData(reinterpret_cast<const char *>(serialized.begin()), serialized.size() * sizeof(capnp::word));
proto_column.column = std::move(data);
return proto_column;
return serializeProto(message);
}
void loadContext(const ColumnWithTypeAndName & , Context & )
{
#if 0
kj::Array<word> messageToFlatArray(MessageBuilder& builder);
capnp::MallocMessageBuilder message;
Proto::ServerMessage::Builder serverMessage = message.initRoot<Proto::ServerMessage>();
/// TODO
#endif
/// Deserializes a Proto::Context message from [data, data + data_size) and fills table_meta.column_defaults.
/// Only entries whose database and table names equal table_meta.database / table_meta.table are used,
/// and only columns of kind ColumnDefaultKind::Default are loaded; other kinds are skipped.
/// Throws LOGICAL_ERROR when the buffer is null or empty.
static void loadTableMeta(const char * data, size_t data_size, TableMetadata & table_meta)
{
if (data == nullptr || data_size == 0)
throw Exception("loadTableMeta: empty metadata column", ErrorCodes::LOGICAL_ERROR);
ProtoDeserializer<Proto::Context> deserializer(data, data_size);
Proto::Context::Reader proto_context = deserializer.getReader();
/// Parser used to turn each stored default-expression string back into an AST.
ParserTernaryOperatorExpression parser;
for (auto proto_database : proto_context.getDatabases())
{
const String & database_name = proto_database.getName().cStr();
if (database_name != table_meta.database)
continue;
for (auto proto_table : proto_database.getTables())
{
String table_name = proto_table.getName().cStr();
if (table_name != table_meta.table)
continue;
for (auto column : proto_table.getColumns())
{
String column_name = column.getName().cStr();
String expression = column.getDefault().getExpression().cStr();
ColumnDefaultKind expression_kind = static_cast<ColumnDefaultKind>(column.getDefault().getKind());
if (expression_kind == ColumnDefaultKind::Default)
{
ASTPtr ast = parseQuery(parser, expression, expression.size());
/// emplace() keeps an already present entry for the same column name.
table_meta.column_defaults.emplace(column_name, ColumnDefault{expression_kind, ast});
}
}
}
}
}
/// Name of the block column that carries serialized table metadata.
/// storeTableMetadata() writes a column with this name and loadTableMetadata() looks it up by this name.
static constexpr const char * tableMetaColumnName()
{
return "tableMeta";
}
/// Serializes table_meta and returns it as a block with a single UInt8 column named tableMetaColumnName().
/// Errors raised by storeTableMeta() propagate to the caller.
Block storeTableMetadata(const TableMetadata & table_meta)
{
    ColumnWithTypeAndName proto_column;
    proto_column.name = tableMetaColumnName();
    proto_column.type = std::make_shared<DataTypeUInt8>();
    /// The call expression is already an rvalue, so no std::move() is needed around it.
    proto_column.column = storeTableMeta(table_meta);

    Block block;
    block.insert(std::move(proto_column));
    return block;
}
void loadTableMetadata(const Block & block, TableMetadata & table_meta)
{
/// select metadata type by column name
if (block.has(tableMetaColumnName()))
{
const ColumnWithTypeAndName & column = block.getByName(tableMetaColumnName());
StringRef raw_data = column.column->getRawData();
loadTableMeta(raw_data.data, raw_data.size, table_meta);
}
}
}

View File

@ -1,11 +1,14 @@
#pragma once
#if USE_CAPNP
namespace DB
{
class Context;
struct ColumnWithTypeAndName;
class Block;
struct TableMetadata;
ColumnWithTypeAndName storeContext(Context & context);
void loadContext(const ColumnWithTypeAndName & proto_column, Context & context);
Block storeTableMetadata(const TableMetadata & table_meta);
void loadTableMetadata(const Block & block, TableMetadata & table_meta);
}
#endif

View File

@ -167,6 +167,9 @@ StorageDistributed::StorageDistributed(
if (num_local_shards && remote_database == database_name && remote_table == table_name)
throw Exception("Distributed table " + table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
}
/// HACK: disable metadata for StorageDistributed queries
const_cast<Context &>(context).getSettingsRef().insert_sample_with_metadata = false;
}

View File

@ -14,6 +14,7 @@
#include <Formats/FormatFactory.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
@ -194,7 +195,11 @@ BlockInputStreams StorageFile::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
BlockInputStreamPtr block_input = std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size);
const ColumnsDescription & columns = getColumns();
if (columns.defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns.defaults, context)};
}

View File

@ -12,6 +12,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Poco/Net/HTTPRequest.h>
@ -164,7 +165,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
return {std::make_shared<StorageURLBlockInputStream>(request_uri,
BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
format_name,
@ -172,7 +173,13 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
getHeaderBlock(column_names),
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))};
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()));
const ColumnsDescription & columns = getColumns();
if (columns.defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns.defaults, context)};
}
void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {}

View File

@ -0,0 +1,18 @@
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include "TableMetadata.h"
namespace DB
{
/// Copies the column defaults of `database`.`table` from the context into column_defaults.
/// Returns false, leaving column_defaults untouched, when the context reports that the table does not exist; true otherwise.
bool TableMetadata::loadFromContext(const Context & context)
{
if (!context.isTableExist(database, table))
return false;
/// NOTE(review): the existence check and the lookup are two separate calls; if the table can be dropped
/// in between, getTable() presumably throws instead of this function returning false — confirm.
StoragePtr storage = context.getTable(database, table);
const ColumnsDescription & table_columns = storage->getColumns();
column_defaults = table_columns.defaults;
return true;
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Storages/ColumnDefault.h>
namespace DB
{
class Context;
class Block;
/// Additional information about the target table of a query that cannot be obtained from the sample block.
struct TableMetadata
{
    /// The names are copied, so temporaries (e.g. results of functions returning String by value) are safe to pass.
    TableMetadata(const String & database_, const String & table_)
        : database(database_), table(table_)
    {}

    /// Stored by value: reference members would bind to the constructor arguments and dangle
    /// as soon as a temporary passed by the caller is destroyed.
    String database;
    String table;

    /// Default expressions of the table's columns, keyed by column name.
    ColumnDefaults column_defaults;

    /// Fills column_defaults from the table found in the context; returns false if the table does not exist.
    bool loadFromContext(const Context & context);

    /// True when at least one column default has been loaded.
    bool hasDefaults() const { return !column_defaults.empty(); }
};
}

View File

@ -0,0 +1,7 @@
0 0 6 6 6
0 5 5 1.7917595 5
1 1 2 1.0986123 42
1 1 2 1.0986123 42
2 2 4 1.609438 2
3 3 3 3 3
4 0 4 1.609438 42

View File

@ -0,0 +1,21 @@
-- Exercises column DEFAULT expressions for rows inserted in JSONEachRow format with some fields omitted.
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.defaults;
-- a, b and c have DEFAULT expressions; e is MATERIALIZED and f is an ALIAS.
CREATE TABLE IF NOT EXISTS test.defaults
(
x UInt32,
y UInt32,
a DEFAULT x + y,
b Float32 DEFAULT log(1 + x + y),
c UInt32 DEFAULT 42,
e MATERIALIZED x + y,
f ALIAS x + y
) ENGINE = Memory;
-- Only x and y are given; a, b and c are omitted.
INSERT INTO test.defaults FORMAT JSONEachRow {"x":1, "y":1};
-- The same omission through INSERT ... SELECT with an explicit column list.
INSERT INTO test.defaults (x, y) SELECT x, y FROM test.defaults LIMIT 1;
-- c is given explicitly; a and b are omitted.
INSERT INTO test.defaults FORMAT JSONEachRow {"x":2, "y":2, "c":2};
-- Every column that has a DEFAULT expression is given explicitly.
INSERT INTO test.defaults FORMAT JSONEachRow {"x":3, "y":3, "a":3, "b":3, "c":3};
-- Several rows in one INSERT, each omitting a different set of fields (including x and/or y).
INSERT INTO test.defaults FORMAT JSONEachRow {"x":4} {"y":5, "c":5} {"a":6, "b":6, "c":6};
SELECT * FROM test.defaults ORDER BY (x, y);
DROP TABLE IF EXISTS test.defaults;

View File

@ -22,5 +22,5 @@ env TEST_RUN=1 \
`# Use all possible contrib libs from system` \
`# psmisc - killall` \
`# gdb - symbol test in pbuilder` \
EXTRAPACKAGES="psmisc gdb clang-6.0 libc++abi-dev libc++-dev libboost-program-options-dev libboost-system-dev libboost-filesystem-dev libboost-thread-dev zlib1g-dev liblz4-dev libdouble-conversion-dev libsparsehash-dev librdkafka-dev libpoco-dev unixodbc-dev libsparsehash-dev libgoogle-perftools-dev libzstd-dev libre2-dev libunwind-dev googletest libcctz-dev libcapnp-dev libjemalloc-dev libssl-dev $EXTRAPACKAGES" \
EXTRAPACKAGES="psmisc gdb clang-6.0 libc++abi-dev libc++-dev libboost-program-options-dev libboost-system-dev libboost-filesystem-dev libboost-thread-dev zlib1g-dev liblz4-dev libdouble-conversion-dev libsparsehash-dev librdkafka-dev libpoco-dev unixodbc-dev libsparsehash-dev libgoogle-perftools-dev libzstd-dev libre2-dev libunwind-dev googletest libcctz-dev libcapnp-dev capnproto libjemalloc-dev libssl-dev $EXTRAPACKAGES" \
pdebuild --configfile $ROOT_DIR/debian/.pbuilderrc $PDEBUILD_OPT

View File

@ -32,7 +32,7 @@ cmake $CUR_DIR/../.. -DCMAKE_CXX_COMPILER=`which $DEB_CXX $CXX` -DCMAKE_C_COMPIL
`# Use all possible contrib libs from system` \
-DUNBUNDLED=1 \
`# Disable all features` \
-DENABLE_CAPNP=0 -DENABLE_RDKAFKA=0 -DENABLE_EMBEDDED_COMPILER=0 -DUSE_INTERNAL_LLVM_LIBRARY=0 -DENABLE_JEMALLOC=0 -DENABLE_UNWIND=0 -DENABLE_MYSQL=0 -DENABLE_POCO_ODBC=0 -DENABLE_ODBC=0 $CMAKE_FLAGS
-DENABLE_RDKAFKA=0 -DENABLE_EMBEDDED_COMPILER=0 -DUSE_INTERNAL_LLVM_LIBRARY=0 -DENABLE_JEMALLOC=0 -DENABLE_UNWIND=0 -DENABLE_MYSQL=0 -DENABLE_POCO_ODBC=0 -DENABLE_ODBC=0 $CMAKE_FLAGS
ninja clickhouse-bundle

View File

@ -24,7 +24,7 @@ env TEST_RUN=${TEST_RUN=1} \
DEB_CC=${DEB_CC=$CC} DEB_CXX=${DEB_CXX=$CXX} \
CCACHE_SIZE=${CCACHE_SIZE:=4G} \
`# Disable all features` \
CMAKE_FLAGS="-DCMAKE_BUILD_TYPE=Debug -DUNBUNDLED=1 -DENABLE_CAPNP=0 -DENABLE_RDKAFKA=0 -DENABLE_JEMALLOC=0 -DENABLE_UNWIND=0 -DENABLE_MYSQL=0 -DENABLE_POCO_ODBC=0 -DENABLE_ODBC=0 -DUSE_INTERNAL_LLVM_LIBRARY=0 -DCMAKE_C_FLAGS_ADD='-O0 -g0' -DCMAKE_CXX_FLAGS_ADD='-O0 -g0' $CMAKE_FLAGS" \
CMAKE_FLAGS="-DCMAKE_BUILD_TYPE=Debug -DUNBUNDLED=1 -DENABLE_RDKAFKA=0 -DENABLE_JEMALLOC=0 -DENABLE_UNWIND=0 -DENABLE_MYSQL=0 -DENABLE_POCO_ODBC=0 -DENABLE_ODBC=0 -DUSE_INTERNAL_LLVM_LIBRARY=0 -DCMAKE_C_FLAGS_ADD='-O0 -g0' -DCMAKE_CXX_FLAGS_ADD='-O0 -g0' $CMAKE_FLAGS" \
`# Use all possible contrib libs from system` \
`# psmisc - killall` \
EXTRAPACKAGES="psmisc clang-5.0 lld-5.0 liblld-5.0-dev libclang-5.0-dev liblld-5.0 libc++abi-dev libc++-dev libboost-program-options-dev libboost-system-dev libboost-filesystem-dev libboost-thread-dev zlib1g-dev liblz4-dev libdouble-conversion-dev libsparsehash-dev librdkafka-dev libpoco-dev libsparsehash-dev libgoogle-perftools-dev libzstd-dev libre2-dev libjemalloc-dev $EXTRAPACKAGES" \