mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 17:20:50 +00:00
dbms: TableFunctionRemote, StorageDistributed: improvements [#METR-9750].
This commit is contained in:
parent
f5f95b7cd6
commit
8cf5eb001a
@ -26,7 +26,6 @@ public:
|
||||
const String & remote_table_, /// Имя таблицы на удалённых серверах.
|
||||
const String & cluster_name,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
@ -37,7 +36,6 @@ public:
|
||||
const String & remote_table_, /// Имя таблицы на удалённых серверах.
|
||||
Cluster & cluster_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
@ -78,7 +76,6 @@ private:
|
||||
const String & remote_table_,
|
||||
Cluster & cluster_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_ = "");
|
||||
|
||||
|
@ -65,47 +65,47 @@ public:
|
||||
|
||||
cluster = new Cluster(context.getSettings(), context.getDataTypeFactory(), names, username, password);
|
||||
|
||||
return StorageDistributed::create(getName(), chooseColumns(*cluster, remote_database, remote_table, context), remote_database,
|
||||
remote_table, *cluster, context.getDataTypeFactory(), context.getSettings(), context);
|
||||
return StorageDistributed::create(getName(), chooseColumns(*cluster, remote_database, remote_table, context),
|
||||
remote_database, remote_table, *cluster, context.getDataTypeFactory(), context);
|
||||
}
|
||||
|
||||
private:
|
||||
Poco::SharedPtr<Cluster> cluster; /// Ссылка на объект кластер передается в StorageDistributed и должен существовать до выполнения запроса
|
||||
|
||||
/// Узнать имена и типы столбцов для создания таблицы
|
||||
NamesAndTypesListPtr chooseColumns(Cluster & cluster, const String & database, const String &table, const Context & context) const
|
||||
NamesAndTypesListPtr chooseColumns(Cluster & cluster, const String & database, const String & table, const Context & context) const
|
||||
{
|
||||
/// Запрос на описание таблицы
|
||||
String query = "DESC TABLE " + database + "." + table;
|
||||
Settings settings = context.getSettings();
|
||||
/// Отправляем на первый попавшийся сервер
|
||||
detail::ConnectionPoolEntry entry = (*cluster.pools.begin())->get(&settings);
|
||||
auto entry = (*cluster.pools.begin())->get(&settings);
|
||||
|
||||
NamesAndTypesList res;
|
||||
/// Парсим результат запроса и формируем NamesAndTypesList
|
||||
|
||||
BlockInputStreamPtr input = new RemoteBlockInputStream(entry, query, &settings, QueryProcessingStage::Complete);
|
||||
input->readPrefix();
|
||||
|
||||
while (true)
|
||||
{
|
||||
BlockInputStreamPtr input = new RemoteBlockInputStream(entry, query, &settings, QueryProcessingStage::Complete);
|
||||
input->readPrefix();
|
||||
while (true)
|
||||
Block current = input->read();
|
||||
if (!current)
|
||||
break;
|
||||
ColumnPtr name = current.getByName("name").column;
|
||||
ColumnPtr type = current.getByName("type").column;
|
||||
size_t size = name->size();
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
Block current = input->read();
|
||||
if (!current)
|
||||
break;
|
||||
ColumnPtr name = current.getByName("name").column;
|
||||
ColumnPtr type = current.getByName("type").column;
|
||||
size_t size = name->size();
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
Field column, data_type;
|
||||
name->get(i, column);
|
||||
type->get(i, data_type);
|
||||
String column_name = column.get<String>();
|
||||
String data_type_name = data_type.get<String>();
|
||||
res.push_back(std::make_pair(column_name, context.getDataTypeFactory().get(data_type_name)));
|
||||
}
|
||||
String column_name = (*name)[i].get<const String &>();
|
||||
String data_type_name = (*type)[i].get<const String &>();
|
||||
|
||||
res.emplace_back(std::piecewise_construct,
|
||||
std::forward_as_tuple(column_name),
|
||||
std::forward_as_tuple(context.getDataTypeFactory().get(data_type_name)));
|
||||
}
|
||||
}
|
||||
return new NamesAndTypesList(res);
|
||||
|
||||
return new NamesAndTypesList(std::move(res));
|
||||
}
|
||||
|
||||
/// Декартово произведение двух множеств строк, результат записываем на место первого аргумента
|
||||
@ -128,7 +128,7 @@ private:
|
||||
}
|
||||
|
||||
/// Парсим число из подстроки
|
||||
bool parseId(const String & description, size_t l, size_t r, size_t & res) const
|
||||
static bool parseNumber(const String & description, size_t l, size_t r, size_t & res)
|
||||
{
|
||||
res = 0;
|
||||
for (size_t pos = l; pos < r; pos ++)
|
||||
@ -196,11 +196,11 @@ private:
|
||||
if (description[last_dot - 1] != '.')
|
||||
throw Exception("Storage Distributed, incorrect argument in braces (only one dot): " + description.substr(i, m - i + 1),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
if (!parseId(description, i + 1, last_dot - 1, left))
|
||||
if (!parseNumber(description, i + 1, last_dot - 1, left))
|
||||
throw Exception("Storage Distributed, incorrect argument in braces (Incorrect left number): "
|
||||
+ description.substr(i, m - i + 1),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
if (!parseId(description, last_dot + 1, m, right))
|
||||
if (!parseNumber(description, last_dot + 1, m, right))
|
||||
throw Exception("Storage Distributed, incorrect argument in braces (Incorrect right number): "
|
||||
+ description.substr(i, m - i + 1),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
|
@ -22,7 +22,6 @@ StorageDistributed::StorageDistributed(
|
||||
const String & remote_table_,
|
||||
Cluster & cluster_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
const Context & context_,
|
||||
const String & sign_column_name_)
|
||||
: name(name_), columns(columns_),
|
||||
@ -47,12 +46,11 @@ StoragePtr StorageDistributed::create(
|
||||
const String & remote_table_,
|
||||
const String & cluster_name,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
Context & context_,
|
||||
const String & sign_column_name_)
|
||||
{
|
||||
context_.initClusters();
|
||||
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, context_.getCluster(cluster_name), data_type_factory_, settings, context_, sign_column_name_))->thisPtr();
|
||||
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, context_.getCluster(cluster_name), data_type_factory_, context_, sign_column_name_))->thisPtr();
|
||||
}
|
||||
|
||||
|
||||
@ -63,11 +61,10 @@ StoragePtr StorageDistributed::create(
|
||||
const String & remote_table_,
|
||||
Cluster & cluster_,
|
||||
const DataTypeFactory & data_type_factory_,
|
||||
const Settings & settings,
|
||||
Context & context_,
|
||||
const String & sign_column_name_)
|
||||
{
|
||||
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, cluster_, data_type_factory_, settings, context_, sign_column_name_))->thisPtr();
|
||||
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, cluster_, data_type_factory_, context_, sign_column_name_))->thisPtr();
|
||||
}
|
||||
|
||||
NameAndTypePair StorageDistributed::getColumn(const String &column_name) const
|
||||
|
@ -149,7 +149,7 @@ StoragePtr StorageFactory::get(
|
||||
String sign_column_name = args.size() == 4 ? dynamic_cast<ASTIdentifier &>(*args[3]).name : "";
|
||||
|
||||
return StorageDistributed::create(table_name, columns, remote_database, remote_table, cluster_name,
|
||||
context.getDataTypeFactory(), context.getSettings(), context, sign_column_name);
|
||||
context.getDataTypeFactory(), context, sign_column_name);
|
||||
}
|
||||
else if (name == "MergeTree" || name == "SummingMergeTree")
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user