From 55dfaef4decd8da8b7bf0efc33e90dec82f229f5 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 27 Dec 2021 17:41:37 +0300 Subject: [PATCH] Settings changes as key value --- src/Databases/DatabaseFactory.cpp | 11 +++--- .../ExternalDataSourceConfiguration.cpp | 34 +++++++++++++++++-- .../ExternalDataSourceConfiguration.h | 12 +++++-- src/Storages/MySQL/MySQLHelpers.cpp | 8 +++++ src/Storages/MySQL/MySQLSettings.h | 5 ++- src/Storages/StorageExternalDistributed.cpp | 2 +- src/Storages/StorageMongoDB.cpp | 2 +- src/Storages/StorageMySQL.cpp | 12 ++++--- src/Storages/StorageMySQL.h | 2 +- src/Storages/StoragePostgreSQL.cpp | 2 +- src/TableFunctions/TableFunctionMySQL.cpp | 2 +- src/TableFunctions/TableFunctionRemote.cpp | 2 +- tests/integration/test_storage_mysql/test.py | 2 ++ 13 files changed, 74 insertions(+), 22 deletions(-) diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 3f6cb49fda7..5cc334eaad4 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -156,13 +156,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String StorageMySQLConfiguration configuration; ASTs & arguments = engine->arguments->children; + MySQLSettings mysql_settings; - if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true)) + if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings)) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; + mysql_settings.applyChanges(settings_changes); if (!storage_specific_args.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -200,7 +202,6 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (engine_name == "MySQL") { auto mysql_database_settings = std::make_unique(); - MySQLSettings mysql_settings; auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings); mysql_database_settings->loadFromQueryContext(context); @@ -299,7 +300,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true)) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, _] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; @@ -358,7 +359,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true)) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, _] = named_collection.value(); configuration.set(common_configuration); if (!storage_specific_args.empty()) diff --git a/src/Storages/ExternalDataSourceConfiguration.cpp b/src/Storages/ExternalDataSourceConfiguration.cpp index 42b3b148551..f96e199ff6e 100644 --- a/src/Storages/ExternalDataSourceConfiguration.cpp +++ b/src/Storages/ExternalDataSourceConfiguration.cpp @@ -15,6 +15,9 @@ #if USE_RDKAFKA #include #endif +#if USE_MYSQL +#include +#endif namespace DB { @@ -24,6 +27,8 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS) + String ExternalDataSourceConfiguration::toString() const { WriteBufferFromOwnString configuration_info; @@ -59,7 +64,9 @@ void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration } -std::optional getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection) +template +std::optional getExternalDataSourceConfiguration( + const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings) { if (args.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); @@ -82,6 +89,15 @@ std::optional getExternalDataSourceConfiguration(const throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name()); } + SettingsChanges config_settings; + for (const auto & setting : storage_settings.all()) + { + const auto & setting_name = setting.getName(); + auto setting_value = config.getString(collection_prefix + '.' + setting_name, ""); + if (!setting_value.empty()) + config_settings.emplace_back(setting_name, setting_value); + } + configuration.host = config.getString(collection_prefix + ".host", ""); configuration.port = config.getInt(collection_prefix + ".port", 0); configuration.username = config.getString(collection_prefix + ".user", ""); @@ -123,6 +139,7 @@ std::optional getExternalDataSourceConfiguration(const if (arg_value_literal) { auto arg_value = arg_value_literal->value; + if (arg_name == "host") configuration.host = arg_value.safeGet(); else if (arg_name == "port") @@ -139,6 +156,8 @@ std::optional getExternalDataSourceConfiguration(const configuration.schema = arg_value.safeGet(); else if (arg_name == "addresses_expr") configuration.addresses_expr = arg_value.safeGet(); + else if (storage_settings.has(arg_name)) + config_settings.emplace_back(arg_name, arg_value); else non_common_args.emplace_back(std::make_pair(arg_name, arg_value_ast)); } @@ -153,8 +172,7 @@ std::optional getExternalDataSourceConfiguration(const } } - ExternalDataSourceConfig source_config{ .configuration = configuration, .specific_args = non_common_args }; - return source_config; + return ExternalDataSourceInfo{ .configuration = configuration, .specific_args = non_common_args, .settings_changes = config_settings }; } return std::nullopt; } @@ -425,4 +443,14 @@ bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings & settings, ContextPtr context); #endif + +template +std::optional getExternalDataSourceConfiguration( + const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings); + +#if USE_MYSQL +template +std::optional getExternalDataSourceConfiguration( + const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings); +#endif } diff --git a/src/Storages/ExternalDataSourceConfiguration.h b/src/Storages/ExternalDataSourceConfiguration.h index 502f8b800e3..dd83215c79f 100644 --- a/src/Storages/ExternalDataSourceConfiguration.h +++ b/src/Storages/ExternalDataSourceConfiguration.h @@ -7,6 +7,11 @@ namespace DB { +#define EMPTY_SETTINGS(M) +DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS) + +struct EmptySettings : public BaseSettings {}; + struct ExternalDataSourceConfiguration { String host; @@ -46,10 +51,11 @@ struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration using StorageSpecificArgs = std::vector>; -struct ExternalDataSourceConfig +struct ExternalDataSourceInfo { ExternalDataSourceConfiguration configuration; StorageSpecificArgs specific_args; + SettingsChanges settings_changes; }; /* If there is a storage engine's configuration specified in the named_collections, @@ -62,7 +68,9 @@ struct ExternalDataSourceConfig * Any key-value engine argument except common (`host`, `port`, `username`, `password`, `database`) * is returned in EngineArgs struct. */ -std::optional getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true); +template +std::optional getExternalDataSourceConfiguration( + const ASTs & args, ContextPtr context, bool is_database_engine = false, bool throw_on_no_collection = true, const BaseSettings & storage_settings = {}); std::optional getExternalDataSourceConfiguration( const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context); diff --git a/src/Storages/MySQL/MySQLHelpers.cpp b/src/Storages/MySQL/MySQLHelpers.cpp index e7745e6c0bb..edeb4ffca8a 100644 --- a/src/Storages/MySQL/MySQLHelpers.cpp +++ b/src/Storages/MySQL/MySQLHelpers.cpp @@ -8,9 +8,17 @@ namespace DB { +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings) { + if (!mysql_settings.connection_pool_size) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size"); + return mysqlxx::PoolWithFailover( configuration.database, configuration.addresses, configuration.username, configuration.password, MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, diff --git a/src/Storages/MySQL/MySQLSettings.h b/src/Storages/MySQL/MySQLSettings.h index aa2c2703d6b..be1e09c12e6 100644 --- a/src/Storages/MySQL/MySQLSettings.h +++ b/src/Storages/MySQL/MySQLSettings.h @@ -25,11 +25,14 @@ class ASTStorage; DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) +using MySQLBaseSettings = BaseSettings; + /** Settings for the MySQL family of engines. */ -struct MySQLSettings : public BaseSettings +struct MySQLSettings : public MySQLBaseSettings { void loadFromQuery(ASTStorage & storage_def); }; + } diff --git a/src/Storages/StorageExternalDistributed.cpp b/src/Storages/StorageExternalDistributed.cpp index 927c070826b..40a2ad0b85e 100644 --- a/src/Storages/StorageExternalDistributed.cpp +++ b/src/Storages/StorageExternalDistributed.cpp @@ -272,7 +272,7 @@ void registerStorageExternalDistributed(StorageFactory & factory) ExternalDataSourceConfiguration configuration; if (auto named_collection = getExternalDataSourceConfiguration(inner_engine_args, args.getLocalContext())) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, _] = named_collection.value(); configuration.set(common_configuration); for (const auto & [name, value] : storage_specific_args) diff --git a/src/Storages/StorageMongoDB.cpp b/src/Storages/StorageMongoDB.cpp index 2c1b44d8685..9b25b44c0e7 100644 --- a/src/Storages/StorageMongoDB.cpp +++ b/src/Storages/StorageMongoDB.cpp @@ -117,7 +117,7 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C StorageMongoDBConfiguration configuration; if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context)) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, _] = named_collection.value(); configuration.set(common_configuration); for (const auto & [arg_name, arg_value] : storage_specific_args) diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index 66adf3ae272..83cf2b07b21 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -238,15 +238,17 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta } -StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_) +StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_, MySQLBaseSettings & storage_settings) { StorageMySQLConfiguration configuration; - if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context_)) + if (auto named_collection = getExternalDataSourceConfiguration( + engine_args, context_, /* is_database_engine */false, /* throw_on_no_collection */true, storage_settings)) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; + storage_settings.applyChanges(settings_changes); for (const auto & [arg_name, arg_value] : storage_specific_args) { @@ -298,9 +300,9 @@ void registerStorageMySQL(StorageFactory & factory) { factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args) { - auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext()); - MySQLSettings mysql_settings; /// TODO: move some arguments from the arguments to the SETTINGS. + auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext(), mysql_settings); + if (args.storage_def->settings) mysql_settings.loadFromQuery(*args.storage_def); diff --git a/src/Storages/StorageMySQL.h b/src/Storages/StorageMySQL.h index cc3673e50ca..fe2ee8439bc 100644 --- a/src/Storages/StorageMySQL.h +++ b/src/Storages/StorageMySQL.h @@ -53,7 +53,7 @@ public: SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; - static StorageMySQLConfiguration getConfiguration(ASTs engine_args, ContextPtr context_); + static StorageMySQLConfiguration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLBaseSettings & storage_settings); private: friend class StorageMySQLSink; diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index 8327bb92a38..5042f911149 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -390,7 +390,7 @@ StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_a StoragePostgreSQLConfiguration configuration; if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context)) { - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, _] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; diff --git a/src/TableFunctions/TableFunctionMySQL.cpp b/src/TableFunctions/TableFunctionMySQL.cpp index e959fa754c9..cfed24caef6 100644 --- a/src/TableFunctions/TableFunctionMySQL.cpp +++ b/src/TableFunctions/TableFunctionMySQL.cpp @@ -37,8 +37,8 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr if (!args_func.arguments) throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR); - configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context); MySQLSettings mysql_settings; + configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings); const auto & settings = context->getSettingsRef(); mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec; mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec; diff --git a/src/TableFunctions/TableFunctionRemote.cpp b/src/TableFunctions/TableFunctionRemote.cpp index f7af6bee7d9..85857011616 100644 --- a/src/TableFunctions/TableFunctionRemote.cpp +++ b/src/TableFunctions/TableFunctionRemote.cpp @@ -60,7 +60,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr * Specific args (remote): sharding_key, or database (in case it is not ASTLiteral). * None of the common arguments is empty at this point, it is checked in getExternalDataSourceConfiguration. */ - auto [common_configuration, storage_specific_args] = named_collection.value(); + auto [common_configuration, storage_specific_args, _] = named_collection.value(); configuration.set(common_configuration); for (const auto & [arg_name, arg_value] : storage_specific_args) diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py index c0ba0d8735e..59be9f5f879 100644 --- a/tests/integration/test_storage_mysql/test.py +++ b/tests/integration/test_storage_mysql/test.py @@ -418,6 +418,8 @@ def test_predefined_connection_configuration(started_cluster): ''') assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100') + assert 'Connection pool cannot have zero size' in node1.query_and_get_error(f"SELECT count() FROM mysql(mysql1, table='test_table', connection_pool_size=0)").rstrip() + # Regression for (k, v) IN ((k, v)) def test_mysql_in(started_cluster):