ClickHouse/src/Storages/ExternalDataSourceConfiguration.cpp

452 lines
20 KiB
C++
Raw Normal View History

2021-09-01 17:59:11 +00:00
#include "ExternalDataSourceConfiguration.h"
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
2021-09-01 17:59:11 +00:00
#include <Poco/Util/AbstractConfiguration.h>
2021-09-01 23:17:15 +00:00
#include <IO/WriteBufferFromString.h>
2021-09-01 17:59:11 +00:00
#if USE_AMQPCPP
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#endif
#if USE_RDKAFKA
#include <Storages/Kafka/KafkaSettings.h>
#endif
2021-09-01 17:59:11 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db",
"database", "table", "schema", "replica",
"update_field", "update_tag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};
2021-09-01 23:17:15 +00:00
String ExternalDataSourceConfiguration::toString() const
{
WriteBufferFromOwnString configuration_info;
2021-09-07 11:17:25 +00:00
configuration_info << "username: " << username << "\t";
if (addresses.empty())
{
configuration_info << "host: " << host << "\t";
configuration_info << "port: " << port << "\t";
}
else
{
for (const auto & [replica_host, replica_port] : addresses)
{
configuration_info << "host: " << replica_host << "\t";
configuration_info << "port: " << replica_port << "\t";
}
}
2021-09-01 23:17:15 +00:00
return configuration_info.str();
}
2021-09-15 22:45:43 +00:00
void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration & conf)
{
host = conf.host;
port = conf.port;
username = conf.username;
2021-09-22 15:10:25 +00:00
password = conf.password;
2021-09-15 22:45:43 +00:00
database = conf.database;
table = conf.table;
schema = conf.schema;
2021-12-13 22:06:46 +00:00
addresses = conf.addresses;
addresses_expr = conf.addresses_expr;
2021-09-15 22:45:43 +00:00
}
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection)
2021-09-01 17:59:11 +00:00
{
2021-09-07 11:17:25 +00:00
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
2021-09-01 17:59:11 +00:00
ExternalDataSourceConfiguration configuration;
2021-09-22 15:10:25 +00:00
StorageSpecificArgs non_common_args;
2021-09-01 17:59:11 +00:00
if (const auto * collection = typeid_cast<const ASTIdentifier *>(args[0].get()))
{
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection->name());
2021-09-01 17:59:11 +00:00
if (!config.has(collection_prefix))
{
2021-10-06 12:06:40 +00:00
/// For table function remote we do not throw on no collection, because then we consider first arg
/// as cluster definition from config.
if (!throw_on_no_collection)
return std::nullopt;
2021-09-01 17:59:11 +00:00
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
}
2021-09-01 17:59:11 +00:00
configuration.host = config.getString(collection_prefix + ".host", "");
configuration.port = config.getInt(collection_prefix + ".port", 0);
configuration.username = config.getString(collection_prefix + ".user", "");
configuration.password = config.getString(collection_prefix + ".password", "");
configuration.database = config.getString(collection_prefix + ".database", "");
2021-12-09 14:40:51 +00:00
configuration.table = config.getString(collection_prefix + ".table", config.getString(collection_prefix + ".collection", ""));
configuration.schema = config.getString(collection_prefix + ".schema", "");
configuration.addresses_expr = config.getString(collection_prefix + ".addresses_expr", "");
if (!configuration.addresses_expr.empty() && !configuration.host.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have `addresses_expr` and `host`, `port` in configuration at the same time");
2021-09-01 17:59:11 +00:00
if ((args.size() == 1) && ((configuration.addresses_expr.empty() && (configuration.host.empty() || configuration.port == 0))
2021-09-01 23:17:15 +00:00
|| configuration.database.empty() || (configuration.table.empty() && !is_database_engine)))
2021-09-01 17:59:11 +00:00
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and no key-value arguments are added");
}
/// Check key-value arguments.
2021-09-01 17:59:11 +00:00
for (size_t i = 1; i < args.size(); ++i)
{
if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_function->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
if (function_args[1]->as<ASTFunction>())
{
non_common_args.emplace_back(std::make_pair(arg_name, function_args[1]));
continue;
}
2021-09-01 17:59:11 +00:00
auto arg_value_ast = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context);
2021-10-23 15:20:31 +00:00
auto * arg_value_literal = arg_value_ast->as<ASTLiteral>();
if (arg_value_literal)
{
auto arg_value = arg_value_literal->value;
if (arg_name == "host")
configuration.host = arg_value.safeGet<String>();
else if (arg_name == "port")
configuration.port = arg_value.safeGet<UInt64>();
else if (arg_name == "user")
configuration.username = arg_value.safeGet<String>();
else if (arg_name == "password")
configuration.password = arg_value.safeGet<String>();
else if (arg_name == "database")
configuration.database = arg_value.safeGet<String>();
else if (arg_name == "table")
configuration.table = arg_value.safeGet<String>();
else if (arg_name == "schema")
configuration.schema = arg_value.safeGet<String>();
else if (arg_name == "addresses_expr")
configuration.addresses_expr = arg_value.safeGet<String>();
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
}
2021-09-01 17:59:11 +00:00
else
{
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
}
2021-09-01 17:59:11 +00:00
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
}
}
2021-09-22 15:10:25 +00:00
ExternalDataSourceConfig source_config{ .configuration = configuration, .specific_args = non_common_args };
return source_config;
2021-09-01 17:59:11 +00:00
}
2021-09-15 22:45:43 +00:00
return std::nullopt;
2021-09-01 17:59:11 +00:00
}
static void validateConfigKeys(
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix, const std::unordered_set<std::string_view> & allowed_keys)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
2021-12-28 12:08:40 +00:00
if (allowed_keys.contains(config_key) || config_key.starts_with("replica"))
continue;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
}
}
2021-09-01 23:17:15 +00:00
2021-09-25 14:46:03 +00:00
std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguration(
2021-09-01 23:17:15 +00:00
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
validateConfigKeys(dict_config, dict_config_prefix, dictionary_allowed_keys);
2021-09-01 23:17:15 +00:00
ExternalDataSourceConfiguration configuration;
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
validateConfigKeys(dict_config, collection_prefix, dictionary_allowed_keys);
2021-09-01 23:17:15 +00:00
if (!config.has(collection_prefix))
2021-09-01 23:17:15 +00:00
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(collection_prefix + ".host", ""));
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(dict_config_prefix + ".database",
config.getString(collection_prefix + ".db", config.getString(collection_prefix + ".database", ""))));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(collection_prefix + ".schema", ""));
2021-09-01 23:17:15 +00:00
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.table.empty())
2021-09-01 23:17:15 +00:00
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
2021-12-28 12:08:40 +00:00
"Named collection of connection parameters is missing some of the parameters and dictionary parameters are not added");
2021-09-01 23:17:15 +00:00
}
2021-09-25 14:46:03 +00:00
return configuration;
2021-09-01 23:17:15 +00:00
}
2021-09-25 14:46:03 +00:00
return std::nullopt;
2021-09-01 23:17:15 +00:00
}
2021-09-03 11:16:32 +00:00
ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
2021-09-01 23:17:15 +00:00
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
validateConfigKeys(dict_config, dict_config_prefix, dictionary_allowed_keys);
2021-09-25 14:46:03 +00:00
ExternalDataSourceConfiguration common_configuration;
auto named_collection = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context);
if (named_collection)
{
common_configuration = *named_collection;
}
else
{
common_configuration.host = dict_config.getString(dict_config_prefix + ".host", "");
common_configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
common_configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
common_configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
common_configuration.database = dict_config.getString(dict_config_prefix + ".db", dict_config.getString(dict_config_prefix + ".database", ""));
2021-09-25 14:46:03 +00:00
common_configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
common_configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
}
2021-09-02 13:01:26 +00:00
ExternalDataSourcesByPriority configuration
{
.database = common_configuration.database,
.table = common_configuration.table,
2021-09-10 11:11:52 +00:00
.schema = common_configuration.schema,
.replicas_configurations = {}
2021-09-02 13:01:26 +00:00
};
2021-09-01 23:17:15 +00:00
if (dict_config.has(dict_config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(dict_config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
ExternalDataSourceConfiguration replica_configuration(common_configuration);
String replica_name = dict_config_prefix + "." + config_key;
validateConfigKeys(dict_config, replica_name, {"host", "port", "user", "password", "priority"});
2021-09-01 23:17:15 +00:00
size_t priority = dict_config.getInt(replica_name + ".priority", 0);
2021-09-01 23:17:15 +00:00
replica_configuration.host = dict_config.getString(replica_name + ".host", common_configuration.host);
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and no other dictionary parameters are added");
}
2021-09-02 13:01:26 +00:00
configuration.replicas_configurations[priority].emplace_back(replica_configuration);
2021-09-01 23:17:15 +00:00
}
}
}
else
{
2021-09-02 13:01:26 +00:00
configuration.replicas_configurations[0].emplace_back(common_configuration);
2021-09-01 23:17:15 +00:00
}
2021-09-02 13:01:26 +00:00
return configuration;
2021-09-01 23:17:15 +00:00
}
2021-09-07 11:17:25 +00:00
2021-09-15 22:45:43 +00:00
void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration & conf)
{
url = conf.url;
format = conf.format;
compression_method = conf.compression_method;
structure = conf.structure;
2021-10-28 12:44:12 +00:00
http_method = conf.http_method;
2021-09-15 22:45:43 +00:00
}
2021-09-22 15:10:25 +00:00
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context)
2021-09-07 11:17:25 +00:00
{
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
URLBasedDataSourceConfiguration configuration;
2021-09-22 15:10:25 +00:00
StorageSpecificArgs non_common_args;
2021-09-07 11:17:25 +00:00
if (const auto * collection = typeid_cast<const ASTIdentifier *>(args[0].get()))
{
const auto & config = context->getConfigRef();
auto config_prefix = fmt::format("named_collections.{}", collection->name());
if (!config.has(config_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
for (const auto & key : keys)
{
if (key == "url")
2021-09-09 09:18:08 +00:00
{
2021-09-07 11:17:25 +00:00
configuration.url = config.getString(config_prefix + ".url", "");
2021-09-09 09:18:08 +00:00
}
2021-10-26 09:31:01 +00:00
else if (key == "method")
{
2021-10-28 12:44:12 +00:00
configuration.http_method = config.getString(config_prefix + ".method", "");
2021-10-26 09:31:01 +00:00
}
else if (key == "format")
{
configuration.format = config.getString(config_prefix + ".format", "");
}
else if (key == "structure")
{
configuration.structure = config.getString(config_prefix + ".structure", "");
}
2021-09-09 09:18:08 +00:00
else if (key == "headers")
2021-09-07 11:17:25 +00:00
{
Poco::Util::AbstractConfiguration::Keys header_keys;
2021-09-15 18:11:49 +00:00
config.keys(config_prefix + ".headers", header_keys);
2021-09-07 11:17:25 +00:00
for (const auto & header : header_keys)
{
const auto header_prefix = config_prefix + ".headers." + header;
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value")));
}
}
else
{
auto value = config.getString(config_prefix + '.' + key);
non_common_args.emplace_back(std::make_pair(key, std::make_shared<ASTLiteral>(value)));
}
2021-09-07 11:17:25 +00:00
}
/// Check key-value arguments.
2021-09-07 11:17:25 +00:00
for (size_t i = 1; i < args.size(); ++i)
{
if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_function->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
auto arg_value_ast = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context);
2021-10-23 15:20:31 +00:00
auto arg_value = arg_value_ast->as<ASTLiteral>()->value;
2021-09-07 11:17:25 +00:00
if (arg_name == "url")
configuration.url = arg_value.safeGet<String>();
2021-11-01 09:52:27 +00:00
else if (arg_name == "method")
2021-10-28 12:44:12 +00:00
configuration.http_method = arg_value.safeGet<String>();
2021-09-07 11:17:25 +00:00
else if (arg_name == "format")
configuration.format = arg_value.safeGet<String>();
else if (arg_name == "compression_method")
configuration.compression_method = arg_value.safeGet<String>();
else if (arg_name == "structure")
configuration.structure = arg_value.safeGet<String>();
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
2021-09-07 11:17:25 +00:00
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
}
}
if (configuration.url.empty() || configuration.format.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
2021-09-09 09:18:08 +00:00
"Storage requires {}", configuration.url.empty() ? "url" : "format");
2021-09-07 11:17:25 +00:00
2021-09-22 15:10:25 +00:00
URLBasedDataSourceConfig source_config{ .configuration = configuration, .specific_args = non_common_args };
return source_config;
2021-09-07 11:17:25 +00:00
}
2021-09-15 22:45:43 +00:00
return std::nullopt;
2021-09-07 11:17:25 +00:00
}
template<typename T>
bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<T> & settings, ContextPtr context)
{
if (args.empty())
return false;
if (const auto * collection = typeid_cast<const ASTIdentifier *>(args[0].get()))
{
const auto & config = context->getConfigRef();
const auto & config_prefix = fmt::format("named_collections.{}", collection->name());
if (!config.has(config_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
SettingsChanges config_settings;
for (const auto & setting : settings.all())
{
const auto & setting_name = setting.getName();
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
if (!setting_value.empty())
config_settings.emplace_back(setting_name, setting_value);
}
/// Check key-value arguments.
for (size_t i = 1; i < args.size(); ++i)
{
if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_function->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
auto arg_value_ast = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context);
auto arg_value = arg_value_ast->as<ASTLiteral>()->value;
config_settings.emplace_back(arg_name, arg_value);
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
}
}
settings.applyChanges(config_settings);
return true;
}
return false;
}
#if USE_AMQPCPP
template
bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<RabbitMQSettingsTraits> & settings, ContextPtr context);
#endif
#if USE_RDKAFKA
template
bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<KafkaSettingsTraits> & settings, ContextPtr context);
#endif
2021-09-01 17:59:11 +00:00
}