Merge branch 'master' into process-relative-cache-paths-better

This commit is contained in:
Kseniia Sumarokova 2023-04-17 13:28:39 +02:00 committed by GitHub
commit ef4316e1ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 175 additions and 44 deletions

View File

@ -393,7 +393,11 @@ else()
endif ()
option (ENABLE_GWP_ASAN "Enable Gwp-Asan" ON)
if (NOT OS_LINUX AND NOT OS_ANDROID)
# Debug builds use mmap for allocations more heavily,
# and GWP-ASan also uses mmap frequently.
# Because of the resulting large number of memory mappings,
# the two do not work well together.
if ((NOT OS_LINUX AND NOT OS_ANDROID) OR (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG"))
set(ENABLE_GWP_ASAN OFF)
endif ()

View File

@ -188,7 +188,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, false);
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false);
}
else
{

View File

@ -21,6 +21,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/LocalDateTime.h>
#include <Common/parseRemoteDescription.h>
#include <Common/logger_useful.h>
#include "readInvalidateQuery.h"
@ -37,7 +38,7 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
"host", "port", "user", "password",
"db", "database", "table", "schema",
"update_field", "invalidate_query", "priority",
@ -69,13 +70,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
MySQLSettings mysql_settings;
StorageMySQL::Configuration configuration;
std::optional<MySQLDictionarySource::Configuration> dictionary_configuration;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
if (named_collection)
{
named_collection->remove("name");
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, mysql_settings);
global_context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
auto allowed_arguments{dictionary_allowed_keys};
for (const auto & setting : mysql_settings.all())
allowed_arguments.insert(setting.getName());
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, allowed_arguments);
StorageMySQL::Configuration::Addresses addresses;
const auto addresses_expr = named_collection->getOrDefault<String>("addresses_expr", "");
if (addresses_expr.empty())
{
const auto host = named_collection->getAnyOrDefault<String>({"host", "hostname"}, "");
const auto port = static_cast<UInt16>(named_collection->get<UInt64>("port"));
addresses = {std::make_pair(host, port)};
}
else
{
size_t max_addresses = global_context->getSettingsRef().glob_expansion_max_elements;
addresses = parseRemoteDescriptionForExternalDatabase(addresses_expr, max_addresses, 3306);
}
for (auto & address : addresses)
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
dictionary_configuration.emplace(MySQLDictionarySource::Configuration{
.db = named_collection->getAnyOrDefault<String>({"database", "db"}, ""),
.table = named_collection->getOrDefault<String>("table", ""),
.query = named_collection->getOrDefault<String>("query", ""),
.where = named_collection->getOrDefault<String>("where", ""),
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
.update_field = named_collection->getOrDefault<String>("update_field", ""),
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
.dont_check_update_time = named_collection->getOrDefault<bool>("dont_check_update_time", false),
});
const auto & settings = global_context->getSettingsRef();
if (!mysql_settings.isChanged("connect_timeout"))
@ -83,38 +113,42 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
if (!mysql_settings.isChanged("read_write_timeout"))
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
pool = std::make_shared<mysqlxx::PoolWithFailover>(createMySQLPoolWithFailover(configuration, mysql_settings));
for (const auto & setting : mysql_settings.all())
{
const auto & setting_name = setting.getName();
if (named_collection->has(setting_name))
mysql_settings.set(setting_name, named_collection->get<String>(setting_name));
}
pool = std::make_shared<mysqlxx::PoolWithFailover>(
createMySQLPoolWithFailover(
dictionary_configuration->db,
addresses,
named_collection->getAnyOrDefault<String>({"user", "username"}, ""),
named_collection->getOrDefault<String>("password", ""),
mysql_settings));
}
else
{
if (created_from_ddl)
{
for (auto & address : configuration.addresses)
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
}
dictionary_configuration.emplace(MySQLDictionarySource::Configuration{
.db = config.getString(settings_config_prefix + ".db", ""),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
});
configuration.database = config.getString(settings_config_prefix + ".db", "");
configuration.table = config.getString(settings_config_prefix + ".table", "");
pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
pool = std::make_shared<mysqlxx::PoolWithFailover>(
mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
}
auto query = config.getString(settings_config_prefix + ".query", "");
if (query.empty() && configuration.table.empty())
if (dictionary_configuration->query.empty() && dictionary_configuration->table.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field");
MySQLDictionarySource::Configuration dictionary_configuration
{
.db = configuration.database,
.table = configuration.table,
.query = query,
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
};
return std::make_unique<MySQLDictionarySource>(dict_struct, dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
return std::make_unique<MySQLDictionarySource>(dict_struct, *dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.");

View File

@ -270,7 +270,6 @@ void AWSInstanceProfileCredentialsProvider::Reload()
void AWSInstanceProfileCredentialsProvider::refreshIfExpired()
{
LOG_DEBUG(logger, "Checking if latest credential pull has expired.");
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
if (!IsTimeToRefresh(load_frequency_ms))
{

View File

@ -13,12 +13,24 @@ namespace ErrorCodes
}
/// Convenience overload: takes the connection parameters from a StorageMySQL::Configuration
/// (database, addresses, username, password) and forwards them, together with the
/// MySQL settings, to the overload that accepts those parameters individually.
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings)
{
    const auto & database = configuration.database;
    const auto & addresses = configuration.addresses;
    const auto & username = configuration.username;
    const auto & password = configuration.password;

    return createMySQLPoolWithFailover(database, addresses, username, password, mysql_settings);
}
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
const std::string & database,
const StorageMySQL::Configuration::Addresses & addresses,
const std::string & username,
const std::string & password,
const MySQLSettings & mysql_settings)
{
if (!mysql_settings.connection_pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
return mysqlxx::PoolWithFailover(
configuration.database, configuration.addresses, configuration.username, configuration.password,
database, addresses, username, password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
static_cast<unsigned>(mysql_settings.connection_pool_size),
mysql_settings.connection_max_tries,

View File

@ -10,8 +10,15 @@ namespace mysqlxx { class PoolWithFailover; }
namespace DB
{
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings);
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
const StorageMySQL::Configuration & configuration, const MySQLSettings & mysql_settings);
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(
const std::string & database,
const StorageMySQL::Configuration::Addresses & addresses,
const std::string & username,
const std::string & password,
const MySQLSettings & mysql_settings);
}
#endif

View File

@ -238,7 +238,7 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
}
StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult(
const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table)
const NamedCollection & named_collection, MySQLSettings & storage_settings, ContextPtr context_, bool require_table)
{
StorageMySQL::Configuration configuration;
@ -255,10 +255,16 @@ StorageMySQL::Configuration StorageMySQL::processNamedCollectionResult(
configuration.addresses_expr = named_collection.getOrDefault<String>("addresses_expr", "");
if (configuration.addresses_expr.empty())
{
configuration.host = named_collection.getOrDefault<String>("host", named_collection.getOrDefault<String>("hostname", ""));
configuration.host = named_collection.getAnyOrDefault<String>({"host", "hostname"}, "");
configuration.port = static_cast<UInt16>(named_collection.get<UInt64>("port"));
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
}
else
{
size_t max_addresses = context_->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(
configuration.addresses_expr, max_addresses, 3306);
}
configuration.username = named_collection.getAny<String>({"username", "user"});
configuration.password = named_collection.get<String>("password");
@ -283,7 +289,7 @@ StorageMySQL::Configuration StorageMySQL::getConfiguration(ASTs engine_args, Con
StorageMySQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context_))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings);
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, storage_settings, context_);
}
else
{

View File

@ -53,6 +53,8 @@ public:
struct Configuration
{
using Addresses = std::vector<std::pair<String, UInt16>>;
String host;
UInt16 port = 0;
String username = "default";
@ -63,14 +65,15 @@ public:
bool replace_query = false;
String on_duplicate_clause;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
Addresses addresses; /// Failover replicas.
String addresses_expr;
};
static Configuration getConfiguration(ASTs engine_args, ContextPtr context_, MySQLSettings & storage_settings);
static Configuration processNamedCollectionResult(
const NamedCollection & named_collection, MySQLSettings & storage_settings, bool require_table = true);
const NamedCollection & named_collection, MySQLSettings & storage_settings,
ContextPtr context_, bool require_table = true);
private:
friend class StorageMySQLSink;

View File

@ -1206,10 +1206,27 @@ def test_backup_all(exclude_system_log_tables):
exclude_from_backup = []
if exclude_system_log_tables:
system_log_tables = instance.query(
"SELECT concat('system.', table) FROM system.tables WHERE (database = 'system') AND (table LIKE '%_log')"
).splitlines()
exclude_from_backup += system_log_tables
# See the list of log tables in src/Interpreters/SystemLog.cpp
log_tables = [
"query_log",
"query_thread_log",
"part_log",
"trace_log",
"crash_log",
"text_log",
"metric_log",
"filesystem_cache_log",
"filesystem_read_prefetches_log",
"asynchronous_metric_log",
"opentelemetry_span_log",
"query_views_log",
"zookeeper_log",
"session_log",
"transactions_info_log",
"processors_profile_log",
"asynchronous_insert_log",
]
exclude_from_backup += ["system." + table_name for table_name in log_tables]
backup_command = f"BACKUP ALL {'EXCEPT TABLES ' + ','.join(exclude_from_backup) if exclude_from_backup else ''} TO {backup_name}"

View File

@ -309,6 +309,19 @@ def test_predefined_connection_configuration(started_cluster):
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert int(result) == 200
instance.query(
"""
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql4 connection_pool_size 1 close_connection 1 share_connection 1))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
assert int(result) == 200
def create_mysql_db(mysql_connection, name):
with mysql_connection.cursor() as cursor:

View File

@ -18,7 +18,7 @@ CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst AS SELECT count(a) AS count FROM mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid;
INSERT INTO mt VALUES (1, now('US/Samoa') + 1);
INSERT INTO mt VALUES (1, now('US/Samoa') + 5);
EOF
for _ in {1..100}; do

View File

@ -0,0 +1,3 @@
\N
1 1
1

View File

@ -0,0 +1,33 @@
-- Case 1: single-row input whose `id` is a NULL of type Nullable(UInt32).
-- Exercises argMaxOrNull when the only candidate value is NULL.
SELECT argMaxOrNull(id, timestamp)
FROM
(
SELECT
CAST(NULL, 'Nullable(UInt32)') AS id,
2 AS timestamp
);
-- Case 2: two rows. The NULL id carries the larger timestamp (2) and the
-- non-NULL id = 1 carries the smaller timestamp (1).
-- Plain argMax and its -OrNull variant are selected side by side over the same input.
SELECT
argMax(id, timestamp),
argMaxOrNull(id, timestamp)
FROM
(
SELECT
CAST(NULL, 'Nullable(UInt32)') AS id,
2 AS timestamp
UNION ALL
SELECT
1 AS id,
1 AS timestamp
);
-- Case 3: same two-row input, with the -If and -OrNull combinators together and
-- the condition `id IS NOT NULL`.
-- NOTE(review): presumably the condition restricts aggregation to the non-NULL row; confirm against the combinator documentation.
SELECT argMaxIfOrNull(id, timestamp, id IS NOT NULL)
FROM
(
SELECT
CAST(NULL, 'Nullable(UInt32)') AS id,
2 AS timestamp
UNION ALL
SELECT
1 AS id,
1 AS timestamp
);