Settings changes as key value

This commit is contained in:
kssenii 2021-12-27 17:41:37 +03:00
parent 3cb37a11ae
commit 55dfaef4de
13 changed files with 74 additions and 22 deletions

View File

@ -156,13 +156,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
StorageMySQLConfiguration configuration; StorageMySQLConfiguration configuration;
ASTs & arguments = engine->arguments->children; ASTs & arguments = engine->arguments->children;
MySQLSettings mysql_settings;
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true)) if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
mysql_settings.applyChanges(settings_changes);
if (!storage_specific_args.empty()) if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -200,7 +202,6 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_name == "MySQL") if (engine_name == "MySQL")
{ {
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>(); auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
MySQLSettings mysql_settings;
auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings); auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings);
mysql_database_settings->loadFromQueryContext(context); mysql_database_settings->loadFromQueryContext(context);
@ -299,7 +300,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true)) if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
@ -358,7 +359,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true)) if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
if (!storage_specific_args.empty()) if (!storage_specific_args.empty())

View File

@ -15,6 +15,9 @@
#if USE_RDKAFKA #if USE_RDKAFKA
#include <Storages/Kafka/KafkaSettings.h> #include <Storages/Kafka/KafkaSettings.h>
#endif #endif
#if USE_MYSQL
#include <Storages/MySQL/MySQLSettings.h>
#endif
namespace DB namespace DB
{ {
@ -24,6 +27,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
String ExternalDataSourceConfiguration::toString() const String ExternalDataSourceConfiguration::toString() const
{ {
WriteBufferFromOwnString configuration_info; WriteBufferFromOwnString configuration_info;
@ -59,7 +64,9 @@ void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration
} }
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection) template <typename T>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<T> & storage_settings)
{ {
if (args.empty()) if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
@ -82,6 +89,15 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name()); throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
} }
SettingsChanges config_settings;
for (const auto & setting : storage_settings.all())
{
const auto & setting_name = setting.getName();
auto setting_value = config.getString(collection_prefix + '.' + setting_name, "");
if (!setting_value.empty())
config_settings.emplace_back(setting_name, setting_value);
}
configuration.host = config.getString(collection_prefix + ".host", ""); configuration.host = config.getString(collection_prefix + ".host", "");
configuration.port = config.getInt(collection_prefix + ".port", 0); configuration.port = config.getInt(collection_prefix + ".port", 0);
configuration.username = config.getString(collection_prefix + ".user", ""); configuration.username = config.getString(collection_prefix + ".user", "");
@ -123,6 +139,7 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
if (arg_value_literal) if (arg_value_literal)
{ {
auto arg_value = arg_value_literal->value; auto arg_value = arg_value_literal->value;
if (arg_name == "host") if (arg_name == "host")
configuration.host = arg_value.safeGet<String>(); configuration.host = arg_value.safeGet<String>();
else if (arg_name == "port") else if (arg_name == "port")
@ -139,6 +156,8 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
configuration.schema = arg_value.safeGet<String>(); configuration.schema = arg_value.safeGet<String>();
else if (arg_name == "addresses_expr") else if (arg_name == "addresses_expr")
configuration.addresses_expr = arg_value.safeGet<String>(); configuration.addresses_expr = arg_value.safeGet<String>();
else if (storage_settings.has(arg_name))
config_settings.emplace_back(arg_name, arg_value);
else else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast)); non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast));
} }
@ -153,8 +172,7 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
} }
} }
ExternalDataSourceConfig source_config{ .configuration = configuration, .specific_args = non_common_args }; return ExternalDataSourceInfo{ .configuration = configuration, .specific_args = non_common_args, .settings_changes = config_settings };
return source_config;
} }
return std::nullopt; return std::nullopt;
} }
@ -425,4 +443,14 @@ bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<RabbitMQ
template template
bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<KafkaSettingsTraits> & settings, ContextPtr context); bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<KafkaSettingsTraits> & settings, ContextPtr context);
#endif #endif
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<EmptySettingsTraits> & storage_settings);
#if USE_MYSQL
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<MySQLSettingsTraits> & storage_settings);
#endif
} }

View File

@ -7,6 +7,11 @@
namespace DB namespace DB
{ {
#define EMPTY_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
struct EmptySettings : public BaseSettings<EmptySettingsTraits> {};
struct ExternalDataSourceConfiguration struct ExternalDataSourceConfiguration
{ {
String host; String host;
@ -46,10 +51,11 @@ struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>; using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
struct ExternalDataSourceConfig struct ExternalDataSourceInfo
{ {
ExternalDataSourceConfiguration configuration; ExternalDataSourceConfiguration configuration;
StorageSpecificArgs specific_args; StorageSpecificArgs specific_args;
SettingsChanges settings_changes;
}; };
/* If there is a storage engine's configuration specified in the named_collections, /* If there is a storage engine's configuration specified in the named_collections,
@ -62,7 +68,9 @@ struct ExternalDataSourceConfig
* Any key-value engine argument except common (`host`, `port`, `username`, `password`, `database`) * Any key-value engine argument except common (`host`, `port`, `username`, `password`, `database`)
* is returned in EngineArgs struct. * is returned in EngineArgs struct.
*/ */
std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true); template <typename T = EmptySettingsTraits>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true, const BaseSettings<T> & storage_settings = {});
std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguration( std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context); const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);

View File

@ -8,9 +8,17 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
mysqlxx::PoolWithFailover mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings) createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings)
{ {
if (!mysql_settings.connection_pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
return mysqlxx::PoolWithFailover( return mysqlxx::PoolWithFailover(
configuration.database, configuration.addresses, configuration.username, configuration.password, configuration.database, configuration.addresses, configuration.username, configuration.password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,

View File

@ -25,11 +25,14 @@ class ASTStorage;
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
using MySQLBaseSettings = BaseSettings<MySQLSettingsTraits>;
/** Settings for the MySQL family of engines. /** Settings for the MySQL family of engines.
*/ */
struct MySQLSettings : public BaseSettings<MySQLSettingsTraits> struct MySQLSettings : public MySQLBaseSettings
{ {
void loadFromQuery(ASTStorage & storage_def); void loadFromQuery(ASTStorage & storage_def);
}; };
} }

View File

@ -272,7 +272,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
ExternalDataSourceConfiguration configuration; ExternalDataSourceConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(inner_engine_args, args.getLocalContext())) if (auto named_collection = getExternalDataSourceConfiguration(inner_engine_args, args.getLocalContext()))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
for (const auto & [name, value] : storage_specific_args) for (const auto & [name, value] : storage_specific_args)

View File

@ -117,7 +117,7 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C
StorageMongoDBConfiguration configuration; StorageMongoDBConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context)) if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args) for (const auto & [arg_name, arg_value] : storage_specific_args)

View File

@ -238,15 +238,17 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
} }
StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_) StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_, MySQLBaseSettings & storage_settings)
{ {
StorageMySQLConfiguration configuration; StorageMySQLConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context_)) if (auto named_collection = getExternalDataSourceConfiguration(
engine_args, context_, /* is_database_engine */false, /* throw_on_no_collection */true, storage_settings))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
storage_settings.applyChanges(settings_changes);
for (const auto & [arg_name, arg_value] : storage_specific_args) for (const auto & [arg_name, arg_value] : storage_specific_args)
{ {
@ -298,9 +300,9 @@ void registerStorageMySQL(StorageFactory & factory)
{ {
factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args) factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args)
{ {
auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext());
MySQLSettings mysql_settings; /// TODO: move some arguments from the arguments to the SETTINGS. MySQLSettings mysql_settings; /// TODO: move some arguments from the arguments to the SETTINGS.
auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext(), mysql_settings);
if (args.storage_def->settings) if (args.storage_def->settings)
mysql_settings.loadFromQuery(*args.storage_def); mysql_settings.loadFromQuery(*args.storage_def);

View File

@ -53,7 +53,7 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
static StorageMySQLConfiguration getConfiguration(ASTs engine_args, ContextPtr context_); static StorageMySQLConfiguration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLBaseSettings & storage_settings);
private: private:
friend class StorageMySQLSink; friend class StorageMySQLSink;

View File

@ -390,7 +390,7 @@ StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_a
StoragePostgreSQLConfiguration configuration; StoragePostgreSQLConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context)) if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{ {
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; configuration.addresses = {std::make_pair(configuration.host, configuration.port)};

View File

@ -37,8 +37,8 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
if (!args_func.arguments) if (!args_func.arguments)
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR); throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context);
MySQLSettings mysql_settings; MySQLSettings mysql_settings;
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings);
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec; mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec; mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;

View File

@ -60,7 +60,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
* Specific args (remote): sharding_key, or database (in case it is not ASTLiteral). * Specific args (remote): sharding_key, or database (in case it is not ASTLiteral).
* None of the common arguments is empty at this point, it is checked in getExternalDataSourceConfiguration. * None of the common arguments is empty at this point, it is checked in getExternalDataSourceConfiguration.
*/ */
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args, _] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args) for (const auto & [arg_name, arg_value] : storage_specific_args)

View File

@ -418,6 +418,8 @@ def test_predefined_connection_configuration(started_cluster):
''') ''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100') assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
assert 'Connection pool cannot have zero size' in node1.query_and_get_error(f"SELECT count() FROM mysql(mysql1, table='test_table', connection_pool_size=0)").rstrip()
# Regression for (k, v) IN ((k, v)) # Regression for (k, v) IN ((k, v))
def test_mysql_in(started_cluster): def test_mysql_in(started_cluster):