dbms: fixed TableFunctionRemote [#METR-9750].

This commit is contained in:
Alexey Milovidov 2014-02-23 06:27:09 +04:00
parent 05615da00a
commit 5c71627f44
4 changed files with 28 additions and 14 deletions

View File

@ -80,11 +80,13 @@ public:
*/
size_t checkDepth(size_t max_depth) const;
protected:
BlockInputStreams children;
void setOwnedStorage(StoragePtr owned_storage_) { owned_storage = owned_storage_; }
protected:
StoragePtr owned_storage;
BlockInputStreams children;
private:
void getLeavesImpl(BlockInputStreams & res, BlockInputStreamPtr this_shared_ptr = NULL);

View File

@ -34,7 +34,7 @@ public:
NamesAndTypesListPtr columns_, /// Список столбцов.
const String & remote_database_, /// БД на удалённых серверах.
const String & remote_table_, /// Имя таблицы на удалённых серверах.
Cluster & cluster_,
SharedPtr<Cluster> & owned_cluster_,
const DataTypeFactory & data_type_factory_,
Context & context_,
const String & sign_column_name_ = "");
@ -95,6 +95,11 @@ private:
String _port_column_name;
const Context & context;
/// Используется только, если таблица должна владеть объектом Cluster, которым больше никто не владеет - для реализации TableFunctionRemote.
SharedPtr<Cluster> owned_cluster;
/// Соединения с удалёнными серверами.
Cluster & cluster;
};

View File

@ -63,15 +63,13 @@ public:
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|'));
cluster = new Cluster(context.getSettings(), context.getDataTypeFactory(), names, username, password);
SharedPtr<Cluster> cluster = new Cluster(context.getSettings(), context.getDataTypeFactory(), names, username, password);
return StorageDistributed::create(getName(), chooseColumns(*cluster, remote_database, remote_table, context),
remote_database, remote_table, *cluster, context.getDataTypeFactory(), context);
remote_database, remote_table, cluster, context.getDataTypeFactory(), context);
}
private:
Poco::SharedPtr<Cluster> cluster; /// Ссылка на объект кластер передается в StorageDistributed и должен существовать до выполнения запроса
/// Узнать имена и типы столбцов для создания таблицы
NamesAndTypesListPtr chooseColumns(Cluster & cluster, const String & database, const String & table, const Context & context) const
{

View File

@ -59,12 +59,17 @@ StoragePtr StorageDistributed::create(
NamesAndTypesListPtr columns_,
const String & remote_database_,
const String & remote_table_,
Cluster & cluster_,
SharedPtr<Cluster> & owned_cluster_,
const DataTypeFactory & data_type_factory_,
Context & context_,
const String & sign_column_name_)
{
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, cluster_, data_type_factory_, context_, sign_column_name_))->thisPtr();
auto res = new StorageDistributed(name_, columns_, remote_database_, remote_table_, *owned_cluster_, data_type_factory_, context_, sign_column_name_);
/// Захватываем владение объектом-кластером.
res->owned_cluster = owned_cluster_;
return res->thisPtr();
}
NameAndTypePair StorageDistributed::getColumn(const String &column_name) const
@ -184,10 +189,10 @@ BlockInputStreams StorageDistributed::read(
BlockInputStreams res;
for (ConnectionPools::iterator it = cluster.pools.begin(); it != cluster.pools.end(); ++it)
for (auto & conn_pool : cluster.pools)
{
String current_host = (*it)->get()->getHost();
UInt16 current_port = (*it)->get()->getPort();
String current_host = conn_pool->get()->getHost();
UInt16 current_port = conn_pool->get()->getPort();
if (!all_inclusive && values.find(std::make_pair(current_host, current_port)) == values.end())
continue;
@ -198,7 +203,7 @@ BlockInputStreams StorageDistributed::read(
need_port_column ? current_port : 0));
BlockInputStreamPtr temp = new RemoteBlockInputStream(
(*it)->get(&new_settings),
conn_pool->get(&new_settings),
modified_query,
&new_settings,
need_host_column ? _host_column_name : "",
@ -232,6 +237,10 @@ BlockInputStreams StorageDistributed::read(
}
}
/// Не дадим уничтожать объект до конца обработки запроса.
for (auto & stream : res)
stream->setOwnedStorage(thisPtr());
return res;
}