This commit is contained in:
kssenii 2023-04-13 19:33:58 +02:00
parent 467ecf45e1
commit ad48e1d010
7 changed files with 112 additions and 37 deletions

View File

@ -188,7 +188,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, false);
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false);
}
else
{

View File

@ -21,6 +21,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/LocalDateTime.h>
#include <Common/parseRemoteDescription.h>
#include <Common/logger_useful.h>
#include "readInvalidateQuery.h"
@ -37,7 +38,7 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
"host", "port", "user", "password",
"db", "database", "table", "schema",
"update_field", "invalidate_query", "priority",
@ -69,13 +70,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
MySQLSettings mysql_settings;
StorageMySQL::Configuration configuration;
std::optional<MySQLDictionarySource::Configuration> dictionary_configuration;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
if (named_collection)
{
named_collection->remove("name");
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, mysql_settings);
global_context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
auto allowed_arguments{dictionary_allowed_keys};
for (const auto & setting : mysql_settings.all())
allowed_arguments.insert(setting.getName());
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, allowed_arguments);
StorageMySQL::Configuration::Addresses addresses;
const auto addresses_expr = named_collection->getOrDefault<String>("addresses_expr", "");
if (addresses_expr.empty())
{
const auto host = named_collection->getAnyOrDefault<String>({"host", "hostname"}, "");
const auto port = static_cast<UInt16>(named_collection->get<UInt64>("port"));
addresses = {std::make_pair(host, port)};
}
else
{
size_t max_addresses = global_context->getSettingsRef().glob_expansion_max_elements;
addresses = parseRemoteDescriptionForExternalDatabase(addresses_expr, max_addresses, 3306);
}
for (auto & address : addresses)
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
dictionary_configuration.emplace(MySQLDictionarySource::Configuration{
.db = named_collection->getAnyOrDefault<String>({"database", "db"}, ""),
.table = named_collection->getOrDefault<String>("table", ""),
.query = named_collection->getOrDefault<String>("query", ""),
.where = named_collection->getOrDefault<String>("where", ""),
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
.update_field = named_collection->getOrDefault<String>("update_field", ""),
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
.dont_check_update_time = named_collection->getOrDefault<bool>("dont_check_update_time", false),
});
const auto & settings = global_context->getSettingsRef();
if (!mysql_settings.isChanged("connect_timeout"))
@ -83,38 +113,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
if (!mysql_settings.isChanged("read_write_timeout"))
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
pool = std::make_shared<mysqlxx::PoolWithFailover>(createMySQLPoolWithFailover(configuration, mysql_settings));
for (const auto & setting : mysql_settings.all())
{
const auto & setting_name = setting.getName();
if (named_collection->has(setting_name))
mysql_settings.set(setting_name, named_collection->get<String>(setting_name));
}
pool = std::make_shared<mysqlxx::PoolWithFailover>(
createMySQLPoolWithFailover(
dictionary_configuration->db,
addresses,
named_collection->getAnyOrDefault<String>({"user", "username"}, ""),
named_collection->getOrDefault<String>("password", ""),
mysql_settings));
}
else
{
if (created_from_ddl)
{
for (auto & address : configuration.addresses)
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
}
dictionary_configuration.emplace(MySQLDictionarySource::Configuration{
.db = config.getString(settings_config_prefix + ".db", ""),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
});
configuration.database = config.getString(settings_config_prefix + ".db", "");
configuration.table = config.getString(settings_config_prefix + ".table", "");
pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
pool = std::make_shared<mysqlxx::PoolWithFailover>(
mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
}
auto query = config.getString(settings_config_prefix + ".query", "");
if (query.empty() && configuration.table.empty())
if (dictionary_configuration->query.empty() && dictionary_configuration->table.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field");
MySQLDictionarySource::Configuration dictionary_configuration
{
.db = configuration.database,
.table = configuration.table,
.query = query,
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
};
return std::make_unique<MySQLDictionarySource>(dict_struct, dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
return std::make_unique<MySQLDictionarySource>(dict_struct, *dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.");

View File

@ -13,12 +13,24 @@ namespace ErrorCodes
}
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings)
{
return createMySQLPoolWithFailover(
configuration.database, configuration.addresses,
configuration.username, configuration.password, mysql_settings);
}
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
const std::string & database,
const StorageMySQL::Configuration::Addresses & addresses,
const std::string & username,
const std::string & password,
const MySQLSettings & mysql_settings)
{
if (!mysql_settings.connection_pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
return mysqlxx::PoolWithFailover(
configuration.database, configuration.addresses, configuration.username, configuration.password,
database, addresses, username, password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
static_cast<unsigned>(mysql_settings.connection_pool_size),
mysql_settings.connection_max_tries,

View File

@ -10,8 +10,15 @@ namespace mysqlxx { class PoolWithFailover; }
namespace DB
{
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings);
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings);
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
const std::string & database,
const StorageMySQL::Configuration::Addresses & addresses,
const std::string & username,
const std::string & password,
const MySQLSettings & mysql_settings);
}
#endif

View File

@ -238,7 +238,7 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
}
StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult(
const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table)
const NamedCollection & named_collection, MySQLSettings & storage_settings, ContextPtr context_, bool require_table)
{
StorageMySQL::Configuration configuration;
@ -255,10 +255,16 @@ StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult(
configuration.addresses_expr = named_collection.getOrDefault<String>("addresses_expr", "");
if (configuration.addresses_expr.empty())
{
configuration.host = named_collection.getOrDefault<String>("host", named_collection.getOrDefault<String>("hostname", ""));
configuration.host = named_collection.getAnyOrDefault<String>({"host", "hostname"}, "");
configuration.port = static_cast<UInt16>(named_collection.get<UInt64>("port"));
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
}
else
{
size_t max_addresses = context_->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(
configuration.addresses_expr, max_addresses, 3306);
}
configuration.username = named_collection.getAny<String>({"username", "user"});
configuration.password = named_collection.get<String>("password");
@ -283,7 +289,7 @@ StorageMySQL::Configuration StorageMySQL::getConfiguration(ASTs engine_args, Con
StorageMySQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context_))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings);
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings, context_);
}
else
{

View File

@ -53,6 +53,8 @@ public:
struct Configuration
{
using Addresses = std::vector<std::pair<String, UInt16>>;
String host;
UInt16 port = 0;
String username = "default";
@ -63,14 +65,15 @@ public:
bool replace_query = false;
String on_duplicate_clause;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
Addresses addresses; /// Failover replicas.
String addresses_expr;
};
static Configuration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLSettings & storage_settings);
static Configuration processNamedCollectionResult(
const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table = true);
const NamedCollection & named_collection, MySQLSettings & storage_settings,
ContextPtr context_, bool require_table = true);
private:
friend class StorageMySQLSink;

View File

@ -309,6 +309,19 @@ def test_predefined_connection_configuration(started_cluster):
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert int(result) == 200
instance.query(
"""
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql4 connection_pool_size 1 close_connection 1 share_connection 1))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert int(result) == 200
def create_mysql_db(mysql_connection, name):
with mysql_connection.cursor() as cursor: