diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 89a799349bf..199bae4fbb4 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -188,7 +188,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context)) { - configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, false); + configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false); } else { diff --git a/src/Dictionaries/MySQLDictionarySource.cpp b/src/Dictionaries/MySQLDictionarySource.cpp index 82a2762e61e..730217f96b7 100644 --- a/src/Dictionaries/MySQLDictionarySource.cpp +++ b/src/Dictionaries/MySQLDictionarySource.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "readInvalidateQuery.h" @@ -37,7 +38,7 @@ namespace ErrorCodes extern const int UNSUPPORTED_METHOD; } -static const std::unordered_set dictionary_allowed_keys = { +static const ValidateKeysMultiset dictionary_allowed_keys = { "host", "port", "user", "password", "db", "database", "table", "schema", "update_field", "invalidate_query", "priority", @@ -69,13 +70,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory) std::shared_ptr pool; MySQLSettings mysql_settings; - StorageMySQL::Configuration configuration; + std::optional dictionary_configuration; auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr; if (named_collection) { - named_collection->remove("name"); - configuration = StorageMySQL::processNamedCollectionResult(*named_collection, mysql_settings); - global_context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port)); + auto allowed_arguments{dictionary_allowed_keys}; + for (const auto & setting : mysql_settings.all()) + allowed_arguments.insert(setting.getName()); + validateNamedCollection>(*named_collection, {}, allowed_arguments); + + StorageMySQL::Configuration::Addresses addresses; + const auto addresses_expr = named_collection->getOrDefault("addresses_expr", ""); + if (addresses_expr.empty()) + { + const auto host = named_collection->getAnyOrDefault({"host", "hostname"}, ""); + const auto port = static_cast(named_collection->get("port")); + addresses = {std::make_pair(host, port)}; + } + else + { + size_t max_addresses = global_context->getSettingsRef().glob_expansion_max_elements; + addresses = parseRemoteDescriptionForExternalDatabase(addresses_expr, max_addresses, 3306); + } + + for (auto & address : addresses) + global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second)); + + dictionary_configuration.emplace(MySQLDictionarySource::Configuration{ + .db = named_collection->getAnyOrDefault({"database", "db"}, ""), + .table = named_collection->getOrDefault("table", ""), + .query = named_collection->getOrDefault("query", ""), + .where = named_collection->getOrDefault("where", ""), + .invalidate_query = named_collection->getOrDefault("invalidate_query", ""), + .update_field = named_collection->getOrDefault("update_field", ""), + .update_lag = named_collection->getOrDefault("update_lag", 1), + .dont_check_update_time = named_collection->getOrDefault("dont_check_update_time", false), + }); const auto & settings = global_context->getSettingsRef(); if (!mysql_settings.isChanged("connect_timeout")) @@ -83,38 +113,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory) if (!mysql_settings.isChanged("read_write_timeout")) mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec; - pool = std::make_shared(createMySQLPoolWithFailover(configuration, mysql_settings)); + for (const auto & setting : mysql_settings.all()) + { + const auto & setting_name = setting.getName(); + if (named_collection->has(setting_name)) + mysql_settings.set(setting_name, named_collection->get(setting_name)); + } + + pool = std::make_shared( + createMySQLPoolWithFailover( + dictionary_configuration->db, + addresses, + named_collection->getAnyOrDefault({"user", "username"}, ""), + named_collection->getOrDefault("password", ""), + mysql_settings)); } else { - if (created_from_ddl) - { - for (auto & address : configuration.addresses) - global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second)); - } + dictionary_configuration.emplace(MySQLDictionarySource::Configuration{ + .db = config.getString(settings_config_prefix + ".db", ""), + .table = config.getString(settings_config_prefix + ".table", ""), + .query = config.getString(settings_config_prefix + ".query", ""), + .where = config.getString(settings_config_prefix + ".where", ""), + .invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""), + .update_field = config.getString(settings_config_prefix + ".update_field", ""), + .update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1), + .dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false) + }); - configuration.database = config.getString(settings_config_prefix + ".db", ""); - configuration.table = config.getString(settings_config_prefix + ".table", ""); - pool = std::make_shared(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix)); + pool = std::make_shared( + mysqlxx::PoolFactory::instance().get(config, settings_config_prefix)); } - auto query = config.getString(settings_config_prefix + ".query", ""); - if (query.empty() && configuration.table.empty()) + if (dictionary_configuration->query.empty() && dictionary_configuration->table.empty()) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field"); - MySQLDictionarySource::Configuration dictionary_configuration - { - .db = configuration.database, - .table = configuration.table, - .query = query, - .where = config.getString(settings_config_prefix + ".where", ""), - .invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""), - .update_field = config.getString(settings_config_prefix + ".update_field", ""), - .update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1), - .dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false) - }; - - return std::make_unique(dict_struct, dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings); + return std::make_unique(dict_struct, *dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings); #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support."); diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index b6b264e274e..5f494ff55b8 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -270,7 +270,6 @@ void AWSInstanceProfileCredentialsProvider::Reload() void AWSInstanceProfileCredentialsProvider::refreshIfExpired() { - LOG_DEBUG(logger, "Checking if latest credential pull has expired."); Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock); if (!IsTimeToRefresh(load_frequency_ms)) { diff --git a/src/Storages/MySQL/MySQLHelpers.cpp b/src/Storages/MySQL/MySQLHelpers.cpp index 63a3436ea4a..e9ad18ee3ac 100644 --- a/src/Storages/MySQL/MySQLHelpers.cpp +++ b/src/Storages/MySQL/MySQLHelpers.cpp @@ -13,12 +13,24 @@ namespace ErrorCodes } mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings) +{ + return createMySQLPoolWithFailover( + configuration.database, configuration.addresses, + configuration.username, configuration.password, mysql_settings); +} + +mysqlxx::PoolWithFailover createMySQLPoolWithFailover( + const std::string & database, + const StorageMySQL::Configuration::Addresses & addresses, + const std::string & username, + const std::string & password, + const MySQLSettings & mysql_settings) { if (!mysql_settings.connection_pool_size) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size"); return mysqlxx::PoolWithFailover( - configuration.database, configuration.addresses, configuration.username, configuration.password, + database, addresses, username, password, MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, static_cast(mysql_settings.connection_pool_size), mysql_settings.connection_max_tries, diff --git a/src/Storages/MySQL/MySQLHelpers.h b/src/Storages/MySQL/MySQLHelpers.h index 71c331da16f..ebe00bd4cd1 100644 --- a/src/Storages/MySQL/MySQLHelpers.h +++ b/src/Storages/MySQL/MySQLHelpers.h @@ -10,8 +10,15 @@ namespace mysqlxx { class PoolWithFailover; } namespace DB { -mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings); +mysqlxx::PoolWithFailover createMySQLPoolWithFailover( + const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings); +mysqlxx::PoolWithFailover createMySQLPoolWithFailover( + const std::string & database, + const StorageMySQL::Configuration::Addresses & addresses, + const std::string & username, + const std::string & password, + const MySQLSettings & mysql_settings); } #endif diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index c389f7d9f7f..232ff87d9ed 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -238,7 +238,7 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta } StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult( - const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table) + const NamedCollection & named_collection, MySQLSettings & storage_settings, ContextPtr context_, bool require_table) { StorageMySQL::Configuration configuration; @@ -255,10 +255,16 @@ StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult( configuration.addresses_expr = named_collection.getOrDefault("addresses_expr", ""); if (configuration.addresses_expr.empty()) { - configuration.host = named_collection.getOrDefault("host", named_collection.getOrDefault("hostname", "")); + configuration.host = named_collection.getAnyOrDefault({"host", "hostname"}, ""); configuration.port = static_cast(named_collection.get("port")); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; } + else + { + size_t max_addresses = context_->getSettingsRef().glob_expansion_max_elements; + configuration.addresses = parseRemoteDescriptionForExternalDatabase( + configuration.addresses_expr, max_addresses, 3306); + } configuration.username = named_collection.getAny({"username", "user"}); configuration.password = named_collection.get("password"); @@ -283,7 +289,7 @@ StorageMySQL::Configuration StorageMySQL::getConfiguration(ASTs engine_args, Con StorageMySQL::Configuration configuration; if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context_)) { - configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings); + configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings, context_); } else { diff --git a/src/Storages/StorageMySQL.h b/src/Storages/StorageMySQL.h index 1c0e2639717..ae3d2f935b6 100644 --- a/src/Storages/StorageMySQL.h +++ b/src/Storages/StorageMySQL.h @@ -53,6 +53,8 @@ public: struct Configuration { + using Addresses = std::vector>; + String host; UInt16 port = 0; String username = "default"; @@ -63,14 +65,15 @@ public: bool replace_query = false; String on_duplicate_clause; - std::vector> addresses; /// Failover replicas. + Addresses addresses; /// Failover replicas. String addresses_expr; }; static Configuration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLSettings & storage_settings); static Configuration processNamedCollectionResult( - const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table = true); + const NamedCollection & named_collection, MySQLSettings & storage_settings, + ContextPtr context_, bool require_table = true); private: friend class StorageMySQLSink; diff --git a/tests/integration/test_dictionaries_mysql/test.py b/tests/integration/test_dictionaries_mysql/test.py index 5c67a4c434a..a12139a0bea 100644 --- a/tests/integration/test_dictionaries_mysql/test.py +++ b/tests/integration/test_dictionaries_mysql/test.py @@ -309,6 +309,19 @@ def test_predefined_connection_configuration(started_cluster): result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))") assert int(result) == 200 + instance.query( + """ + DROP DICTIONARY IF EXISTS dict; + CREATE DICTIONARY dict (id UInt32, value UInt32) + PRIMARY KEY id + SOURCE(MYSQL(NAME mysql4 connection_pool_size 1 close_connection 1 share_connection 1)) + LIFETIME(MIN 1 MAX 2) + LAYOUT(HASHED()); + """ + ) + result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))") + assert int(result) == 200 + def create_mysql_db(mysql_connection, name): with mysql_connection.cursor() as cursor: