Use pool with failover in mysql storage

This commit is contained in:
kssenii 2021-03-24 18:15:31 +00:00
parent ced6d8e6bd
commit ae868208c2
18 changed files with 273 additions and 153 deletions

View File

@ -34,6 +34,7 @@ endif ()
target_link_libraries (mysqlxx
PUBLIC
common
dbms
PRIVATE
${MYSQLCLIENT_LIBRARIES}
${ZLIB_LIBRARIES}

View File

@ -4,20 +4,19 @@
#include <thread>
#include <mysqlxx/PoolWithFailover.h>
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
}
#include <Common/parseRemoteDescription.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
using namespace mysqlxx;
PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & config_,
const std::string & config_name_, const unsigned default_connections_,
const unsigned max_connections_, const size_t max_tries_)
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_name_,
const unsigned default_connections_,
const unsigned max_connections_,
const size_t max_tries_)
: max_tries(max_tries_)
{
shareable = config_.getBool(config_name_ + ".share_connection", false);
@ -59,16 +58,41 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & con
}
}
PoolWithFailover::PoolWithFailover(const std::string & config_name_, const unsigned default_connections_,
const unsigned max_connections_, const size_t max_tries_)
: PoolWithFailover{
Poco::Util::Application::instance().config(), config_name_,
default_connections_, max_connections_, max_tries_}
PoolWithFailover::PoolWithFailover(
const std::string & config_name_,
const unsigned default_connections_,
const unsigned max_connections_,
const size_t max_tries_)
: PoolWithFailover{Poco::Util::Application::instance().config(),
config_name_, default_connections_, max_connections_, max_tries_}
{
}
PoolWithFailover::PoolWithFailover(
const std::string & database,
const std::string & hosts_pattern,
const uint16_t port,
const std::string & user,
const std::string & password,
const size_t max_tries_,
const size_t max_addresses)
: max_tries(max_tries_)
{
auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses);
for (const auto & host : hosts)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue after each fetch.
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database, host, user, password, port));
LOG_TRACE(&Poco::Logger::get("MySQLPoolWithFailover"), "Adding address {}:{} to pool", host, port);
}
}
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: max_tries{other.max_tries}, shareable{other.shareable}
: max_tries{other.max_tries}
, shareable{other.shareable}
{
if (shareable)
{

View File

@ -6,11 +6,14 @@
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES 3
namespace mysqlxx
{
/** MySQL connection pool with support of failover.
*
* For dictionary source:
* Have information about replicas and their priorities.
* Tries to connect to replica in an order of priority. When equal priority, choose replica with maximum time without connections.
*
@ -68,16 +71,17 @@ namespace mysqlxx
using PoolPtr = std::shared_ptr<Pool>;
using Replicas = std::vector<PoolPtr>;
/// [priority][index] -> replica.
/// Mysql dictionary sourse related priotity:
/// [priority][index] -> replica. Highest priotity is 0.
using ReplicasByPriority = std::map<int, Replicas>;
ReplicasByPriority replicas_by_priority;
/// Mysql storage related priotity:
/// Number of connection tries.
size_t max_tries;
/// Mutex for set of replicas.
std::mutex mutex;
/// Can the Pool be shared
bool shareable;
@ -85,22 +89,39 @@ namespace mysqlxx
using Entry = Pool::Entry;
/**
* config_name Name of parameter in configuration file.
* * Mysql dictionary sourse related params:
* config_name Name of parameter in configuration file for dictionary source.
*
* * Mysql storage related parameters:
* replicas_description
*
* * Mutual parameters:
* default_connections Number of connection in pool to each replica at start.
* max_connections Maximum number of connections in pool to each replica.
* max_tries_ Max number of connection tries.
*/
PoolWithFailover(const std::string & config_name_,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const Poco::Util::AbstractConfiguration & config_,
PoolWithFailover(
const std::string & config_name_,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_name_,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
const std::string & database,
const std::string & host_pattern,
uint16_t port,
const std::string & user,
const std::string & password,
const size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
const size_t max_addresses = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES);
PoolWithFailover(const PoolWithFailover & other);
/** Allocates a connection to use. */

View File

@ -8,3 +8,10 @@ services:
ports:
- 3308:3306
command: --server_id=100 --log-bin='mysql-bin-1.log' --default-time-zone='+3:00' --gtid-mode="ON" --enforce-gtid-consistency
mysql2:
image: mysql:5.7
restart: always
environment:
MYSQL_ROOT_PASSWORD: clickhouse
ports:
- 3388:3306

View File

@ -141,11 +141,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
try
{
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
auto mysql_pool = mysqlxx::PoolWithFailover(mysql_database_name, remote_host_name, remote_port, mysql_user_name, mysql_user_password);
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
@ -155,6 +155,8 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
}
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password);
auto materialize_mode_settings = std::make_unique<MaterializeMySQLSettings>();

View File

@ -45,8 +45,14 @@ constexpr static const auto suffix = ".remove_flag";
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
static const std::chrono::seconds lock_acquire_timeout{10};
DatabaseConnectionMySQL::DatabaseConnectionMySQL(const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, std::unique_ptr<ConnectionMySQLSettings> settings_, mysqlxx::Pool && pool)
DatabaseConnectionMySQL::DatabaseConnectionMySQL(
const Context & context,
const String & database_name_,
const String & metadata_path_,
const ASTStorage * database_engine_define_,
const String & database_name_in_mysql_,
std::unique_ptr<ConnectionMySQLSettings> settings_,
mysqlxx::PoolWithFailover && pool)
: IDatabase(database_name_)
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)

View File

@ -10,6 +10,7 @@
#include <Databases/DatabasesCommon.h>
#include <Databases/MySQL/ConnectionMySQLSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <mysqlxx/PoolWithFailover.h>
#include <atomic>
#include <condition_variable>
@ -36,9 +37,13 @@ public:
~DatabaseConnectionMySQL() override;
DatabaseConnectionMySQL(
const Context & context, const String & database_name, const String & metadata_path,
const ASTStorage * database_engine_define, const String & database_name_in_mysql, std::unique_ptr<ConnectionMySQLSettings> settings_,
mysqlxx::Pool && pool);
const Context & context,
const String & database_name,
const String & metadata_path,
const ASTStorage * database_engine_define,
const String & database_name_in_mysql,
std::unique_ptr<ConnectionMySQLSettings> settings_,
mysqlxx::PoolWithFailover && pool);
String getEngineName() const override { return "MySQL"; }
@ -91,7 +96,7 @@ private:
std::atomic<bool> quit{false};
std::condition_variable cond;
using MySQLPool = mysqlxx::Pool;
using MySQLPool = mysqlxx::PoolWithFailover;
using ModifyTimeAndStorage = std::pair<UInt64, StoragePtr>;
mutable MySQLPool mysql_pool;

View File

@ -41,7 +41,7 @@ namespace DB
{
std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::Pool & pool,
mysqlxx::PoolWithFailover & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,

View File

@ -3,7 +3,7 @@
#include "config_core.h"
#if USE_MYSQL
#include <mysqlxx/Pool.h>
#include <mysqlxx/PoolWithFailover.h>
#include <common/types.h>
#include <Core/MultiEnum.h>
@ -17,7 +17,7 @@ namespace DB
{
std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::Pool & pool,
mysqlxx::PoolWithFailover & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,

View File

@ -119,32 +119,10 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
}
}
BlockInputStreamPtr MySQLDictionarySource::retriedCreateMySqlBIStream(const std::string & data_fetch_query_str, const size_t max_tries)
BlockInputStreamPtr MySQLDictionarySource::loadBase(const String & query)
{
size_t count_connection_lost = 0;
while (true)
{
auto connection = pool.get();
try
{
return std::make_shared<MySQLBlockInputStream>(
connection, data_fetch_query_str, sample_block, max_block_size, close_connection);
}
catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
{
if (++count_connection_lost < max_tries)
{
LOG_WARNING(log, ecl.displayText());
LOG_WARNING(log, "Lost connection ({}/{}). Trying to reconnect...", count_connection_lost, max_tries);
continue;
}
LOG_ERROR(log, "Failed ({}/{}) to create BlockInputStream for MySQL dictionary source.", count_connection_lost, max_tries);
throw;
}
}
return std::make_shared<MySQLWithFailoverBlockInputStream>(
pool, query, sample_block, max_block_size, close_connection, false, max_tries_for_mysql_block_input_stream);
}
BlockInputStreamPtr MySQLDictionarySource::loadAll()
@ -153,7 +131,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll()
last_modification = getLastModification(connection, false);
LOG_TRACE(log, load_all_query);
return retriedCreateMySqlBIStream(load_all_query, max_tries_for_mysql_block_input_stream);
return loadBase(load_all_query);
}
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
@ -163,23 +141,21 @@ BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
std::string load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return retriedCreateMySqlBIStream(load_update_query, max_tries_for_mysql_block_input_stream);
return loadBase(load_update_query);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadIdsQuery(ids);
return retriedCreateMySqlBIStream(query, max_tries_for_mysql_block_input_stream);
return loadBase(query);
}
BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return retriedCreateMySqlBIStream(query, max_tries_for_mysql_block_input_stream);
return loadBase(query);
}
bool MySQLDictionarySource::isModified() const

View File

@ -34,7 +34,7 @@ public:
MySQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const String & config_prefix,
const Block & sample_block_);
/// copy-constructor is provided in order to support cloneability
@ -49,6 +49,8 @@ public:
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadBase(const String & query);
bool isModified() const override;
bool supportsSelectiveLoad() const override;
@ -69,9 +71,6 @@ private:
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
/// A helper method for recovering from "Lost connection to MySQL server during query" errors
BlockInputStreamPtr retriedCreateMySqlBIStream(const std::string & query_str, const size_t max_tries);
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;

View File

@ -1,26 +1,29 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#include "config_core.h"
#endif
#if USE_MYSQL
# include <vector>
# include <Columns/ColumnNullable.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnsNumber.h>
# include <Columns/ColumnDecimal.h>
# include <Columns/ColumnFixedString.h>
# include <DataTypes/IDataType.h>
# include <DataTypes/DataTypeNullable.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <IO/Operators.h>
# include <Common/assert_cast.h>
# include <ext/range.h>
# include "MySQLBlockInputStream.h"
#include <vector>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Common/assert_cast.h>
#include <ext/range.h>
#include <common/logger_useful.h>
#include "MySQLBlockInputStream.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
@ -36,6 +39,7 @@ MySQLBlockInputStream::Connection::Connection(
{
}
/// Used in MaterializeMySQL and in doInvalidateQuery for dictionary source.
MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
@ -43,7 +47,8 @@ MySQLBlockInputStream::MySQLBlockInputStream(
const UInt64 max_block_size_,
const bool auto_close_,
const bool fetch_by_name_)
: connection{std::make_unique<Connection>(entry, query_str)}
: log(&Poco::Logger::get("MySQLBlockInputStream"))
, connection{std::make_unique<Connection>(entry, query_str)}
, max_block_size{max_block_size_}
, auto_close{auto_close_}
, fetch_by_name(fetch_by_name_)
@ -52,6 +57,64 @@ MySQLBlockInputStream::MySQLBlockInputStream(
initPositionMappingFromQueryResultStructure();
}
/// As a base class contructor for MySQLWithFailoverBlockInputStream.
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_,
bool fetch_by_name_)
: log(&Poco::Logger::get("MySQLBlockInputStream"))
, max_block_size(max_block_size_)
, auto_close(auto_close_)
, fetch_by_name(fetch_by_name_)
{
description.init(sample_block_);
}
/// Used by MySQL storage / table function and dictionary source.
MySQLWithFailoverBlockInputStream::MySQLWithFailoverBlockInputStream(
mysqlxx::PoolWithFailover & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_,
const bool fetch_by_name_,
const size_t max_tries_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_, fetch_by_name_)
, pool(pool_)
, query_str(query_str_)
, max_tries(max_tries_)
{
}
void MySQLWithFailoverBlockInputStream::readPrefix()
{
size_t count_connection_lost = 0;
/// For recovering from "Lost connection to MySQL server during query" errors
while (true)
{
try
{
connection = std::make_unique<Connection>(pool.get(), query_str);
break;
}
catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
{
LOG_WARNING(log, ecl.displayText());
}
LOG_WARNING(log, "Lost connection ({}/{}). Trying to reconnect...", count_connection_lost, max_tries);
if (++count_connection_lost > max_tries)
{
LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connection_lost, max_tries);
throw;
}
}
initPositionMappingFromQueryResultStructure();
}
namespace
{
@ -138,7 +201,11 @@ Block MySQLBlockInputStream::readImpl()
if (!row)
{
if (auto_close)
{
LOG_TRACE(log, "Removing connection, kssenii");
connection->entry.disconnect();
}
LOG_TRACE(log, "Connection disconnected kssenii");
return {};
}
@ -191,18 +258,6 @@ Block MySQLBlockInputStream::readImpl()
return description.sample_block.cloneWithColumns(std::move(columns));
}
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_,
bool fetch_by_name_)
: max_block_size(max_block_size_)
, auto_close(auto_close_)
, fetch_by_name(fetch_by_name_)
{
description.init(sample_block_);
}
void MySQLBlockInputStream::initPositionMappingFromQueryResultStructure()
{
position_mapping.resize(description.sample_block.columns());
@ -250,25 +305,6 @@ void MySQLBlockInputStream::initPositionMappingFromQueryResultStructure()
}
}
MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
mysqlxx::Pool & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_,
const bool fetch_by_name_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_, fetch_by_name_)
, pool(pool_)
, query_str(query_str_)
{
}
void MySQLLazyBlockInputStream::readPrefix()
{
connection = std::make_unique<Connection>(pool.get(), query_str);
initPositionMappingFromQueryResultStructure();
}
}
#endif

View File

@ -41,6 +41,7 @@ protected:
mysqlxx::UseQueryResult result;
};
Poco::Logger * log;
std::unique_ptr<Connection> connection;
const UInt64 max_block_size;
@ -50,24 +51,29 @@ protected:
ExternalResultDescription description;
};
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting.
/// It allows to create a lot of stream objects without occupation of all connection pool.
class MySQLLazyBlockInputStream final : public MySQLBlockInputStream
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting
/// and retries in case of connection errors. It allows to create a lot of stream objects
/// without occupation of all connection pool.
class MySQLWithFailoverBlockInputStream final : public MySQLBlockInputStream
{
public:
MySQLLazyBlockInputStream(
mysqlxx::Pool & pool_,
static constexpr inline auto MAX_TRIES_MYSQL_CONNECT = 5;
MySQLWithFailoverBlockInputStream(
mysqlxx::PoolWithFailover & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_ = false,
const bool fetch_by_name_ = false);
const bool fetch_by_name_ = false,
const size_t max_tries_ = MAX_TRIES_MYSQL_CONNECT);
private:
void readPrefix() override;
mysqlxx::Pool & pool;
mysqlxx::PoolWithFailover & pool;
std::string query_str;
size_t max_tries;
};
}

View File

@ -41,7 +41,7 @@ static String backQuoteMySQL(const String & x)
StorageMySQL::StorageMySQL(
const StorageID & table_id_,
mysqlxx::Pool && pool_,
mysqlxx::PoolWithFailover && pool_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const bool replace_query_,
@ -94,9 +94,8 @@ Pipe StorageMySQL::read(
sample_block.insert({ column_data.type, column_data.name });
}
/// TODO: rewrite MySQLBlockInputStream
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLLazyBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true)));
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true)));
}
@ -224,7 +223,7 @@ void registerStorageMySQL(StorageFactory & factory)
if (engine_args.size() < 5 || engine_args.size() > 7)
throw Exception(
"Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
"Storage MySQL requires 5-7 parameters: MySQL('host:port' (or 'addresses_pattern'), database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
@ -238,7 +237,7 @@ void registerStorageMySQL(StorageFactory & factory)
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
mysqlxx::Pool pool(remote_database, parsed_host_port.first, username, password, parsed_host_port.second);
mysqlxx::PoolWithFailover pool(remote_database, parsed_host_port.first, parsed_host_port.second, username, password);
bool replace_query = false;
std::string on_duplicate_clause;

View File

@ -1,15 +1,14 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#include "config_core.h"
#endif
#if USE_MYSQL
# include <ext/shared_ptr_helper.h>
# include <Storages/IStorage.h>
# include <mysqlxx/Pool.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <mysqlxx/PoolWithFailover.h>
namespace DB
@ -25,7 +24,7 @@ class StorageMySQL final : public ext::shared_ptr_helper<StorageMySQL>, public I
public:
StorageMySQL(
const StorageID & table_id_,
mysqlxx::Pool && pool_,
mysqlxx::PoolWithFailover && pool_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const bool replace_query_,
@ -55,7 +54,7 @@ private:
bool replace_query;
std::string on_duplicate_clause;
mysqlxx::Pool pool;
mysqlxx::PoolWithFailover pool;
const Context & global_context;
};

View File

@ -77,7 +77,7 @@ ColumnsDescription TableFunctionMySQL::getActualTableStructure(const Context & c
{
assert(!parsed_host_port.first.empty());
if (!pool)
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
pool.emplace(remote_database_name, parsed_host_port.first, parsed_host_port.second, user_name, password);
const auto & settings = context.getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
@ -93,7 +93,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & /*ast_function*/, cons
{
assert(!parsed_host_port.first.empty());
if (!pool)
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
pool.emplace(remote_database_name, parsed_host_port.first, parsed_host_port.second, user_name, password);
auto columns = getActualTableStructure(context);

View File

@ -38,7 +38,7 @@ private:
bool replace_query = false;
String on_duplicate_clause;
mutable std::optional<mysqlxx::Pool> pool;
mutable std::optional<mysqlxx::PoolWithFailover> pool;
};
}

View File

@ -18,15 +18,32 @@ create_table_sql_template = """
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
def get_mysql_conn(port=3308):
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=port)
return conn
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_mysql_conn()
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
conn1 = get_mysql_conn(port=3308)
create_mysql_db(conn1, 'clickhouse')
conn2 = get_mysql_conn(port=3388)
create_mysql_db(conn2, 'clickhouse')
yield cluster
finally:
@ -52,6 +69,7 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
assert node1.query(query.format(t=table_name)) == '250\n'
conn.close()
def test_insert_select(started_cluster):
table_name = 'test_insert_select'
conn = get_mysql_conn()
@ -148,6 +166,7 @@ def test_table_function(started_cluster):
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '60000'
conn.close()
def test_binary_type(started_cluster):
conn = get_mysql_conn()
with conn.cursor() as cursor:
@ -156,6 +175,7 @@ def test_binary_type(started_cluster):
node1.query("INSERT INTO {} VALUES (42, 'clickhouse')".format('TABLE FUNCTION ' + table_function))
assert node1.query("SELECT * FROM {}".format(table_function)) == '42\tclickhouse\\0\\0\\0\\0\\0\\0\n'
def test_enum_type(started_cluster):
table_name = 'test_enum_type'
conn = get_mysql_conn()
@ -168,20 +188,39 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('
conn.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
def test_mysql_many_replicas(started_cluster):
table_name = 'test_replicas'
conn1 = get_mysql_conn(port=3308)
create_mysql_table(conn1, table_name)
conn2 = get_mysql_conn(port=3388)
create_mysql_table(conn2, table_name)
# Storage with mysql{1|2}
node1.query('''
CREATE TABLE test_replicas
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql{1|2}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
# Fill remote tables with different data to be able to check
node1.query('''
CREATE TABLE test_replica1
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql1:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');''')
node1.query('''
CREATE TABLE test_replica2
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql2:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
node1.query("INSERT INTO test_replica1 (id, name) SELECT number, 'host1' from numbers(10) ")
node1.query("INSERT INTO test_replica2 (id, name) SELECT number, 'host2' from numbers(10) ")
# check both remote replicas are accessible throught that table
query = "SELECT * FROM ("
for i in range (2):
query += "SELECT name FROM test_replicas UNION DISTINCT "
query += "SELECT name FROM test_replicas)"
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
result = node1.query(query.format(t=table_name))
assert(result == 'host1\nhost2\n')
if __name__ == '__main__':