Add proper settings

This commit is contained in:
kssenii 2021-03-27 21:11:10 +00:00
parent ef537b802f
commit c17748a0ba
9 changed files with 33 additions and 28 deletions

View File

@ -82,14 +82,14 @@ PoolWithFailover::PoolWithFailover(
const uint16_t port,
const std::string & user,
const std::string & password,
const size_t max_tries_,
const size_t max_addresses)
const size_t max_addresses,
const size_t max_tries_)
: max_tries(max_tries_)
{
auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses);
for (const auto & host : hosts)
{
/// Replicas have the same priority, but traversed replicas (with failed connection) are moved to the end of the queue.
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database, host, user, password, port));
LOG_TRACE(&Poco::Logger::get("MySQLPoolWithFailover"), "Adding address {}:{} to MySQL pool", host, port);
}

View File

@ -6,7 +6,7 @@
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES 3
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES 1
namespace mysqlxx
@ -116,8 +116,8 @@ namespace mysqlxx
uint16_t port,
const std::string & user,
const std::string & password,
const size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
const size_t max_addresses = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES);
const size_t max_addresses = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES,
const size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const PoolWithFailover & other);

View File

@ -368,6 +368,8 @@ class IColumn;
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(Int64, postgresql_connection_pool_wait_timeout, -1, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
M(UInt64, storage_external_distributed_max_addresses, 5, "Maximum number of addresses for storage ExternalDistributed.", 0) \
M(UInt64, storage_external_distributed_default_port, 5432, "Default port for storage ExternalDistributed.", 0) \
\
M(Seconds, distributed_replica_error_half_life, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD, "Time period reduces replica error counter by 2 times.", 0) \
M(UInt64, distributed_replica_error_cap, DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT, "Max number of errors per replica, prevents piling up an incredible amount of errors if replica was offline for some time and allows it to be reconsidered in a shorter amount of time.", 0) \

View File

@ -62,6 +62,8 @@ PoolWithFailover::PoolWithFailover(
const uint16_t port,
const std::string & user,
const std::string & password,
size_t pool_size,
int64_t pool_wait_timeout,
const size_t max_tries_,
const size_t max_addresses)
: max_tries(max_tries_)
@ -69,8 +71,9 @@ PoolWithFailover::PoolWithFailover(
auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses);
for (const auto & host : hosts)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue after each fetch.
replicas_with_priority[0].emplace_back(std::make_shared<ConnectionPool>(database, host, port, user, password));
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
replicas_with_priority[0].emplace_back(
std::make_shared<ConnectionPool>(database, host, port, user, password, pool_size, pool_wait_timeout));
LOG_TRACE(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address {}:{} to pool", host, port);
}
}

View File

@ -14,6 +14,7 @@ class PoolWithFailover
public:
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES = 5;
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
PoolWithFailover(
const Poco::Util::AbstractConfiguration & config,
@ -26,8 +27,10 @@ public:
uint16_t port,
const std::string & user,
const std::string & password,
const size_t max_tries = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
const size_t max_addresses = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES);
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
int64_t pool_wait_timeout = -1,
const size_t max_addresses = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES,
const size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const PoolWithFailover & other);

View File

@ -45,9 +45,8 @@ StorageExternalDistributed::StorageExternalDistributed(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
/// TODO: add proper setting
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses;
UInt16 default_port = context.getSettingsRef().storage_external_distributed_default_port;
/// Split into shards
std::vector<String> shards_descriptions = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses);
@ -55,9 +54,7 @@ StorageExternalDistributed::StorageExternalDistributed(
for (const auto & shard_description : shards_descriptions)
{
/// Parse shard description like host-{01..02}-{1|2|3}:port, into host_description_pattern (host-01-{1..2}-{1|2|3}) and port
/// TODO: add setting with default port
LOG_TRACE(&Poco::Logger::get("StorageExternalDistributed"), "Adding shard description: {}", shard_description);
auto parsed_shard_description = parseAddress(shard_description, 3306);
auto parsed_shard_description = parseAddress(shard_description, default_port);
StoragePtr shard;
if (engine_name == "MySQL")
@ -66,7 +63,7 @@ StorageExternalDistributed::StorageExternalDistributed(
remote_database,
parsed_shard_description.first,
parsed_shard_description.second,
username, password);
username, password, max_addresses);
shard = StorageMySQL::create(
table_id_,
@ -75,8 +72,7 @@ StorageExternalDistributed::StorageExternalDistributed(
remote_table,
/* replace_query = */ false,
/* on_duplicate_clause = */ "",
columns_,
constraints_,
columns_, constraints_,
context);
}
else if (engine_name == "PostgreSQL")
@ -85,17 +81,16 @@ StorageExternalDistributed::StorageExternalDistributed(
remote_database,
parsed_shard_description.first,
parsed_shard_description.second,
username,
password,
username, password,
context.getSettingsRef().postgresql_connection_pool_size,
context.getSettingsRef().postgresql_connection_pool_wait_timeout);
context.getSettingsRef().postgresql_connection_pool_wait_timeout,
max_addresses);
shard = StoragePostgreSQL::create(
table_id_,
std::move(pool),
remote_table,
columns_,
constraints_,
columns_, constraints_,
context);
}
else

View File

@ -18,8 +18,7 @@ namespace DB
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
/// This class unites multiple storages with replicas into multiple shards with replicas.
/// A query to external database is passed to one replica on each shard, the result is united.
/// Replicas on each shard have the same priority, unavailable replicas are moved to the end of
/// the queue. The queue is shuffled from time to time.
/// Replicas on each shard have the same priority, traversed replicas are moved to the end of the queue.
/// TODO: try `load_balancing` setting for replicas priorities same way as for table function `remote`
class StorageExternalDistributed final : public ext::shared_ptr_helper<StorageExternalDistributed>, public DB::IStorage
{

View File

@ -236,8 +236,9 @@ void registerStorageMySQL(StorageFactory & factory)
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses;
mysqlxx::PoolWithFailover pool(remote_database, parsed_host_port.first, parsed_host_port.second, username, password);
mysqlxx::PoolWithFailover pool(remote_database, parsed_host_port.first, parsed_host_port.second, username, password, max_addresses);
bool replace_query = false;
std::string on_duplicate_clause;

View File

@ -311,6 +311,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses;
String remote_table_schema;
if (engine_args.size() == 6)
@ -323,7 +324,8 @@ void registerStoragePostgreSQL(StorageFactory & factory)
username,
password,
args.context.getSettingsRef().postgresql_connection_pool_size,
args.context.getSettingsRef().postgresql_connection_pool_wait_timeout);
args.context.getSettingsRef().postgresql_connection_pool_wait_timeout,
max_addresses);
return StoragePostgreSQL::create(
args.table_id, pool, remote_table,