This commit is contained in:
kssenii 2021-04-01 10:27:24 +00:00
parent f6163bb51d
commit 747967a9bb
15 changed files with 89 additions and 80 deletions

View File

@ -3,6 +3,7 @@
#include <random>
#include <thread>
#include <mysqlxx/PoolWithFailover.h>
#include <common/logger_useful.h>
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
@ -75,17 +76,17 @@ PoolWithFailover::PoolWithFailover(
PoolWithFailover::PoolWithFailover(
const std::string & database,
const std::vector<std::string> & hosts,
uint16_t port,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
size_t max_tries_)
: max_tries(max_tries_)
, shareable(false)
{
for (const auto & host : hosts)
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
LOG_DEBUG(&Poco::Logger::get("MySQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database, host, user, password, port));
}
}

View File

@ -83,6 +83,7 @@ namespace mysqlxx
public:
using Entry = Pool::Entry;
using RemoteDescription = std::vector<std::pair<std::string, uint16_t>>;
/**
* * Mysql dictionary sourse related params:
@ -111,8 +112,7 @@ namespace mysqlxx
PoolWithFailover(
const std::string & database,
const std::vector<std::string> & hosts,
uint16_t port,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);

View File

@ -1,6 +1,8 @@
#include "parseRemoteDescription.h"
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
namespace DB
{
@ -167,4 +169,26 @@ std::vector<String> parseRemoteDescription(const String & description, size_t l,
return res;
}
std::vector<std::pair<String, uint16_t>> parseRemoteDescriptionForExternalDatabase(const String & description, size_t max_addresses)
{
auto addresses = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
std::vector<std::pair<String, uint16_t>> result;
for (const auto & address : addresses)
{
size_t colon = address.find(':');
if (colon == String::npos)
{
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "No port found for host {}", address);
}
else
{
result.emplace_back(std::make_pair(address.substr(0, colon), DB::parseFromString<UInt16>(address.substr(colon + 1))));
}
}
return result;
}
}

View File

@ -17,4 +17,7 @@ namespace DB
*/
std::vector<String> parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses);
/// Parse remote description for external database (MySQL or PostgreSQL).
std::vector<std::pair<String, uint16_t>> parseRemoteDescriptionForExternalDatabase(const String & description, size_t max_addresses);
}

View File

@ -134,22 +134,20 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
ASTs & arguments = engine->arguments->children;
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
const auto & host_name_and_port = safeGetLiteralValue<String>(arguments[0], engine_name);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
const auto & mysql_database_name = safeGetLiteralValue<String>(arguments[1], engine_name);
const auto & mysql_user_name = safeGetLiteralValue<String>(arguments[2], engine_name);
const auto & mysql_user_password = safeGetLiteralValue<String>(arguments[3], engine_name);
try
{
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
/// Split into replicas if needed.
size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses;
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
auto mysql_pool = mysqlxx::PoolWithFailover(mysql_database_name, hosts, remote_port, mysql_user_name, mysql_user_password);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses);
auto mysql_pool = mysqlxx::PoolWithFailover(mysql_database_name, addresses, mysql_user_name, mysql_user_password);
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
@ -158,6 +156,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_database_settings), std::move(mysql_pool));
}
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 3306);
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password);
@ -249,16 +248,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine->arguments->children.size() == 5)
use_table_cache = safeGetLiteralValue<UInt64>(engine_args[4], engine_name);
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 5432);
/// Split into replicas if needed.
size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses;
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses);
/// no connection is made here
auto connection_pool = std::make_shared<postgres::PoolWithFailover>(
postgres_database_name,
hosts,
remote_port,
addresses,
username, password,
context.getSettingsRef().postgresql_connection_pool_size,
context.getSettingsRef().postgresql_connection_pool_wait_timeout);

View File

@ -2,6 +2,7 @@
#include "PostgreSQLConnection.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
namespace DB
@ -57,8 +58,7 @@ PoolWithFailover::PoolWithFailover(
PoolWithFailover::PoolWithFailover(
const std::string & database,
const std::vector<std::string> & hosts,
uint16_t port,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
size_t pool_size,
@ -66,11 +66,11 @@ PoolWithFailover::PoolWithFailover(
size_t max_tries_)
: max_tries(max_tries_)
{
for (const auto & host : hosts)
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
replicas_with_priority[0].emplace_back(
std::make_shared<ConnectionPool>(database, host, port, user, password, pool_size, pool_wait_timeout));
LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
replicas_with_priority[0].emplace_back(std::make_shared<ConnectionPool>(database, host, port, user, password, pool_size, pool_wait_timeout));
}
}

View File

@ -11,6 +11,8 @@ namespace postgres
class PoolWithFailover
{
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
public:
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
@ -22,8 +24,7 @@ public:
PoolWithFailover(
const std::string & database,
const std::vector<std::string> & hosts,
uint16_t port,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,

View File

@ -45,17 +45,12 @@ StorageExternalDistributed::StorageExternalDistributed(
setInMemoryMetadata(storage_metadata);
size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses;
UInt16 default_port = context.getSettingsRef().storage_external_distributed_default_port;
/// Split into shards
std::vector<String> shards_descriptions = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses);
/// For each shard pass replicas description into storage, replicas are managed by storage's PoolWithFailover.
for (const auto & shard_description : shards_descriptions)
{
/// Parse shard description like host-{01..02}-{1|2|3}:port, into host_description_pattern (host-01-{1..2}-{1|2|3}) and port
const auto & [remote_host_name, remote_port] = parseAddress(shard_description, default_port);
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
auto addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses);
StoragePtr shard;
switch (table_engine)
@ -66,8 +61,7 @@ StorageExternalDistributed::StorageExternalDistributed(
{
mysqlxx::PoolWithFailover pool(
remote_database,
hosts,
remote_port,
addresses,
username, password);
shard = StorageMySQL::create(
@ -88,8 +82,7 @@ StorageExternalDistributed::StorageExternalDistributed(
{
postgres::PoolWithFailover pool(
remote_database,
hosts,
remote_port,
addresses,
username, password,
context.getSettingsRef().postgresql_connection_pool_size,
context.getSettingsRef().postgresql_connection_pool_wait_timeout);

View File

@ -238,9 +238,8 @@ void registerStorageMySQL(StorageFactory & factory)
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses;
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 3306);
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
mysqlxx::PoolWithFailover pool(remote_database, hosts, remote_port, username, password);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses);
mysqlxx::PoolWithFailover pool(remote_database, addresses, username, password);
bool replace_query = false;
std::string on_duplicate_clause;

View File

@ -307,25 +307,22 @@ void registerStoragePostgreSQL(StorageFactory & factory)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context);
auto host_port = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
/// Split into replicas if needed.
size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses;
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses);
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
size_t max_addresses = args.context.getSettingsRef().storage_external_distributed_max_addresses;
String remote_table_schema;
if (engine_args.size() == 6)
remote_table_schema = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
/// Split into replicas if needed.
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 5432);
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
postgres::PoolWithFailover pool(
remote_database,
hosts,
remote_port,
addresses,
username,
password,
args.context.getSettingsRef().postgresql_connection_pool_size,

View File

@ -60,11 +60,10 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Conte
user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 3306);
/// Split into replicas if needed.
/// Split into replicas if needed. 3306 is the default MySQL port number
size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses;
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
pool.emplace(remote_database_name, hosts, remote_port, user_name, password);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses);
pool.emplace(remote_database_name, addresses, user_name, password);
if (args.size() >= 6)
replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0;
@ -75,9 +74,6 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Conte
throw Exception(
"Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them",
ErrorCodes::BAD_ARGUMENTS);
/// 3306 is the default MySQL port number
parsed_host_port = parseAddress(host_port, 3306);
}
ColumnsDescription TableFunctionMySQL::getActualTableStructure(const Context & context) const

View File

@ -30,7 +30,6 @@ private:
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
std::pair<std::string, UInt16> parsed_host_port;
String remote_database_name;
String remote_table_name;
String user_name;

View File

@ -68,11 +68,10 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
const auto & host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 5432);
/// Split into replicas if needed.
const auto & host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
size_t max_addresses = context.getSettingsRef().storage_external_distributed_max_addresses;
auto hosts = parseRemoteDescription(remote_host_name, 0, remote_host_name.size(), '|', max_addresses);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses);
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
@ -81,8 +80,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const
connection_pool = std::make_shared<postgres::PoolWithFailover>(
args[1]->as<ASTLiteral &>().value.safeGet<String>(),
hosts,
remote_port,
addresses,
args[3]->as<ASTLiteral &>().value.safeGet<String>(),
args[4]->as<ASTLiteral &>().value.safeGet<String>());
}

View File

@ -220,6 +220,12 @@ def test_mysql_distributed(started_cluster):
ENGINE = MySQL(`mysql{}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');'''.format(i, i))
nodes[i-1].query("INSERT INTO test_replica{} (id, name) SELECT number, 'host{}' from numbers(10) ".format(i, i))
# test multiple ports parsing
result = node2.query('''SELECT DISTINCT(name) FROM mysql(`mysql{1|2|3}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
assert(result == 'host1\n' or result == 'host2\n' or result == 'host3\n')
result = node2.query('''SELECT DISTINCT(name) FROM mysql(`mysql1:3306|mysql2:3306|mysql3:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
assert(result == 'host1\n' or result == 'host2\n' or result == 'host3\n')
# check all replicas are traversed
query = "SELECT * FROM ("
for i in range (3):
@ -229,32 +235,27 @@ def test_mysql_distributed(started_cluster):
result = node2.query(query)
assert(result == 'host2\nhost3\nhost4\n')
# test table function (all replicas are invalid except for one)
result = node2.query('''SELECT DISTINCT(name) FROM mysql(`mysql{9|3}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
assert(result == 'host3\n')
# Storage with with two shards, each has 2 replicas
node1.query('''
node2.query('''
CREATE TABLE test_shards
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = ExternalDistributed('MySQL', `mysql{1|2|3}:3306,mysql{4|5}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
ENGINE = ExternalDistributed('MySQL', `mysql{1|2}:3306,mysql{3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
# Check only one replica in each shard is used
result = node1.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
assert(result == 'host1\nhost4\n')
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
assert(result == 'host1\nhost3\n')
# check all replicas are traversed
query = "SELECT name FROM ("
for i in range (3):
query += "SELECT name FROM test_shards UNION DISTINCT "
query += "SELECT name FROM test_shards) ORDER BY name"
result = node1.query(query)
result = node2.query(query)
assert(result == 'host1\nhost2\nhost3\nhost4\n')
# disconnect mysql1
started_cluster.pause_container('mysql1')
result = node1.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')

View File

@ -248,45 +248,45 @@ def test_postgres_distributed(started_cluster):
cursors[i].execute('CREATE TABLE test_replicas (id Integer, name Text)')
cursors[i].execute("""INSERT INTO test_replicas select i, 'host{}' from generate_series(0, 99) as t(i);""".format(i + 1));
# Storage with with 3 replicas
# test multiple ports parsing
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres{1|2|3}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
assert(result == 'host1\n' or result == 'host2\n' or result == 'host3\n')
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres2:5431|postgres3:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
assert(result == 'host3\n' or result == 'host2\n')
# Create storage with with 3 replicas
node2.query('''
CREATE TABLE test_replicas
(id UInt32, name String)
ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
# check all replicas are traversed
# Check all replicas are traversed
query = "SELECT name FROM ("
for i in range (3):
query += "SELECT name FROM test_replicas UNION DISTINCT "
query += "SELECT name FROM test_replicas) ORDER BY name"
result = node2.query(query)
assert(result == 'host2\nhost3\nhost4\n')
# test table function (all replicas are invalid except for one)
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres{9|3}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
assert(result == 'host3\n')
# Storage with with two two shards, each has 2 replicas
# Create storage with with two two shards, each has 2 replicas
node2.query('''
CREATE TABLE test_shards
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = ExternalDistributed('PostgreSQL', `postgres{1|2|3}:5432,postgres{4|5}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
ENGINE = ExternalDistributed('PostgreSQL', `postgres{1|2}:5432,postgres{3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
# Check only one replica in each shard is used
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
assert(result == 'host1\nhost4\n')
assert(result == 'host1\nhost3\n')
# check all replicas are traversed
# Check all replicas are traversed
query = "SELECT name FROM ("
for i in range (3):
query += "SELECT name FROM test_shards UNION DISTINCT "
query += "SELECT name FROM test_shards) ORDER BY name"
result = node2.query(query)
assert(result == 'host1\nhost2\nhost3\nhost4\n')
# disconnect postgres1
# Disconnect postgres1
started_cluster.pause_container('postgres1')
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')