From 312feedecb3a743aafecac53bc40a762f29c9d08 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 7 Apr 2014 01:59:27 +0400 Subject: [PATCH] dbms: removed virtual columns from StorageDistributed [#METR-9172]. --- dbms/include/DB/Common/VirtualColumnUtils.h | 22 --- .../DB/DataStreams/RemoteBlockInputStream.h | 29 +--- dbms/include/DB/Storages/StorageDistributed.h | 13 +- dbms/src/Storages/StorageDistributed.cpp | 156 +----------------- 4 files changed, 14 insertions(+), 206 deletions(-) diff --git a/dbms/include/DB/Common/VirtualColumnUtils.h b/dbms/include/DB/Common/VirtualColumnUtils.h index 37897dc568f..00f5b8ce2c7 100644 --- a/dbms/include/DB/Common/VirtualColumnUtils.h +++ b/dbms/include/DB/Common/VirtualColumnUtils.h @@ -49,28 +49,6 @@ std::multiset extractSingleValueFromBlocks(BlockInputStreamPtr input, const return res; } -/// Извлечь из входного потока множество пар значений в столбцах first_name и second_name -template -std::multiset< std::pair > extractTwoValuesFromBlocks(BlockInputStreamPtr input, - const String & first_name, const String & second_name) -{ - std::multiset< std::pair > res; - input->readPrefix(); - while(1) - { - Block block = input->read(); - if (!block) break; - const ColumnWithNameAndType & first = block.getByName(first_name); - const ColumnWithNameAndType & second = block.getByName(second_name); - for (size_t i = 0; i < block.rows(); ++i) - { - T1 val1 = (*first.column)[i].get(); - T2 val2 = (*second.column)[i].get(); - res.insert(std::make_pair(val1, val2)); - } - } - return res; } } -} diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 5828d2f64f6..d4b0906fb1b 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -48,9 +48,9 @@ public: } RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_, - const String & _host_column_, const String & _port_column_, const Tables & external_tables_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) - : pool_entry(pool_entry_), connection(*pool_entry), query(query_), _host_column(_host_column_), - _port_column(_port_column_), external_tables(external_tables_), stage(stage_), sent_query(false), finished(false), + const Tables & external_tables_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) + : pool_entry(pool_entry_), connection(*pool_entry), query(query_), + external_tables(external_tables_), stage(stage_), sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) { @@ -107,22 +107,6 @@ public: } protected: - void populateBlock(Block & res) - { - if (!_host_column.empty() && !res.has(_host_column)) - { - ColumnPtr column_ptr = ColumnConst(res.rows(), connection.getHost(), new DataTypeString).convertToFullColumn(); - ColumnWithNameAndType column(column_ptr, new DataTypeString, _host_column); - res.insert(column); - } - if (!_port_column.empty() && !res.has(_port_column)) - { - ColumnPtr column_ptr = ColumnConst(res.rows(), connection.getPort(), new DataTypeUInt16).convertToFullColumn(); - ColumnWithNameAndType column(column_ptr, new DataTypeUInt16, _port_column); - res.insert(column); - } - } - /// Отправить на удаленные сервера все временные таблицы void sendExternalTables() { @@ -159,10 +143,7 @@ protected: case Protocol::Server::Data: /// Если блок не пуст и не является заголовочным блоком if (packet.block && packet.block.rows() > 0) - { - populateBlock(packet.block); return packet.block; - } break; /// Если блок пустой - получим другие пакеты до EndOfStream. case Protocol::Server::Exception: @@ -269,10 +250,6 @@ private: const String query; bool send_settings; Settings settings; - /// Имя столбца, куда записать имя хоста (Например "_host"). Пустая строка, если записывать не надо. - String _host_column; - /// Имя столбца, куда записать номер порта (Например "_port"). Пустая строка, если записывать не надо. - String _port_column; /// Временные таблицы, которые необходимо переслать на удаленные сервера. Tables external_tables; QueryProcessingStage::Enum stage; diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 416527b5207..4c426543325 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -45,8 +45,6 @@ public: bool supportsPrewhere() const { return true; } const NamesAndTypesList & getColumnsList() const { return *columns; } - NameAndTypePair getColumn(const String &column_name) const; - bool hasColumn(const String &column_name) const; bool isRemote() const { return true; } /// Сохранить временные таблицы, чтобы при следующем вызове метода read переслать их на удаленные сервера @@ -66,8 +64,6 @@ public: /// структура подтаблиц не проверяется void alter(const ASTAlterQuery::Parameters ¶ms); - Block getBlockWithVirtualColumns(); - private: StorageDistributed( const std::string & name_, @@ -78,8 +74,8 @@ private: const Context & context_, const String & sign_column_name_ = ""); - /// Создает копию запроса, меняет имена базы данных и таблицы, записавыет значения переменных host и port, если они не пустые. - ASTPtr remakeQuery(ASTPtr query, const String & host, size_t port); + /// Создает копию запроса, меняет имена базы данных и таблицы. + ASTPtr rewriteQuery(ASTPtr query); String name; NamesAndTypesListPtr columns; @@ -87,11 +83,6 @@ private: String remote_table; String sign_column_name; - /// Имя виртуального столбца, куда записывается имя хоста (Например "_host"). - String _host_column_name; - /// Имя виртуального столбца, куда записывается номер порта (Например "_port"). - String _port_column_name; - const Context & context; /// Временные таблицы, которые необходимо отправить на сервер. Переменная очищается после каждого вызова метода read diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index d60c2f80b64..650bbb6e71c 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -29,12 +29,6 @@ StorageDistributed::StorageDistributed( context(context_), cluster(cluster_) { - std::vector virtual_columns; - virtual_columns.push_back("_host"); - virtual_columns.push_back("_port"); - String suffix = VirtualColumnUtils::chooseSuffixForSet(getColumnsList(), virtual_columns); - _host_column_name = virtual_columns[0] + suffix; - _port_column_name = virtual_columns[1] + suffix; } StoragePtr StorageDistributed::create( @@ -68,37 +62,11 @@ StoragePtr StorageDistributed::create( return res->thisPtr(); } -NameAndTypePair StorageDistributed::getColumn(const String &column_name) const -{ - if (column_name == _host_column_name) - return std::make_pair(_host_column_name, new DataTypeString); - if (column_name == _port_column_name) - return std::make_pair(_port_column_name, new DataTypeUInt16); - - return getRealColumn(column_name); -} - -bool StorageDistributed::hasColumn(const String &column_name) const -{ - if (column_name == _host_column_name) - return true; - if (column_name == _port_column_name) - return true; - - return hasRealColumn(column_name); -} - -ASTPtr StorageDistributed::remakeQuery(ASTPtr query, const String & host, size_t port) +ASTPtr StorageDistributed::rewriteQuery(ASTPtr query) { /// Создаем копию запроса. ASTPtr modified_query_ast = query->clone(); - /// Добавляем в запрос значения хоста и порта, если требуется. - if (!host.empty()) - VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _host_column_name, host); - if (port != 0) - VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _port_column_name, port); - /// Меняем имена таблицы и базы данных ASTSelectQuery & select = dynamic_cast(*modified_query_ast); select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database); @@ -123,110 +91,35 @@ BlockInputStreams StorageDistributed::read( size_t max_block_size, unsigned threads) { - /// Узнаем на каком порту слушает сервер - UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); - /// Установим sign_rewrite = 0, чтобы второй раз не переписывать запрос Settings new_settings = settings; new_settings.sign_rewrite = false; new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time); - /** Запрошены ли виртуальные столбцы? - * Если да - будем добавлять их в виде констант в запрос, предназначенный для выполнения на удалённом сервере, - * а также при получении результата с удалённого сервера. - */ - bool need_host_column = false; - bool need_port_column = false; - for (const auto & it : column_names) - { - if (it == _host_column_name) - need_host_column = true; - else if (it == _port_column_name) - need_port_column = true; - } - - /** Есть ли виртуальные столбцы в секции SELECT? - * Если нет - в случае вычисления запроса до стадии Complete, необходимо удалить их из блока. - */ - bool select_host_column = false; - bool select_port_column = false; - const ASTExpressionList & select_list = dynamic_cast(*(dynamic_cast(*query)).select_expression_list); - for (const auto & it : select_list.children) - { - if (const ASTIdentifier * identifier = dynamic_cast(&*it)) - { - if (identifier->name == _host_column_name) - select_host_column = true; - else if (identifier->name == _port_column_name) - select_port_column = true; - } - } - - Names columns_to_remove; - if (!select_host_column && need_host_column) - columns_to_remove.push_back(_host_column_name); - if (!select_port_column && need_port_column) - columns_to_remove.push_back(_port_column_name); - - Block virtual_columns_block = getBlockWithVirtualColumns(); - BlockInputStreamPtr virtual_columns; - - /// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать - if (need_host_column || need_port_column) - virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context); - else /// Иначе, считаем допустимыми все возможные значения - virtual_columns = new OneBlockInputStream(virtual_columns_block); - - std::multiset< std::pair > values = - VirtualColumnUtils::extractTwoValuesFromBlocks(virtual_columns, _host_column_name, _port_column_name); - bool all_inclusive = values.size() == virtual_columns_block.rows(); - - size_t result_size = values.size(); - if (cluster.getLocalNodesNum() > 0 && values.find(std::make_pair("localhost", clickhouse_port)) != values.end()) - result_size += cluster.getLocalNodesNum() - 1; + size_t result_size = cluster.pools.size() + cluster.getLocalNodesNum(); processed_stage = result_size == 1 ? QueryProcessingStage::Complete : QueryProcessingStage::WithMergeableState; BlockInputStreams res; + ASTPtr modified_query_ast = rewriteQuery(query); for (auto & conn_pool : cluster.pools) { - String current_host = conn_pool->get()->getHost(); - UInt16 current_port = conn_pool->get()->getPort(); + String modified_query = selectToString(modified_query_ast); - if (!all_inclusive && values.find(std::make_pair(current_host, current_port)) == values.end()) - continue; - - String modified_query = selectToString(remakeQuery( - query, - need_host_column ? current_host : "", - need_port_column ? current_port : 0)); - - BlockInputStreamPtr temp = new RemoteBlockInputStream( + res.push_back(new RemoteBlockInputStream( conn_pool->get(&new_settings), modified_query, &new_settings, - need_host_column ? _host_column_name : "", - need_port_column ? _port_column_name : "", external_tables, - processed_stage); - - if (processed_stage == QueryProcessingStage::WithMergeableState || columns_to_remove.empty()) - res.push_back(temp); - else - res.push_back(new RemoveColumnsBlockInputStream(temp, columns_to_remove)); + processed_stage)); } - if (cluster.getLocalNodesNum() > 0 && (all_inclusive || values.find(std::make_pair("localhost", clickhouse_port)) != values.end())) + /// Добавляем запросы к локальному ClickHouse + if (cluster.getLocalNodesNum() > 0) { - ASTPtr modified_query_ast = remakeQuery( - query, - need_host_column ? "localhost" : "", - need_port_column ? clickhouse_port : 0); - - /// Добавляем запросы к локальному ClickHouse DB::Context new_context = context; new_context.setSettings(new_settings); for (auto & it : external_tables) @@ -236,42 +129,11 @@ BlockInputStreams StorageDistributed::read( for(size_t i = 0; i < cluster.getLocalNodesNum(); ++i) { InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage); - if (processed_stage == QueryProcessingStage::WithMergeableState || columns_to_remove.empty()) res.push_back(interpreter.execute()); - else - res.push_back(new RemoveColumnsBlockInputStream(interpreter.execute(), columns_to_remove)); } } + external_tables.clear(); - - return res; -} - -/// Построить блок состоящий только из возможных значений виртуальных столбцов -Block StorageDistributed::getBlockWithVirtualColumns() -{ - Block res; - ColumnWithNameAndType _host(new ColumnString, new DataTypeString, _host_column_name); - ColumnWithNameAndType _port(new ColumnUInt16, new DataTypeUInt16, _port_column_name); - - for (ConnectionPools::iterator it = cluster.pools.begin(); it != cluster.pools.end(); ++it) - { - _host.column->insert((*it)->get()->getHost()); - _port.column->insert(static_cast((*it)->get()->getPort())); - } - - if (cluster.getLocalNodesNum() > 0) - { - /// Узнаем на каком порту слушает сервер - UInt64 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); - String clockhouse_host = "localhost"; - _host.column->insert(clockhouse_host); - _port.column->insert(clickhouse_port); - } - - res.insert(_host); - res.insert(_port); - return res; }