From de71157b10199020d725bca94ec99fcf40a400d8 Mon Sep 17 00:00:00 2001
From: Pavel Kartavyy
Date: Thu, 28 Nov 2013 10:31:17 +0000
Subject: [PATCH] StorageDistributed: development [#METR-9244]

---
 dbms/include/DB/Storages/StorageDistributed.h |  6 ++
 dbms/src/Storages/StorageDistributed.cpp      | 84 ++++++++++++++++---
 2 files changed, 78 insertions(+), 12 deletions(-)

diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h
index 3e0da47f46e..80497197b8f 100644
--- a/dbms/include/DB/Storages/StorageDistributed.h
+++ b/dbms/include/DB/Storages/StorageDistributed.h
@@ -105,6 +105,8 @@ private:
 		const Settings & settings,
 		const Context & context_,
 		const String & sign_column_name_ = "");
+
+	bool checkLocalReplics(const Address & address);
 
 	String name;
 	NamesAndTypesListPtr columns;
@@ -116,6 +118,10 @@ private:
 	const Context & context;
 	/// Connections to remote servers.
 	ConnectionPools pools;
+
+	/// Number of clickhouse server replicas located on the local host.
+	/// Local replicas are accessed directly, bypassing the network.
+	size_t local_replics_num;
 };
 
 }
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 1270be3e872..85773f72aa9 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -4,6 +4,10 @@
 
 #include 
 
+#include 
+
+#include 
+#include 
 
 namespace DB
 {
@@ -22,13 +26,23 @@ StorageDistributed::StorageDistributed(
 	remote_database(remote_database_), remote_table(remote_table_),
 	data_type_factory(data_type_factory_),
 	sign_column_name(sign_column_name_),
-	context(context_)
+	context(context_),
+	local_replics_num(0)
 {
 	for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
-		pools.push_back(new ConnectionPool(
-			settings.distributed_connections_pool_size,
-			it->host_port.host().toString(), it->host_port.port(), "", it->user, it->password, data_type_factory, "server", Protocol::Compression::Enable,
-			settings.connect_timeout, settings.receive_timeout, settings.send_timeout));
+	{
+		if (checkLocalReplics(*it))
+		{
+			++local_replics_num;
+		}
+		else
+		{
+			pools.push_back(new ConnectionPool(
+				settings.distributed_connections_pool_size,
+				it->host_port.host().toString(), it->host_port.port(), "", it->user, it->password, data_type_factory, "server", Protocol::Compression::Enable,
+				settings.connect_timeout, settings.receive_timeout, settings.send_timeout));
+		}
+	}
 }
 
 StorageDistributed::StorageDistributed(
@@ -45,7 +59,8 @@
 	remote_database(remote_database_), remote_table(remote_table_),
 	data_type_factory(data_type_factory_),
 	sign_column_name(sign_column_name_),
-	context(context_)
+	context(context_),
+	local_replics_num(0)
 {
 	for (AddressesWithFailover::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
 	{
@@ -53,15 +68,48 @@
 		replicas.reserve(it->size());
 
 		for (Addresses::const_iterator jt = it->begin(); jt != it->end(); ++jt)
-			replicas.push_back(new ConnectionPool(
-				settings.distributed_connections_pool_size,
-				jt->host_port.host().toString(), jt->host_port.port(), "", jt->user, jt->password, data_type_factory, "server", Protocol::Compression::Enable,
-				settings.connect_timeout_with_failover_ms, settings.receive_timeout, settings.send_timeout));
+		{
+			if (checkLocalReplics(*jt))
+			{
+				++local_replics_num;
+			}
+			else
+			{
+				replicas.push_back(new ConnectionPool(
+					settings.distributed_connections_pool_size,
+					jt->host_port.host().toString(), jt->host_port.port(), "", jt->user, jt->password, data_type_factory, "server", Protocol::Compression::Enable,
+					settings.connect_timeout_with_failover_ms, settings.receive_timeout, settings.send_timeout));
+			}
+		}
 
 		pools.push_back(new ConnectionPoolWithFailover(replicas, settings.connections_with_failover_max_tries));
 	}
 }
 
+static bool interfaceEqual(const Poco::Net::NetworkInterface & interface, Poco::Net::IPAddress & address)
+{
+	return interface.address() == address;
+}
+
+bool StorageDistributed::checkLocalReplics(const Address & address)
+{
+	/// If there is a replica such that:
+	/// - its port matches the port the server is listening on;
+	/// - its host resolves to a set of addresses, one of which matches an address of one of the server's network interfaces,
+	/// then this shard should always be accessed locally, without inter-process communication.
+	UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
+	Poco::Net::NetworkInterface::NetworkInterfaceList interfaces = Poco::Net::NetworkInterface::list();
+
+	if (clickhouse_port == address.host_port.port() &&
+		interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
+			boost::bind(interfaceEqual, _1, address.host_port.host())))
+	{
+		LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.host_port.toString() << " will be processed as local.");
+		return true;
+	}
+	return false;
+}
+
 StoragePtr StorageDistributed::create(
 	const std::string & name_,
 	NamesAndTypesListPtr columns_,
@@ -102,13 +150,13 @@ BlockInputStreams StorageDistributed::read(
 	processed_stage = pools.size() == 1
 		? QueryProcessingStage::Complete
 		: QueryProcessingStage::WithMergeableState;
-	
+
 	/// Replace the database and table names in the query.
 	ASTPtr modified_query_ast = query->clone();
 	ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*modified_query_ast);
 	select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database);
 	select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table);
-	
+
 	/// Set sign_rewrite = 0 so that the query is not rewritten a second time.
 	Settings new_settings = settings;
 	new_settings.sign_rewrite = false;
@@ -122,6 +170,18 @@ BlockInputStreams StorageDistributed::read(
 
 	for (ConnectionPools::iterator it = pools.begin(); it != pools.end(); ++it)
 		res.push_back(new RemoteBlockInputStream((*it)->get(), modified_query, &new_settings, processed_stage));
+
+	/// Add queries to the local clickhouse server.
+	{
+		DB::Context new_context = context;
+		new_context.setSettings(new_settings);
+		for(size_t i = 0; i < local_replics_num; ++i)
+		{
+			InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
+			res.push_back(interpreter.execute());
+		}
+	}
 
 	return res;
 }
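
For context, below is a minimal standalone sketch (not part of the patch) of the same local-replica check that checkLocalReplics performs: an address is treated as local when its port equals the port this server listens on and its host resolves to an address assigned to one of the machine's network interfaces. The helper name isLocalAddress and the hard-coded host and port in main() are illustrative assumptions; unlike the patch, the listen port is passed in explicitly instead of being read from the server config, so only PocoNet and PocoFoundation are needed.

// Standalone sketch of the local-replica check, illustrative only.
// Build with: g++ local_check.cpp -lPocoNet -lPocoFoundation
#include <iostream>

#include <Poco/Net/NetworkInterface.h>
#include <Poco/Net/SocketAddress.h>

/// An address counts as local when its port equals the port this server listens on
/// and its host resolves to an address bound to one of the local network interfaces.
static bool isLocalAddress(const Poco::Net::SocketAddress & remote, unsigned short listen_port)
{
	if (remote.port() != listen_port)
		return false;

	Poco::Net::NetworkInterface::NetworkInterfaceList interfaces = Poco::Net::NetworkInterface::list();
	for (Poco::Net::NetworkInterface::NetworkInterfaceList::const_iterator it = interfaces.begin(); it != interfaces.end(); ++it)
		if (it->address() == remote.host())
			return true;

	return false;
}

int main()
{
	/// "127.0.0.1" resolves to the loopback interface, which NetworkInterface::list()
	/// reports on typical systems, so with a matching port this prints "local".
	Poco::Net::SocketAddress addr("127.0.0.1", 9000);
	std::cout << (isLocalAddress(addr, 9000) ? "local" : "remote") << std::endl;
	return 0;
}

The design point mirrored from the patch: replicas that pass this check are never contacted through a connection pool; the Distributed table counts them and runs the rewritten query in-process instead, which avoids a useless network round trip to the same host.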