Add settings (for connections) for MySQL storage engine

Default settings are not very efficient, since they do not even reuse
connections.
And when each query requires connection you can have only ~80 QPS, while
by simply enabling connection reuse (connection_auto_close=false) you
can have ~500 QPS (and by increasing connection_pool_size you can have
better QPS throughput).

So this patch allows to pass through some connection related settings
for the StorageMySQL engine, like:
- connection_pool_size=16
- connection_max_tries=3
- connection_auto_close=true

v2: remove connection_pool_default_size
v3: remove num_tries_on_connection_loss
This commit is contained in:
Azat Khuzhin 2021-05-15 07:40:43 +03:00
parent 2c4920186b
commit 4f41ebcae3
12 changed files with 133 additions and 12 deletions

View File

@ -78,6 +78,8 @@ PoolWithFailover::PoolWithFailover(
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
unsigned default_connections_,
unsigned max_connections_,
size_t max_tries_)
: max_tries(max_tries_)
, shareable(false)
@ -85,7 +87,13 @@ PoolWithFailover::PoolWithFailover(
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
{
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database, host, user, password, port));
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database,
host, user, password, port,
/* socket_ = */ "",
MYSQLXX_DEFAULT_TIMEOUT,
MYSQLXX_DEFAULT_RW_TIMEOUT,
default_connections_,
max_connections_));
}
}

View File

@ -115,6 +115,8 @@ namespace mysqlxx
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const PoolWithFailover & other);

View File

@ -15,7 +15,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
SETTINGS
[connection_pool_size=16, ]
[connection_max_tries=3, ]
[connection_auto_close=true ]
;
```
See a detailed description of the [CREATE TABLE](../../../sql-reference/statements/create/table.md#create-table-query) query.

View File

@ -187,6 +187,7 @@ add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProx
add_object_library(clickhouse_interpreters_jit Interpreters/JIT)
add_object_library(clickhouse_columns Columns)
add_object_library(clickhouse_storages Storages)
add_object_library(clickhouse_storages_mysql Storages/MySQL)
add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)

View File

@ -20,6 +20,7 @@
# include <Parsers/parseQuery.h>
# include <Parsers/queryToString.h>
# include <Storages/StorageMySQL.h>
# include <Storages/MySQL/MySQLSettings.h>
# include <Common/escapeForFileName.h>
# include <Common/parseAddress.h>
# include <Common/setThreadName.h>
@ -253,12 +254,13 @@ void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(
std::move(mysql_pool),
database_name_in_mysql,
table_name,
false,
"",
/* replace_query_ */ false,
/* on_duplicate_clause = */ "",
ColumnsDescription{columns_name_and_type},
ConstraintsDescription{},
String{},
getContext()));
getContext(),
MySQLSettings{}));
}
}

View File

@ -0,0 +1,42 @@
#include <Storages/MySQL/MySQLSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("for storage " + storage_def.engine->name);
throw;
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Core/Defines.h>
#include <Core/BaseSettings.h>
namespace Poco::Util
{
class AbstractConfiguration;
}
namespace DB
{
class ASTStorage;
#define LIST_OF_MYSQL_SETTINGS(M) \
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \
M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
/** Settings for the MySQL family of engines.
*/
struct MySQLSettings : public BaseSettings<MySQLSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -12,6 +12,7 @@
#include <Processors/Pipe.h>
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageMySQL.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/StorageURL.h>
#include <common/logger_useful.h>
@ -79,7 +80,8 @@ StorageExternalDistributed::StorageExternalDistributed(
columns_,
constraints_,
String{},
context);
context,
MySQLSettings{});
break;
}
#endif

View File

@ -15,6 +15,7 @@
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTCreateQuery.h>
#include <mysqlxx/Transaction.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
@ -50,13 +51,15 @@ StorageMySQL::StorageMySQL(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_)
ContextPtr context_,
const MySQLSettings & mysql_settings_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
, replace_query{replace_query_}
, on_duplicate_clause{on_duplicate_clause_}
, mysql_settings(mysql_settings_)
, pool(std::make_shared<mysqlxx::PoolWithFailover>(pool_))
{
StorageInMemoryMetadata storage_metadata;
@ -98,7 +101,8 @@ Pipe StorageMySQL::read(
}
StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false);
StreamSettings mysql_input_stream_settings(context_->getSettingsRef(),
mysql_settings.connection_auto_close);
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings)));
}
@ -250,8 +254,22 @@ void registerStorageMySQL(StorageFactory & factory)
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
size_t max_addresses = args.getContext()->getSettingsRef().glob_expansion_max_elements;
/// TODO: move some arguments from the arguments to the SETTINGS.
MySQLSettings mysql_settings;
if (args.storage_def->settings)
{
mysql_settings.loadFromQuery(*args.storage_def);
}
if (!mysql_settings.connection_pool_size)
throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
mysqlxx::PoolWithFailover pool(remote_database, addresses, username, password);
mysqlxx::PoolWithFailover pool(remote_database, addresses,
username, password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
mysql_settings.connection_pool_size,
mysql_settings.connection_max_tries);
bool replace_query = false;
std::string on_duplicate_clause;
@ -275,9 +293,11 @@ void registerStorageMySQL(StorageFactory & factory)
args.columns,
args.constraints,
args.comment,
args.getContext());
args.getContext(),
mysql_settings);
},
{
.supports_settings = true,
.source_access_type = AccessType::MYSQL,
});
}

View File

@ -9,6 +9,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <mysqlxx/PoolWithFailover.h>
@ -33,7 +34,8 @@ public:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_);
ContextPtr context_,
const MySQLSettings & mysql_settings_);
std::string getName() const override { return "MySQL"; }
@ -56,6 +58,8 @@ private:
bool replace_query;
std::string on_duplicate_clause;
MySQLSettings mysql_settings;
mysqlxx::PoolWithFailoverPtr pool;
};

View File

@ -112,6 +112,7 @@ SRCS(
MergeTree/localBackup.cpp
MergeTree/registerStorageMergeTree.cpp
MutationCommands.cpp
MySQL/MySQLSettings.cpp
PartitionCommands.cpp
ProjectionsDescription.cpp
ReadInOrderOptimizer.cpp

View File

@ -15,6 +15,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageMySQL.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionMySQL.h>
@ -107,7 +108,8 @@ StoragePtr TableFunctionMySQL::executeImpl(
columns,
ConstraintsDescription{},
String{},
context);
context,
MySQLSettings{});
pool.reset();