From c17748a0ba31136f4364c2f4de1d8ae0a97ec598 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 27 Mar 2021 21:11:10 +0000 Subject: [PATCH] Add proper settings --- base/mysqlxx/PoolWithFailover.cpp | 6 ++--- base/mysqlxx/PoolWithFailover.h | 6 ++--- src/Core/Settings.h | 2 ++ .../PostgreSQL/PostgreSQLPoolWithFailover.cpp | 7 ++++-- .../PostgreSQL/PostgreSQLPoolWithFailover.h | 7 ++++-- src/Storages/StorageExternalDistributed.cpp | 23 ++++++++----------- src/Storages/StorageExternalDistributed.h | 3 +-- src/Storages/StorageMySQL.cpp | 3 ++- src/Storages/StoragePostgreSQL.cpp | 4 +++- 9 files changed, 33 insertions(+), 28 deletions(-) diff --git a/base/mysqlxx/PoolWithFailover.cpp b/base/mysqlxx/PoolWithFailover.cpp index c473058b7d6..deb52047b53 100644 --- a/base/mysqlxx/PoolWithFailover.cpp +++ b/base/mysqlxx/PoolWithFailover.cpp @@ -82,14 +82,14 @@ PoolWithFailover::PoolWithFailover( const uint16_t port, const std::string & user, const std::string & password, - const size_t max_tries_, - const size_t max_addresses) + const size_t max_addresses, + const size_t max_tries_) : max_tries(max_tries_) { auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses); for (const auto & host : hosts) { - /// Replicas have the same priority, but traversed replicas (with failed connection) are moved to the end of the queue. + /// Replicas have the same priority, but traversed replicas are moved to the end of the queue. replicas_by_priority[0].emplace_back(std::make_shared(database, host, user, password, port)); LOG_TRACE(&Poco::Logger::get("MySQLPoolWithFailover"), "Adding address {}:{} to MySQL pool", host, port); } diff --git a/base/mysqlxx/PoolWithFailover.h b/base/mysqlxx/PoolWithFailover.h index 8338d6dcc4d..4a6dc020156 100644 --- a/base/mysqlxx/PoolWithFailover.h +++ b/base/mysqlxx/PoolWithFailover.h @@ -6,7 +6,7 @@ #define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1 #define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16 #define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3 -#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES 3 +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES 1 namespace mysqlxx @@ -116,8 +116,8 @@ namespace mysqlxx uint16_t port, const std::string & user, const std::string & password, - const size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, - const size_t max_addresses = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES); + const size_t max_addresses = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES, + const size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); PoolWithFailover(const PoolWithFailover & other); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 896b56b4d0b..36e98b87615 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -368,6 +368,8 @@ class IColumn; \ M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \ M(Int64, postgresql_connection_pool_wait_timeout, -1, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \ + M(UInt64, storage_external_distributed_max_addresses, 5, "Maximum number of addresses for storage ExternalDistributed.", 0) \ + M(UInt64, storage_external_distributed_default_port, 5432, "Default port for storage ExternalDistributed.", 0) \ \ M(Seconds, distributed_replica_error_half_life, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD, "Time period reduces replica error counter by 2 times.", 0) \ M(UInt64, distributed_replica_error_cap, DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT, "Max number of errors per replica, prevents piling up an incredible amount of errors if replica was offline for some time and allows it to be reconsidered in a shorter amount of time.", 0) \ diff --git a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp index 8d453d0773f..7128507e532 100644 --- a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp @@ -62,6 +62,8 @@ PoolWithFailover::PoolWithFailover( const uint16_t port, const std::string & user, const std::string & password, + size_t pool_size, + int64_t pool_wait_timeout, const size_t max_tries_, const size_t max_addresses) : max_tries(max_tries_) @@ -69,8 +71,9 @@ PoolWithFailover::PoolWithFailover( auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses); for (const auto & host : hosts) { - /// Replicas have the same priority, but traversed replicas are moved to the end of the queue after each fetch. - replicas_with_priority[0].emplace_back(std::make_shared(database, host, port, user, password)); + /// Replicas have the same priority, but traversed replicas are moved to the end of the queue. + replicas_with_priority[0].emplace_back( + std::make_shared(database, host, port, user, password, pool_size, pool_wait_timeout)); LOG_TRACE(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address {}:{} to pool", host, port); } } diff --git a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h index 45d2657b34e..ba8a093f845 100644 --- a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h +++ b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h @@ -14,6 +14,7 @@ class PoolWithFailover public: static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5; static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES = 5; + static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16; PoolWithFailover( const Poco::Util::AbstractConfiguration & config, @@ -26,8 +27,10 @@ public: uint16_t port, const std::string & user, const std::string & password, - const size_t max_tries = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, - const size_t max_addresses = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES); + size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, + int64_t pool_wait_timeout = -1, + const size_t max_addresses = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES, + const size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); PoolWithFailover(const PoolWithFailover & other); diff --git a/src/Storages/StorageExternalDistributed.cpp b/src/Storages/StorageExternalDistributed.cpp index 0b767764e0a..558f815cc73 100644 --- a/src/Storages/StorageExternalDistributed.cpp +++ b/src/Storages/StorageExternalDistributed.cpp @@ -45,9 +45,8 @@ StorageExternalDistributed::StorageExternalDistributed( storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); - /// TODO: add proper setting - size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses; - + size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses; + UInt16 default_port = context.getSettingsRef().storage_external_distributed_default_port; /// Split into shards std::vector shards_descriptions = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses); @@ -55,9 +54,7 @@ StorageExternalDistributed::StorageExternalDistributed( for (const auto & shard_description : shards_descriptions) { /// Parse shard description like host-{01..02}-{1|2|3}:port, into host_description_pattern (host-01-{1..2}-{1|2|3}) and port - /// TODO: add setting with default port - LOG_TRACE(&Poco::Logger::get("StorageExternalDistributed"), "Adding shard description: {}", shard_description); - auto parsed_shard_description = parseAddress(shard_description, 3306); + auto parsed_shard_description = parseAddress(shard_description, default_port); StoragePtr shard; if (engine_name == "MySQL") @@ -66,7 +63,7 @@ StorageExternalDistributed::StorageExternalDistributed( remote_database, parsed_shard_description.first, parsed_shard_description.second, - username, password); + username, password, max_addresses); shard = StorageMySQL::create( table_id_, @@ -75,8 +72,7 @@ StorageExternalDistributed::StorageExternalDistributed( remote_table, /* replace_query = */ false, /* on_duplicate_clause = */ "", - columns_, - constraints_, + columns_, constraints_, context); } else if (engine_name == "PostgreSQL") @@ -85,17 +81,16 @@ StorageExternalDistributed::StorageExternalDistributed( remote_database, parsed_shard_description.first, parsed_shard_description.second, - username, - password, + username, password, context.getSettingsRef().postgresql_connection_pool_size, - context.getSettingsRef().postgresql_connection_pool_wait_timeout); + context.getSettingsRef().postgresql_connection_pool_wait_timeout, + max_addresses); shard = StoragePostgreSQL::create( table_id_, std::move(pool), remote_table, - columns_, - constraints_, + columns_, constraints_, context); } else diff --git a/src/Storages/StorageExternalDistributed.h b/src/Storages/StorageExternalDistributed.h index 7a0f7f3f770..5c8fd114432 100644 --- a/src/Storages/StorageExternalDistributed.h +++ b/src/Storages/StorageExternalDistributed.h @@ -18,8 +18,7 @@ namespace DB /// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas. /// This class unites multiple storages with replicas into multiple shards with replicas. /// A query to external database is passed to one replica on each shard, the result is united. -/// Replicas on each shard have the same priority, unavailable replicas are moved to the end of -/// the queue. The queue is shuffled from time to time. +/// Replicas on each shard have the same priority, traversed replicas are moved to the end of the queue. /// TODO: try `load_balancing` setting for replicas priorities same way as for table function `remote` class StorageExternalDistributed final : public ext::shared_ptr_helper, public DB::IStorage { diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index ecbaf8469ba..b747f5f0960 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -236,8 +236,9 @@ void registerStorageMySQL(StorageFactory & factory) const String & remote_table = engine_args[2]->as().value.safeGet(); const String & username = engine_args[3]->as().value.safeGet(); const String & password = engine_args[4]->as().value.safeGet(); + size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses; - mysqlxx::PoolWithFailover pool(remote_database, parsed_host_port.first, parsed_host_port.second, username, password); + mysqlxx::PoolWithFailover pool(remote_database, parsed_host_port.first, parsed_host_port.second, username, password, max_addresses); bool replace_query = false; std::string on_duplicate_clause; diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index be43319090e..48ef1c35dd9 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -311,6 +311,7 @@ void registerStoragePostgreSQL(StorageFactory & factory) const String & remote_table = engine_args[2]->as().value.safeGet(); const String & username = engine_args[3]->as().value.safeGet(); const String & password = engine_args[4]->as().value.safeGet(); + size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses; String remote_table_schema; if (engine_args.size() == 6) @@ -323,7 +324,8 @@ void registerStoragePostgreSQL(StorageFactory & factory) username, password, args.context.getSettingsRef().postgresql_connection_pool_size, - args.context.getSettingsRef().postgresql_connection_pool_wait_timeout); + args.context.getSettingsRef().postgresql_connection_pool_wait_timeout, + max_addresses); return StoragePostgreSQL::create( args.table_id, pool, remote_table,