diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index e286a254f0e..fc103493899 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -104,6 +104,16 @@ public: */ void disconnect(); + const std::string & getHost() const + { + return host; + } + + UInt16 getPort() const + { + return port; + } + private: String host; UInt16 port; diff --git a/dbms/include/DB/Common/VirtualColumnUtils.h b/dbms/include/DB/Common/VirtualColumnUtils.h index 79c95e18c67..efad19da951 100644 --- a/dbms/include/DB/Common/VirtualColumnUtils.h +++ b/dbms/include/DB/Common/VirtualColumnUtils.h @@ -13,11 +13,11 @@ namespace DB { -class VirtualColumnUtils +namespace VirtualColumnUtils { -public: + /// Вычислить минимальный числовый суффикс, который надо добавить к строке, чтобы она не присутствовала в множестве -static String chooseSuffix(const NamesAndTypesList & columns, const String & name) +inline String chooseSuffix(const NamesAndTypesList & columns, const String & name) { int id = 0; String current_suffix; @@ -39,7 +39,7 @@ static String chooseSuffix(const NamesAndTypesList & columns, const String & nam /// Вычислить минимальный общий числовый суффикс, который надо добавить к каждой строке, /// чтобы ниодна не присутствовала в множестве. -static String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector & names) +inline String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector & names) { int id = 0; String current_suffix; @@ -67,45 +67,19 @@ static String chooseSuffixForSet(const NamesAndTypesList & columns, const std::v return current_suffix; } -/// На данный момент не дописана и не используется. -static void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field &value) +/// Добавляет в селект запрос секцию select clumn_name as value +/// Например select _port as 9000. +inline void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value) { - { - ASTSelectQuery & select = dynamic_cast(*ast); - ASTExpressionList & node = dynamic_cast(*select.select_expression_list); - ASTs & asts = node.children; - ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value); - cur->alias = column_name; - ASTPtr column_value = cur; - asts.insert(asts.begin(), column_value); - return; - } - - if (ASTExpressionList * node = dynamic_cast(&*ast)) - { - ASTs & asts = node->children; - for (int i = static_cast(asts.size()) - 1; i >= 0; --i) - { - if (ASTIdentifier * child = dynamic_cast(&*asts[i])) - { - if (child->kind == ASTIdentifier::Column && child->getColumnName() == column_name) - { - ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value); - cur->alias = column_name; - - ASTPtr column_value = cur; - - asts.erase(asts.begin() + i); - asts.insert(asts.begin() + i, column_value); - } - } - } - } - - for (auto it : ast->children) - rewriteEntityInAst(it, column_name, value); + ASTSelectQuery & select = dynamic_cast(*ast); + ASTExpressionList & node = dynamic_cast(*select.select_expression_list); + ASTs & asts = node.children; + ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value); + cur->alias = column_name; + ASTPtr column_value = cur; + asts.insert(asts.begin(), column_value); } -}; +} } diff --git a/dbms/include/DB/DataStreams/AddingConstColumnBlockInputStream.h b/dbms/include/DB/DataStreams/AddingConstColumnBlockInputStream.h index 539da47c982..6e40c9bdda0 100644 --- a/dbms/include/DB/DataStreams/AddingConstColumnBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AddingConstColumnBlockInputStream.h @@ -42,7 +42,6 @@ protected: return res; ColumnPtr column_ptr = ColumnConst (res.rows(), value, data_type).convertToFullColumn(); ColumnWithNameAndType column(column_ptr, data_type, column_name); - res.insert(column); return res; } diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 2c3c5d965d3..e60c7c8de47 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -3,6 +3,7 @@ #include #include +#include #include @@ -16,8 +17,8 @@ class RemoteBlockInputStream : public IProfilingBlockInputStream { public: RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, - QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) - : connection(connection_), query(query_), stage(stage_), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) + : connection(connection_), query(query_), _host_column(""), _port_column(""), stage(stage_), sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) { @@ -33,9 +34,24 @@ public: /// Захватывает владение соединением из пула. RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) - : pool_entry(pool_entry_), connection(*pool_entry), query(query_), stage(stage_), - sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false), - log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) + : pool_entry(pool_entry_), connection(*pool_entry), query(query_), _host_column(""), + _port_column(""), stage(stage_), sent_query(false), finished(false), was_cancelled(false), + got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) + { + if (settings_) + { + send_settings = true; + settings = *settings_; + } + else + send_settings = false; + } + + RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_, + const String & _host_column_, const String & _port_column_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) + : pool_entry(pool_entry_), connection(*pool_entry), query(query_), _host_column(_host_column_), + _port_column(_port_column_), stage(stage_), sent_query(false), finished(false), was_cancelled(false), + got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) { if (settings_) { @@ -90,6 +106,22 @@ public: } protected: + void populateBlock(Block & res) + { + if (_host_column != "") + { + ColumnPtr column_ptr = ColumnConst (res.rows(), connection.getHost(), new DataTypeString).convertToFullColumn(); + ColumnWithNameAndType column(column_ptr, new DataTypeString, _host_column); + res.insert(column); + } + if (_port_column != "") + { + ColumnPtr column_ptr = ColumnConst (res.rows(), connection.getPort(), new DataTypeUInt16).convertToFullColumn(); + ColumnWithNameAndType column(column_ptr, new DataTypeUInt16, _port_column); + res.insert(column); + } + } + Block readImpl() { if (!sent_query) @@ -107,7 +139,10 @@ protected: case Protocol::Server::Data: /// Если блок не пуст и не является заголовочным блоком if (packet.block && packet.block.rows() > 0) + { + populateBlock(packet.block); return packet.block; + } break; /// Если блок пустой - получим другие пакеты до EndOfStream. case Protocol::Server::Exception: @@ -214,6 +249,10 @@ private: const String query; bool send_settings; Settings settings; + /// Имя столбца, куда записать имя хоста. Пустая строка, если записывать не надо. + String _host_column; + /// Имя столбца, куда записать номер порта. Пустая строка, если записывать не надо. + String _port_column; QueryProcessingStage::Enum stage; /// Отправили запрос (это делается перед получением первого блока). diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 22c27cc2449..d7b162ad68a 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -38,6 +38,8 @@ public: bool supportsPrewhere() const { return true; } const NamesAndTypesList & getColumnsList() const { return *columns; } + NameAndTypePair getColumn(const String &column_name) const; + bool hasColumn(const String &column_name) const; bool isRemote() const { return true; } @@ -74,6 +76,9 @@ private: const DataTypeFactory & data_type_factory; String sign_column_name; + String _host_column_name; + String _port_column_name; + const Context & context; Cluster & cluster; }; diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 9029d2cc54c..9ace656e73f 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -29,6 +30,12 @@ StorageDistributed::StorageDistributed( context(context_), cluster(cluster_) { + std::vector virtual_columns; + virtual_columns.push_back("_host"); + virtual_columns.push_back("_port"); + String suffix = VirtualColumnUtils::chooseSuffixForSet(getColumnsList(), virtual_columns); + _host_column_name = virtual_columns[0] + suffix; + _port_column_name = virtual_columns[1] + suffix; } StoragePtr StorageDistributed::create( @@ -46,6 +53,20 @@ StoragePtr StorageDistributed::create( return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, context_.getCluster(cluster_name), data_type_factory_, settings, context_, sign_column_name_))->thisPtr(); } +NameAndTypePair StorageDistributed::getColumn(const String &column_name) const +{ + if (column_name == _host_column_name) return std::make_pair(_host_column_name, new DataTypeString); + if (column_name == _port_column_name) return std::make_pair(_port_column_name, new DataTypeUInt16); + return getRealColumn(column_name); +} + +bool StorageDistributed::hasColumn(const String &column_name) const +{ + if (column_name == _host_column_name) return true; + if (column_name == _port_column_name) return true; + return hasRealColumn(column_name); +} + BlockInputStreams StorageDistributed::read( const Names & column_names, ASTPtr query, @@ -54,30 +75,67 @@ BlockInputStreams StorageDistributed::read( size_t max_block_size, unsigned threads) { + Names virt_column_names(2, ""), real_column_names; + for (const auto & it : column_names) + if (it == _host_column_name) + virt_column_names[0] = _host_column_name; + else if (it == _port_column_name) + virt_column_names[1] = _port_column_name; + else + real_column_names.push_back(it); + processed_stage = (cluster.pools.size() + cluster.getLocalNodesNum()) == 1 ? QueryProcessingStage::Complete : QueryProcessingStage::WithMergeableState; - /// Заменим в запросе имена БД и таблицы. - ASTPtr modified_query_ast = query->clone(); - ASTSelectQuery & select = dynamic_cast(*modified_query_ast); - select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database); - select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table); - /// Установим sign_rewrite = 0, чтобы второй раз не переписывать запрос Settings new_settings = settings; new_settings.sign_rewrite = false; new_settings.queue_max_wait_ms = Cluster::saturation(new_settings.queue_max_wait_ms, settings.limits.max_execution_time); - std::stringstream s; - formatAST(select, s, 0, false, true); - String modified_query = s.str(); - BlockInputStreams res; for (ConnectionPools::iterator it = cluster.pools.begin(); it != cluster.pools.end(); ++it) - res.push_back(new RemoteBlockInputStream((*it)->get(&new_settings), modified_query, &new_settings, processed_stage)); + { + /// Заменим в запросе имена БД и таблицы. + ASTPtr modified_query_ast = query->clone(); + /// Добавляем в запрос значения хоста и порта + String trash_host = (*it)->get()->getHost(); + size_t trash_port = (*it)->get()->getPort(); + VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _host_column_name, trash_host); + VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _port_column_name, trash_port); + + /// Меняем имена таблицы и базы данных + ASTSelectQuery & select = dynamic_cast(*modified_query_ast); + select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database); + select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table); + + std::stringstream s; + formatAST(select, s, 0, false, true); + String modified_query = s.str(); + + res.push_back(new RemoteBlockInputStream((*it)->get(&new_settings), modified_query, &new_settings, virt_column_names[0], virt_column_names[1], processed_stage)); + } + + + /// Заменим в запросе имена БД и таблицы. + ASTPtr modified_query_ast = query->clone(); + + /// Добавляем в запрос значения хоста и порта + String trash_host = "localhost"; + size_t trash_port = 9000; + VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _host_column_name, trash_host); + VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _port_column_name, trash_port); + + /// Меняем имена таблицы и базы данных + ASTSelectQuery & select = dynamic_cast(*modified_query_ast); + select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database); + select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table); + + std::stringstream s; + formatAST(select, s, 0, false, true); + String modified_query = s.str(); /// добавляем запросы к локальному clickhouse DB::Context new_context = context; diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index ea6afd908be..bfb1516d545 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -49,7 +49,7 @@ BlockInputStreams StorageMerge::read( BlockInputStreams res; Names virt_column_names, real_column_names; - for (auto & it : column_names) + for (const auto & it : column_names) if (it != _table_column_name) real_column_names.push_back(it); else