From 5c71627f44606c7dfb2185cca705038c4b082026 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 23 Feb 2014 06:27:09 +0400 Subject: [PATCH] dbms: fixed TableFunctionRemote [#METR-9750]. --- .../DB/DataStreams/IBlockInputStream.h | 6 +++-- dbms/include/DB/Storages/StorageDistributed.h | 7 +++++- .../DB/TableFunctions/TableFunctionRemote.h | 6 ++--- dbms/src/Storages/StorageDistributed.cpp | 23 +++++++++++++------ 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/dbms/include/DB/DataStreams/IBlockInputStream.h b/dbms/include/DB/DataStreams/IBlockInputStream.h index 061349ca24b..b5fda965f8f 100644 --- a/dbms/include/DB/DataStreams/IBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IBlockInputStream.h @@ -80,11 +80,13 @@ public: */ size_t checkDepth(size_t max_depth) const; -protected: - BlockInputStreams children; + void setOwnedStorage(StoragePtr owned_storage_) { owned_storage = owned_storage_; } +protected: StoragePtr owned_storage; + BlockInputStreams children; + private: void getLeavesImpl(BlockInputStreams & res, BlockInputStreamPtr this_shared_ptr = NULL); diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 6f8b0ea0c27..aa55b80ba81 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -34,7 +34,7 @@ public: NamesAndTypesListPtr columns_, /// Список столбцов. const String & remote_database_, /// БД на удалённых серверах. const String & remote_table_, /// Имя таблицы на удалённых серверах. - Cluster & cluster_, + SharedPtr & owned_cluster_, const DataTypeFactory & data_type_factory_, Context & context_, const String & sign_column_name_ = ""); @@ -95,6 +95,11 @@ private: String _port_column_name; const Context & context; + + /// Используется только, если таблица должна владеть объектом Cluster, которым больше никто не владеет - для реализации TableFunctionRemote. + SharedPtr owned_cluster; + + /// Соединения с удалёнными серверами. Cluster & cluster; }; diff --git a/dbms/include/DB/TableFunctions/TableFunctionRemote.h b/dbms/include/DB/TableFunctions/TableFunctionRemote.h index 5572cd7b65a..60f09179dad 100644 --- a/dbms/include/DB/TableFunctions/TableFunctionRemote.h +++ b/dbms/include/DB/TableFunctions/TableFunctionRemote.h @@ -63,15 +63,13 @@ public: for (size_t i = 0; i < shards.size(); ++i) names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|')); - cluster = new Cluster(context.getSettings(), context.getDataTypeFactory(), names, username, password); + SharedPtr cluster = new Cluster(context.getSettings(), context.getDataTypeFactory(), names, username, password); return StorageDistributed::create(getName(), chooseColumns(*cluster, remote_database, remote_table, context), - remote_database, remote_table, *cluster, context.getDataTypeFactory(), context); + remote_database, remote_table, cluster, context.getDataTypeFactory(), context); } private: - Poco::SharedPtr cluster; /// Ссылка на объект кластер передается в StorageDistributed и должен существовать до выполнения запроса - /// Узнать имена и типы столбцов для создания таблицы NamesAndTypesListPtr chooseColumns(Cluster & cluster, const String & database, const String & table, const Context & context) const { diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 9443a4b3103..657a18f0e26 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -59,12 +59,17 @@ StoragePtr StorageDistributed::create( NamesAndTypesListPtr columns_, const String & remote_database_, const String & remote_table_, - Cluster & cluster_, + SharedPtr & owned_cluster_, const DataTypeFactory & data_type_factory_, Context & context_, const String & sign_column_name_) { - return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, cluster_, data_type_factory_, context_, sign_column_name_))->thisPtr(); + auto res = new StorageDistributed(name_, columns_, remote_database_, remote_table_, *owned_cluster_, data_type_factory_, context_, sign_column_name_); + + /// Захватываем владение объектом-кластером. + res->owned_cluster = owned_cluster_; + + return res->thisPtr(); } NameAndTypePair StorageDistributed::getColumn(const String &column_name) const @@ -184,10 +189,10 @@ BlockInputStreams StorageDistributed::read( BlockInputStreams res; - for (ConnectionPools::iterator it = cluster.pools.begin(); it != cluster.pools.end(); ++it) + for (auto & conn_pool : cluster.pools) { - String current_host = (*it)->get()->getHost(); - UInt16 current_port = (*it)->get()->getPort(); + String current_host = conn_pool->get()->getHost(); + UInt16 current_port = conn_pool->get()->getPort(); if (!all_inclusive && values.find(std::make_pair(current_host, current_port)) == values.end()) continue; @@ -198,7 +203,7 @@ BlockInputStreams StorageDistributed::read( need_port_column ? current_port : 0)); BlockInputStreamPtr temp = new RemoteBlockInputStream( - (*it)->get(&new_settings), + conn_pool->get(&new_settings), modified_query, &new_settings, need_host_column ? _host_column_name : "", @@ -231,7 +236,11 @@ BlockInputStreams StorageDistributed::read( res.push_back(new RemoveColumnsBlockInputStream(interpreter.execute(), columns_to_remove)); } } - + + /// Не дадим уничтожать объект до конца обработки запроса. + for (auto & stream : res) + stream->setOwnedStorage(thisPtr()); + return res; }