mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #48759 from kssenii/fix-mysql-named-collection
Fix some settings of mysql dictionary with named collection
This commit is contained in:
commit
8bc0a3a899
@ -188,7 +188,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
|
||||
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context))
|
||||
{
|
||||
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, false);
|
||||
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/LocalDateTime.h>
|
||||
#include <Common/parseRemoteDescription.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "readInvalidateQuery.h"
|
||||
|
||||
@ -37,7 +38,7 @@ namespace ErrorCodes
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
||||
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
|
||||
"host", "port", "user", "password",
|
||||
"db", "database", "table", "schema",
|
||||
"update_field", "invalidate_query", "priority",
|
||||
@ -69,13 +70,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
|
||||
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
|
||||
MySQLSettings mysql_settings;
|
||||
|
||||
StorageMySQL::Configuration configuration;
|
||||
std::optional<MySQLDictionarySource::Configuration> dictionary_configuration;
|
||||
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
|
||||
if (named_collection)
|
||||
{
|
||||
named_collection->remove("name");
|
||||
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, mysql_settings);
|
||||
global_context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
|
||||
auto allowed_arguments{dictionary_allowed_keys};
|
||||
for (const auto & setting : mysql_settings.all())
|
||||
allowed_arguments.insert(setting.getName());
|
||||
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, allowed_arguments);
|
||||
|
||||
StorageMySQL::Configuration::Addresses addresses;
|
||||
const auto addresses_expr = named_collection->getOrDefault<String>("addresses_expr", "");
|
||||
if (addresses_expr.empty())
|
||||
{
|
||||
const auto host = named_collection->getAnyOrDefault<String>({"host", "hostname"}, "");
|
||||
const auto port = static_cast<UInt16>(named_collection->get<UInt64>("port"));
|
||||
addresses = {std::make_pair(host, port)};
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t max_addresses = global_context->getSettingsRef().glob_expansion_max_elements;
|
||||
addresses = parseRemoteDescriptionForExternalDatabase(addresses_expr, max_addresses, 3306);
|
||||
}
|
||||
|
||||
for (auto & address : addresses)
|
||||
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
|
||||
|
||||
dictionary_configuration.emplace(MySQLDictionarySource::Configuration{
|
||||
.db = named_collection->getAnyOrDefault<String>({"database", "db"}, ""),
|
||||
.table = named_collection->getOrDefault<String>("table", ""),
|
||||
.query = named_collection->getOrDefault<String>("query", ""),
|
||||
.where = named_collection->getOrDefault<String>("where", ""),
|
||||
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
|
||||
.update_field = named_collection->getOrDefault<String>("update_field", ""),
|
||||
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
|
||||
.dont_check_update_time = named_collection->getOrDefault<bool>("dont_check_update_time", false),
|
||||
});
|
||||
|
||||
const auto & settings = global_context->getSettingsRef();
|
||||
if (!mysql_settings.isChanged("connect_timeout"))
|
||||
@ -83,38 +113,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
|
||||
if (!mysql_settings.isChanged("read_write_timeout"))
|
||||
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
|
||||
|
||||
pool = std::make_shared<mysqlxx::PoolWithFailover>(createMySQLPoolWithFailover(configuration, mysql_settings));
|
||||
for (const auto & setting : mysql_settings.all())
|
||||
{
|
||||
const auto & setting_name = setting.getName();
|
||||
if (named_collection->has(setting_name))
|
||||
mysql_settings.set(setting_name, named_collection->get<String>(setting_name));
|
||||
}
|
||||
|
||||
pool = std::make_shared<mysqlxx::PoolWithFailover>(
|
||||
createMySQLPoolWithFailover(
|
||||
dictionary_configuration->db,
|
||||
addresses,
|
||||
named_collection->getAnyOrDefault<String>({"user", "username"}, ""),
|
||||
named_collection->getOrDefault<String>("password", ""),
|
||||
mysql_settings));
|
||||
}
|
||||
else
|
||||
{
|
||||
if (created_from_ddl)
|
||||
{
|
||||
for (auto & address : configuration.addresses)
|
||||
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
|
||||
}
|
||||
dictionary_configuration.emplace(MySQLDictionarySource::Configuration{
|
||||
.db = config.getString(settings_config_prefix + ".db", ""),
|
||||
.table = config.getString(settings_config_prefix + ".table", ""),
|
||||
.query = config.getString(settings_config_prefix + ".query", ""),
|
||||
.where = config.getString(settings_config_prefix + ".where", ""),
|
||||
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
|
||||
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
|
||||
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
|
||||
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
|
||||
});
|
||||
|
||||
configuration.database = config.getString(settings_config_prefix + ".db", "");
|
||||
configuration.table = config.getString(settings_config_prefix + ".table", "");
|
||||
pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
|
||||
pool = std::make_shared<mysqlxx::PoolWithFailover>(
|
||||
mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
|
||||
}
|
||||
|
||||
auto query = config.getString(settings_config_prefix + ".query", "");
|
||||
if (query.empty() && configuration.table.empty())
|
||||
if (dictionary_configuration->query.empty() && dictionary_configuration->table.empty())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field");
|
||||
|
||||
MySQLDictionarySource::Configuration dictionary_configuration
|
||||
{
|
||||
.db = configuration.database,
|
||||
.table = configuration.table,
|
||||
.query = query,
|
||||
.where = config.getString(settings_config_prefix + ".where", ""),
|
||||
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
|
||||
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
|
||||
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
|
||||
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
|
||||
};
|
||||
|
||||
return std::make_unique<MySQLDictionarySource>(dict_struct, dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
|
||||
return std::make_unique<MySQLDictionarySource>(dict_struct, *dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
|
||||
#else
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.");
|
||||
|
@ -13,12 +13,24 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings)
|
||||
{
|
||||
return createMySQLPoolWithFailover(
|
||||
configuration.database, configuration.addresses,
|
||||
configuration.username, configuration.password, mysql_settings);
|
||||
}
|
||||
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
|
||||
const std::string & database,
|
||||
const StorageMySQL::Configuration::Addresses & addresses,
|
||||
const std::string & username,
|
||||
const std::string & password,
|
||||
const MySQLSettings & mysql_settings)
|
||||
{
|
||||
if (!mysql_settings.connection_pool_size)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
|
||||
|
||||
return mysqlxx::PoolWithFailover(
|
||||
configuration.database, configuration.addresses, configuration.username, configuration.password,
|
||||
database, addresses, username, password,
|
||||
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
static_cast<unsigned>(mysql_settings.connection_pool_size),
|
||||
mysql_settings.connection_max_tries,
|
||||
|
@ -10,8 +10,15 @@ namespace mysqlxx { class PoolWithFailover; }
|
||||
namespace DB
|
||||
{
|
||||
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings);
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
|
||||
const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings);
|
||||
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
|
||||
const std::string & database,
|
||||
const StorageMySQL::Configuration::Addresses & addresses,
|
||||
const std::string & username,
|
||||
const std::string & password,
|
||||
const MySQLSettings & mysql_settings);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -238,7 +238,7 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
|
||||
}
|
||||
|
||||
StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult(
|
||||
const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table)
|
||||
const NamedCollection & named_collection, MySQLSettings & storage_settings, ContextPtr context_, bool require_table)
|
||||
{
|
||||
StorageMySQL::Configuration configuration;
|
||||
|
||||
@ -255,10 +255,16 @@ StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult(
|
||||
configuration.addresses_expr = named_collection.getOrDefault<String>("addresses_expr", "");
|
||||
if (configuration.addresses_expr.empty())
|
||||
{
|
||||
configuration.host = named_collection.getOrDefault<String>("host", named_collection.getOrDefault<String>("hostname", ""));
|
||||
configuration.host = named_collection.getAnyOrDefault<String>({"host", "hostname"}, "");
|
||||
configuration.port = static_cast<UInt16>(named_collection.get<UInt64>("port"));
|
||||
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t max_addresses = context_->getSettingsRef().glob_expansion_max_elements;
|
||||
configuration.addresses = parseRemoteDescriptionForExternalDatabase(
|
||||
configuration.addresses_expr, max_addresses, 3306);
|
||||
}
|
||||
|
||||
configuration.username = named_collection.getAny<String>({"username", "user"});
|
||||
configuration.password = named_collection.get<String>("password");
|
||||
@ -283,7 +289,7 @@ StorageMySQL::Configuration StorageMySQL::getConfiguration(ASTs engine_args, Con
|
||||
StorageMySQL::Configuration configuration;
|
||||
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context_))
|
||||
{
|
||||
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings);
|
||||
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings, context_);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -53,6 +53,8 @@ public:
|
||||
|
||||
struct Configuration
|
||||
{
|
||||
using Addresses = std::vector<std::pair<String, UInt16>>;
|
||||
|
||||
String host;
|
||||
UInt16 port = 0;
|
||||
String username = "default";
|
||||
@ -63,14 +65,15 @@ public:
|
||||
bool replace_query = false;
|
||||
String on_duplicate_clause;
|
||||
|
||||
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
|
||||
Addresses addresses; /// Failover replicas.
|
||||
String addresses_expr;
|
||||
};
|
||||
|
||||
static Configuration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLSettings & storage_settings);
|
||||
|
||||
static Configuration processNamedCollectionResult(
|
||||
const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table = true);
|
||||
const NamedCollection & named_collection, MySQLSettings & storage_settings,
|
||||
ContextPtr context_, bool require_table = true);
|
||||
|
||||
private:
|
||||
friend class StorageMySQLSink;
|
||||
|
@ -309,6 +309,19 @@ def test_predefined_connection_configuration(started_cluster):
|
||||
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
|
||||
assert int(result) == 200
|
||||
|
||||
instance.query(
|
||||
"""
|
||||
DROP DICTIONARY IF EXISTS dict;
|
||||
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
||||
PRIMARY KEY id
|
||||
SOURCE(MYSQL(NAME mysql4 connection_pool_size 1 close_connection 1 share_connection 1))
|
||||
LIFETIME(MIN 1 MAX 2)
|
||||
LAYOUT(HASHED());
|
||||
"""
|
||||
)
|
||||
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
|
||||
assert int(result) == 200
|
||||
|
||||
|
||||
def create_mysql_db(mysql_connection, name):
|
||||
with mysql_connection.cursor() as cursor:
|
||||
|
Loading…
Reference in New Issue
Block a user