Merge pull request #29774 from kssenii/remote-connection

Predefined configuration for table function remote
This commit is contained in:
Kseniia Sumarokova 2021-10-24 19:38:01 +03:00 committed by GitHub
commit 4667ea7883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 247 additions and 139 deletions

View File

@ -1,7 +1,7 @@
#include <Common/typeid_cast.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTFunctionHelpers.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTExpressionList.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <IO/WriteHelpers.h>

View File

@ -6,11 +6,12 @@
namespace DB
{
static bool isFunctionCast(const ASTFunction * function)
/// Tells whether `function` is a cast: true only for a non-null node named "CAST" or "_CAST".
static inline bool isFunctionCast(const ASTFunction * function)
{
    /// A missing node is never a cast.
    if (!function)
        return false;

    const auto & candidate = function->name;
    return candidate == "CAST" || candidate == "_CAST";
}
}

View File

@ -47,10 +47,11 @@ void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration
database = conf.database;
table = conf.table;
schema = conf.schema;
addresses_expr = conf.addresses_expr;
}
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine)
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection)
{
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
@ -64,7 +65,14 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
const auto & config_prefix = fmt::format("named_collections.{}", collection->name());
if (!config.has(config_prefix))
{
/// For table function remote we do not throw on no collection, because then we consider first arg
/// as cluster definition from config.
if (!throw_on_no_collection)
return std::nullopt;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
}
configuration.host = config.getString(config_prefix + ".host", "");
configuration.port = config.getInt(config_prefix + ".port", 0);
@ -73,15 +81,19 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
configuration.database = config.getString(config_prefix + ".database", "");
configuration.table = config.getString(config_prefix + ".table", "");
configuration.schema = config.getString(config_prefix + ".schema", "");
configuration.addresses_expr = config.getString(config_prefix + ".addresses_expr", "");
if ((args.size() == 1) && (configuration.host.empty() || configuration.port == 0
|| configuration.username.empty() || configuration.password.empty()
if (!configuration.addresses_expr.empty() && !configuration.host.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have `addresses_expr` and `host`, `port` in configuration at the same time");
if ((args.size() == 1) && ((configuration.addresses_expr.empty() && (configuration.host.empty() || configuration.port == 0))
|| configuration.database.empty() || (configuration.table.empty() && !is_database_engine)))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and no key-value arguments are added");
}
/// Check key-value arguments.
for (size_t i = 1; i < args.size(); ++i)
{
if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
@ -92,24 +104,40 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
auto arg_value = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context)->as<ASTLiteral>()->value;
if (function_args[1]->as<ASTFunction>())
{
non_common_args.emplace_back(std::make_pair(arg_name, function_args[1]));
continue;
}
if (arg_name == "host")
configuration.host = arg_value.safeGet<String>();
else if (arg_name == "port")
configuration.port = arg_value.safeGet<UInt64>();
else if (arg_name == "user")
configuration.username = arg_value.safeGet<String>();
else if (arg_name == "password")
configuration.password = arg_value.safeGet<String>();
else if (arg_name == "database")
configuration.database = arg_value.safeGet<String>();
else if (arg_name == "table")
configuration.table = arg_value.safeGet<String>();
else if (arg_name == "schema")
configuration.schema = arg_value.safeGet<String>();
auto arg_value_ast = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context);
auto * arg_value_literal = arg_value_ast->as<ASTLiteral>();
if (arg_value_literal)
{
auto arg_value = arg_value_literal->value;
if (arg_name == "host")
configuration.host = arg_value.safeGet<String>();
else if (arg_name == "port")
configuration.port = arg_value.safeGet<UInt64>();
else if (arg_name == "user")
configuration.username = arg_value.safeGet<String>();
else if (arg_name == "password")
configuration.password = arg_value.safeGet<String>();
else if (arg_name == "database")
configuration.database = arg_value.safeGet<String>();
else if (arg_name == "table")
configuration.table = arg_value.safeGet<String>();
else if (arg_name == "schema")
configuration.schema = arg_value.safeGet<String>();
else if (arg_name == "addresses_expr")
configuration.addresses_expr = arg_value.safeGet<String>();
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
}
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value));
{
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
}
}
else
{
@ -269,9 +297,13 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
}
}
else
non_common_args.emplace_back(std::make_pair(key, config.getString(config_prefix + '.' + key)));
{
auto value = config.getString(config_prefix + '.' + key);
non_common_args.emplace_back(std::make_pair(key, std::make_shared<ASTLiteral>(value)));
}
}
/// Check key-value arguments.
for (size_t i = 1; i < args.size(); ++i)
{
if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
@ -282,7 +314,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
auto arg_value = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context)->as<ASTLiteral>()->value;
auto arg_value_ast = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context);
auto arg_value = arg_value_ast->as<ASTLiteral>()->value;
if (arg_name == "url")
configuration.url = arg_value.safeGet<String>();
@ -293,7 +326,7 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
else if (arg_name == "structure")
configuration.structure = arg_value.safeGet<String>();
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value));
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
}
else
{

View File

@ -19,6 +19,7 @@ struct ExternalDataSourceConfiguration
String schema;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
String addresses_expr;
String toString() const;
@ -45,7 +46,7 @@ struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
};
using StorageSpecificArgs = std::vector<std::pair<String, DB::Field>>;
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
struct ExternalDataSourceConfig
{
@ -63,7 +64,7 @@ struct ExternalDataSourceConfig
* Any key-value engine argument except common (`host`, `port`, `username`, `password`, `database`)
* is returned separately as `StorageSpecificArgs` (a list of name / value-AST pairs).
*/
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false);
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true);
std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);

View File

@ -233,7 +233,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
for (const auto & [name, value] : storage_specific_args)
{
if (name == "description")
cluster_description = value.safeGet<String>();
cluster_description = value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unknown key-value argument {} for table engine URL", name);
@ -279,7 +279,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
for (const auto & [name, value] : storage_specific_args)
{
if (name == "description")
cluster_description = value.safeGet<String>();
cluster_description = value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unknown key-value argument {} for table function URL", name);

View File

@ -113,9 +113,9 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "collection")
configuration.collection = arg_value.safeGet<String>();
configuration.collection = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "options")
configuration.options = arg_value.safeGet<String>();
configuration.options = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected key-value argument."

View File

@ -250,9 +250,9 @@ StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, Conte
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "replace_query")
configuration.replace_query = arg_value.safeGet<bool>();
configuration.replace_query = arg_value->as<ASTLiteral>()->value.safeGet<bool>();
else if (arg_name == "on_duplicate_clause")
configuration.on_duplicate_clause = arg_value.safeGet<String>();
configuration.on_duplicate_clause = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected key-value argument."

View File

@ -398,7 +398,7 @@ StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_a
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "on_conflict")
configuration.on_conflict = arg_value.safeGet<String>();
configuration.on_conflict = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected key-value argument."

View File

@ -750,9 +750,9 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")
configuration.access_key_id = arg_value.safeGet<String>();
configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "secret_access_key")
configuration.secret_access_key = arg_value.safeGet<String>();
configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",

View File

@ -13,6 +13,7 @@
#include <Common/typeid_cast.h>
#include <Common/parseRemoteDescription.h>
#include <Common/Macros.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Core/Defines.h>
#include <base/range.h>
@ -32,126 +33,171 @@ namespace ErrorCodes
void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
ASTs & args_func = ast_function->children;
ExternalDataSourceConfiguration configuration;
String cluster_name;
String cluster_description;
if (args_func.size() != 1)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = args_func.at(0)->children;
const size_t max_args = is_cluster_function ? 4 : 6;
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name;
String cluster_description;
String remote_database;
String remote_table;
String username;
String password;
size_t arg_num = 0;
auto get_string_literal = [](const IAST & node, String & res)
/**
* Maximum number of arguments for remote function is 6.
* Maximum number of arguments for cluster function is 4.
* For now named collection can be used only for remote as cluster does not require credentials.
*/
size_t max_args = is_cluster_function ? 4 : 6;
auto named_collection = getExternalDataSourceConfiguration(args, context, false, false);
if (named_collection)
{
const auto * lit = node.as<ASTLiteral>();
if (!lit)
return false;
if (is_cluster_function)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Named collection cannot be used for table function cluster");
if (lit->value.getType() != Field::Types::String)
return false;
/**
* Common arguments: database, table, username, password, addresses_expr.
* Specific args (remote): sharding_key, or database (in case it is not ASTLiteral).
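* Required common arguments are validated in getExternalDataSourceConfiguration only when no key-value
* arguments are passed, so some of them (e.g. database, username, password) may still be empty at this point.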
*/
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
res = safeGet<const String &>(lit->value);
return true;
};
if (is_cluster_function)
{
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
cluster_name = args[arg_num]->as<ASTLiteral &>().value.safeGet<const String &>();
}
else
{
if (!tryGetIdentifierNameInto(args[arg_num], cluster_name))
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (!get_string_literal(*args[arg_num], cluster_description))
throw Exception("Hosts pattern must be string literal (in single quotes).", ErrorCodes::BAD_ARGUMENTS);
}
}
++arg_num;
const auto * function = args[arg_num]->as<ASTFunction>();
if (function && TableFunctionFactory::instance().isTableFunctionName(function->name))
{
remote_table_function_ptr = args[arg_num];
++arg_num;
}
else
{
args[arg_num] = evaluateConstantExpressionForDatabaseName(args[arg_num], context);
remote_database = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>();
++arg_num;
auto qualified_name = QualifiedTableName::parseFromString(remote_database);
if (qualified_name.database.empty())
{
if (arg_num >= args.size())
if (arg_name == "sharding_key")
{
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
sharding_key = arg_value;
}
else if (arg_name == "database")
{
const auto * function = arg_value->as<ASTFunction>();
if (function && TableFunctionFactory::instance().isTableFunctionName(function->name))
{
remote_table_function_ptr = arg_value;
}
else
{
auto database_literal = evaluateConstantExpressionOrIdentifierAsLiteral(arg_value, context);
configuration.database = database_literal->as<ASTLiteral>()->value.safeGet<String>();
}
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected key-value argument."
"Got: {}, but expected: sharding_key", arg_name);
}
cluster_description = configuration.addresses_expr;
if (cluster_description.empty())
cluster_description = configuration.port ? configuration.host + ':' + toString(configuration.port) : configuration.host;
}
else
{
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
size_t arg_num = 0;
auto get_string_literal = [](const IAST & node, String & res)
{
const auto * lit = node.as<ASTLiteral>();
if (!lit)
return false;
if (lit->value.getType() != Field::Types::String)
return false;
res = safeGet<const String &>(lit->value);
return true;
};
if (is_cluster_function)
{
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
cluster_name = args[arg_num]->as<ASTLiteral &>().value.safeGet<const String &>();
}
else
{
if (!tryGetIdentifierNameInto(args[arg_num], cluster_name))
{
std::swap(qualified_name.database, qualified_name.table);
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
qualified_name.table = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>();
if (!get_string_literal(*args[arg_num], cluster_description))
throw Exception("Hosts pattern must be string literal (in single quotes).", ErrorCodes::BAD_ARGUMENTS);
}
}
++arg_num;
const auto * function = args[arg_num]->as<ASTFunction>();
if (function && TableFunctionFactory::instance().isTableFunctionName(function->name))
{
remote_table_function_ptr = args[arg_num];
++arg_num;
}
else
{
args[arg_num] = evaluateConstantExpressionForDatabaseName(args[arg_num], context);
configuration.database = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>();
++arg_num;
auto qualified_name = QualifiedTableName::parseFromString(configuration.database);
if (qualified_name.database.empty())
{
if (arg_num >= args.size())
{
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
else
{
std::swap(qualified_name.database, qualified_name.table);
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
qualified_name.table = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>();
++arg_num;
}
}
configuration.database = std::move(qualified_name.database);
configuration.table = std::move(qualified_name.table);
/// Cluster function may have sharding key for insert
if (is_cluster_function && arg_num < args.size())
{
sharding_key = args[arg_num];
++arg_num;
}
}
remote_database = std::move(qualified_name.database);
remote_table = std::move(qualified_name.table);
}
/// Username and password parameters are prohibited in cluster version of the function
if (!is_cluster_function)
{
if (arg_num < args.size())
{
if (!get_string_literal(*args[arg_num], configuration.username))
{
configuration.username = "default";
sharding_key = args[arg_num];
}
++arg_num;
}
/// Cluster function may have sharding key for insert
if (is_cluster_function && arg_num < args.size())
{
sharding_key = args[arg_num];
++arg_num;
}
if (arg_num < args.size() && !sharding_key)
{
if (!get_string_literal(*args[arg_num], configuration.password))
{
sharding_key = args[arg_num];
}
++arg_num;
}
if (arg_num < args.size() && !sharding_key)
{
sharding_key = args[arg_num];
++arg_num;
}
}
/// Username and password parameters are prohibited in cluster version of the function
if (!is_cluster_function)
{
if (arg_num < args.size())
{
if (!get_string_literal(*args[arg_num], username))
{
username = "default";
sharding_key = args[arg_num];
}
++arg_num;
}
if (arg_num < args.size() && !sharding_key)
{
if (!get_string_literal(*args[arg_num], password))
{
sharding_key = args[arg_num];
}
++arg_num;
}
if (arg_num < args.size() && !sharding_key)
{
sharding_key = args[arg_num];
++arg_num;
}
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (arg_num < args.size())
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!cluster_name.empty())
{
/// Use an existing cluster from the main config
@ -197,19 +243,19 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
cluster = std::make_shared<Cluster>(
context->getSettings(),
names,
username,
password,
configuration.username,
configuration.password,
(secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort()),
treat_local_as_remote,
treat_local_port_as_remote,
secure);
}
if (!remote_table_function_ptr && remote_table.empty())
if (!remote_table_function_ptr && configuration.table.empty())
throw Exception("The name of remote table cannot be empty", ErrorCodes::BAD_ARGUMENTS);
remote_table_id.database_name = remote_database;
remote_table_id.table_name = remote_table;
remote_table_id.database_name = configuration.database;
remote_table_id.table_name = configuration.table;
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const

View File

@ -48,9 +48,9 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr con
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")
configuration.access_key_id = arg_value.safeGet<String>();
configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "secret_access_key")
configuration.secret_access_key = arg_value.safeGet<String>();
configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: "

View File

@ -0,0 +1,13 @@
<clickhouse>
<!-- Named collections of connection parameters referenced by name from queries. -->
<named_collections>
<!-- Single address given as a separate host and port, plus the target table. -->
<remote1>
<host>127.0.0.2</host>
<port>9000</port>
<table>remote_test</table>
</remote1>
<!-- Host written as a {1,2} pattern with no port, plus the target table. -->
<remote2>
<host>127.0.0.{1,2}</host>
<table>remote_test</table>
</remote2>
</named_collections>
</clickhouse>

View File

@ -40,6 +40,7 @@ ln -sf $SRC_PATH/config.d/encryption.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/CORS.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zookeeper_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logger.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/named_collection.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/

View File

@ -0,0 +1,11 @@
-- Tags: shard, no-fasttest
-- Exercises table function `remote` when its first argument is a predefined name (`remote1`, `remote2`)
-- followed by key-value arguments.
DROP TABLE IF EXISTS remote_test;
CREATE TABLE remote_test(a1 UInt8) ENGINE=Memory;
-- Insert four rows through `remote1`; the database is supplied as a key-value argument.
INSERT INTO FUNCTION remote(remote1, database=currentDatabase()) VALUES(1);
INSERT INTO FUNCTION remote(remote1, database=currentDatabase()) VALUES(2);
INSERT INTO FUNCTION remote(remote1, database=currentDatabase()) VALUES(3);
INSERT INTO FUNCTION remote(remote1, database=currentDatabase()) VALUES(4);
-- Read the rows back through `remote1`.
SELECT count() FROM remote(remote1, database=currentDatabase());
-- Here the `database` key-value argument is a table function call (`merge`) instead of a database name.
SELECT count() FROM remote(remote2, database=merge(currentDatabase(), '^remote_test'));
DROP TABLE remote_test;