From 4f41ebcae365fdb8a0b3f824e215672a625c8a51 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 15 May 2021 07:40:43 +0300 Subject: [PATCH] Add settings (for connections) for MySQL storage engine Default settings are not very efficient, since they do not even reuse connections. And when each query requires connection you can have only ~80 QPS, while by simply enabling connection reuse (connection_auto_close=false) you can have ~500 QPS (and by increasing connection_pool_size you can have better QPS throughput). So this patch allows to pass through some connection related settings for the StorageMySQL engine, like: - connection_pool_size=16 - connection_max_tries=3 - connection_auto_close=true v2: remove connection_pool_default_size v3: remove num_tries_on_connection_loss --- base/mysqlxx/PoolWithFailover.cpp | 10 ++++- base/mysqlxx/PoolWithFailover.h | 2 + .../table-engines/integrations/mysql.md | 7 +++- src/CMakeLists.txt | 1 + .../MySQL/DatabaseConnectionMySQL.cpp | 8 ++-- src/Storages/MySQL/MySQLSettings.cpp | 42 +++++++++++++++++++ src/Storages/MySQL/MySQLSettings.h | 32 ++++++++++++++ src/Storages/StorageExternalDistributed.cpp | 4 +- src/Storages/StorageMySQL.cpp | 28 +++++++++++-- src/Storages/StorageMySQL.h | 6 ++- src/Storages/ya.make | 1 + src/TableFunctions/TableFunctionMySQL.cpp | 4 +- 12 files changed, 133 insertions(+), 12 deletions(-) create mode 100644 src/Storages/MySQL/MySQLSettings.cpp create mode 100644 src/Storages/MySQL/MySQLSettings.h diff --git a/base/mysqlxx/PoolWithFailover.cpp b/base/mysqlxx/PoolWithFailover.cpp index ea2d060e596..e317ab7f228 100644 --- a/base/mysqlxx/PoolWithFailover.cpp +++ b/base/mysqlxx/PoolWithFailover.cpp @@ -78,6 +78,8 @@ PoolWithFailover::PoolWithFailover( const RemoteDescription & addresses, const std::string & user, const std::string & password, + unsigned default_connections_, + unsigned max_connections_, size_t max_tries_) : max_tries(max_tries_) , shareable(false) @@ -85,7 +87,13 @@ PoolWithFailover::PoolWithFailover( /// Replicas have the same priority, but traversed replicas are moved to the end of the queue. for (const auto & [host, port] : addresses) { - replicas_by_priority[0].emplace_back(std::make_shared(database, host, user, password, port)); + replicas_by_priority[0].emplace_back(std::make_shared(database, + host, user, password, port, + /* socket_ = */ "", + MYSQLXX_DEFAULT_TIMEOUT, + MYSQLXX_DEFAULT_RW_TIMEOUT, + default_connections_, + max_connections_)); } } diff --git a/base/mysqlxx/PoolWithFailover.h b/base/mysqlxx/PoolWithFailover.h index 5154fc3e253..1c7a63e76c0 100644 --- a/base/mysqlxx/PoolWithFailover.h +++ b/base/mysqlxx/PoolWithFailover.h @@ -115,6 +115,8 @@ namespace mysqlxx const RemoteDescription & addresses, const std::string & user, const std::string & password, + unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, + unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS, size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); PoolWithFailover(const PoolWithFailover & other); diff --git a/docs/en/engines/table-engines/integrations/mysql.md b/docs/en/engines/table-engines/integrations/mysql.md index 3847e7a9e0e..9bd12e97dd8 100644 --- a/docs/en/engines/table-engines/integrations/mysql.md +++ b/docs/en/engines/table-engines/integrations/mysql.md @@ -15,7 +15,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... -) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']); +) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']) +SETTINGS + [connection_pool_size=16, ] + [connection_max_tries=3, ] + [connection_auto_close=true ] +; ``` See a detailed description of the [CREATE TABLE](../../../sql-reference/statements/create/table.md#create-table-query) query. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a0f36163d68..a32817928fc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -187,6 +187,7 @@ add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProx add_object_library(clickhouse_interpreters_jit Interpreters/JIT) add_object_library(clickhouse_columns Columns) add_object_library(clickhouse_storages Storages) +add_object_library(clickhouse_storages_mysql Storages/MySQL) add_object_library(clickhouse_storages_distributed Storages/Distributed) add_object_library(clickhouse_storages_mergetree Storages/MergeTree) add_object_library(clickhouse_storages_liveview Storages/LiveView) diff --git a/src/Databases/MySQL/DatabaseConnectionMySQL.cpp b/src/Databases/MySQL/DatabaseConnectionMySQL.cpp index 5cd59f8a7c8..9b71fe537ec 100644 --- a/src/Databases/MySQL/DatabaseConnectionMySQL.cpp +++ b/src/Databases/MySQL/DatabaseConnectionMySQL.cpp @@ -20,6 +20,7 @@ # include # include # include +# include # include # include # include @@ -253,12 +254,13 @@ void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache( std::move(mysql_pool), database_name_in_mysql, table_name, - false, - "", + /* replace_query_ */ false, + /* on_duplicate_clause = */ "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, String{}, - getContext())); + getContext(), + MySQLSettings{})); } } diff --git a/src/Storages/MySQL/MySQLSettings.cpp b/src/Storages/MySQL/MySQLSettings.cpp new file mode 100644 index 00000000000..1a8f0804777 --- /dev/null +++ b/src/Storages/MySQL/MySQLSettings.cpp @@ -0,0 +1,42 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) + +void MySQLSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for storage " + storage_def.engine->name); + throw; + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} + +} + diff --git a/src/Storages/MySQL/MySQLSettings.h b/src/Storages/MySQL/MySQLSettings.h new file mode 100644 index 00000000000..da8723c2ea6 --- /dev/null +++ b/src/Storages/MySQL/MySQLSettings.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + + +namespace Poco::Util +{ + class AbstractConfiguration; +} + + +namespace DB +{ +class ASTStorage; + +#define LIST_OF_MYSQL_SETTINGS(M) \ + M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \ + M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \ + M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \ + +DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) + + +/** Settings for the MySQL family of engines. + */ +struct MySQLSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; + +} diff --git a/src/Storages/StorageExternalDistributed.cpp b/src/Storages/StorageExternalDistributed.cpp index 5a153f16a0a..32b9c7e9245 100644 --- a/src/Storages/StorageExternalDistributed.cpp +++ b/src/Storages/StorageExternalDistributed.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -79,7 +80,8 @@ StorageExternalDistributed::StorageExternalDistributed( columns_, constraints_, String{}, - context); + context, + MySQLSettings{}); break; } #endif diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index 4cf69d7dd77..1dadcfe986b 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -50,13 +51,15 @@ StorageMySQL::StorageMySQL( const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & comment, - ContextPtr context_) + ContextPtr context_, + const MySQLSettings & mysql_settings_) : IStorage(table_id_) , WithContext(context_->getGlobalContext()) , remote_database_name(remote_database_name_) , remote_table_name(remote_table_name_) , replace_query{replace_query_} , on_duplicate_clause{on_duplicate_clause_} + , mysql_settings(mysql_settings_) , pool(std::make_shared(pool_)) { StorageInMemoryMetadata storage_metadata; @@ -98,7 +101,8 @@ Pipe StorageMySQL::read( } - StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false); + StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), + mysql_settings.connection_auto_close); return Pipe(std::make_shared( std::make_shared(pool, query, sample_block, mysql_input_stream_settings))); } @@ -250,8 +254,22 @@ void registerStorageMySQL(StorageFactory & factory) const String & password = engine_args[4]->as().value.safeGet(); size_t max_addresses = args.getContext()->getSettingsRef().glob_expansion_max_elements; + /// TODO: move some arguments from the arguments to the SETTINGS. + MySQLSettings mysql_settings; + if (args.storage_def->settings) + { + mysql_settings.loadFromQuery(*args.storage_def); + } + + if (!mysql_settings.connection_pool_size) + throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS); + auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); - mysqlxx::PoolWithFailover pool(remote_database, addresses, username, password); + mysqlxx::PoolWithFailover pool(remote_database, addresses, + username, password, + MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, + mysql_settings.connection_pool_size, + mysql_settings.connection_max_tries); bool replace_query = false; std::string on_duplicate_clause; @@ -275,9 +293,11 @@ void registerStorageMySQL(StorageFactory & factory) args.columns, args.constraints, args.comment, - args.getContext()); + args.getContext(), + mysql_settings); }, { + .supports_settings = true, .source_access_type = AccessType::MYSQL, }); } diff --git a/src/Storages/StorageMySQL.h b/src/Storages/StorageMySQL.h index a7aca48197e..5eb9ed14524 100644 --- a/src/Storages/StorageMySQL.h +++ b/src/Storages/StorageMySQL.h @@ -9,6 +9,7 @@ #include #include +#include #include @@ -33,7 +34,8 @@ public: const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & comment, - ContextPtr context_); + ContextPtr context_, + const MySQLSettings & mysql_settings_); std::string getName() const override { return "MySQL"; } @@ -56,6 +58,8 @@ private: bool replace_query; std::string on_duplicate_clause; + MySQLSettings mysql_settings; + mysqlxx::PoolWithFailoverPtr pool; }; diff --git a/src/Storages/ya.make b/src/Storages/ya.make index d83ba7f6490..8e0efac8c6e 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -112,6 +112,7 @@ SRCS( MergeTree/localBackup.cpp MergeTree/registerStorageMergeTree.cpp MutationCommands.cpp + MySQL/MySQLSettings.cpp PartitionCommands.cpp ProjectionsDescription.cpp ReadInOrderOptimizer.cpp diff --git a/src/TableFunctions/TableFunctionMySQL.cpp b/src/TableFunctions/TableFunctionMySQL.cpp index 325b2dc44c6..0b60e11f490 100644 --- a/src/TableFunctions/TableFunctionMySQL.cpp +++ b/src/TableFunctions/TableFunctionMySQL.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -107,7 +108,8 @@ StoragePtr TableFunctionMySQL::executeImpl( columns, ConstraintsDescription{}, String{}, - context); + context, + MySQLSettings{}); pool.reset();