Block on empty connection pool

This commit is contained in:
kssenii 2021-03-19 20:57:00 +00:00
parent e066feb81c
commit 3b32c8fe9a
7 changed files with 86 additions and 62 deletions

View File

@ -366,6 +366,9 @@ class IColumn;
M(Bool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(Int64, postgresql_connection_pool_wait_timeout, -1, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
\
M(Seconds, distributed_replica_error_half_life, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD, "Time period reduces replica error counter by 2 times.", 0) \
M(UInt64, distributed_replica_error_cap, DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT, "Max number of errors per replica, prevents piling up an incredible amount of errors if replica was offline for some time and allows it to be reconsidered in a shorter amount of time.", 0) \
M(UInt64, distributed_replica_max_ignored_errors, 0, "Number of errors that will be ignored while choosing replicas", 0) \

View File

@ -247,7 +247,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
/// no connection is made here
auto connection_pool = std::make_shared<PostgreSQLConnectionPool>(
postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
postgres_database_name,
parsed_host_port.first, parsed_host_port.second,
username, password,
context.getSettingsRef().postgresql_connection_pool_size,
context.getSettingsRef().postgresql_connection_pool_wait_timeout);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection_pool, use_table_cache);

View File

@ -14,9 +14,23 @@ namespace DB
{
PostgreSQLConnectionPool::PostgreSQLConnectionPool(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
: pool(std::make_shared<Pool>(POSTGRESQL_POOL_DEFAULT_SIZE))
std::string dbname,
std::string host,
UInt16 port,
std::string user,
std::string password,
size_t pool_size_,
int64_t pool_wait_timeout_)
: pool(std::make_shared<Pool>(pool_size_))
, pool_size(pool_size_)
, pool_wait_timeout(pool_wait_timeout_)
, block_on_empty_pool(pool_wait_timeout == -1)
{
LOG_INFO(
&Poco::Logger::get("PostgreSQLConnectionPool"),
"New connection pool. Size: {}, blocks on empty pool: {}",
pool_size, block_on_empty_pool);
address = host + ':' + std::to_string(port);
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
initialize();
@ -24,9 +38,12 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool(
PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other)
: pool(std::make_shared<Pool>(POSTGRESQL_POOL_DEFAULT_SIZE))
: pool(std::make_shared<Pool>(other.pool_size))
, connection_str(other.connection_str)
, address(other.address)
, pool_size(other.pool_size)
, pool_wait_timeout(other.pool_wait_timeout)
, block_on_empty_pool(other.block_on_empty_pool)
{
initialize();
}
@ -35,7 +52,7 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPoo
void PostgreSQLConnectionPool::initialize()
{
/// No connection is made, just fill pool with non-connected connection objects.
for (size_t i = 0; i < POSTGRESQL_POOL_DEFAULT_SIZE; ++i)
for (size_t i = 0; i < pool_size; ++i)
pool->push(std::make_shared<PostgreSQLConnection>(connection_str, address));
}
@ -56,7 +73,16 @@ std::string PostgreSQLConnectionPool::formatConnectionString(
PostgreSQLConnectionHolderPtr PostgreSQLConnectionPool::get()
{
PostgreSQLConnectionPtr connection;
if (pool->tryPop(connection, POSTGRESQL_POOL_WAIT_MS))
/// Always blocks by default.
if (block_on_empty_pool)
{
/// pop to ConcurrentBoundedQueue will block untill it is non-empty.
pool->pop(connection);
return std::make_shared<PostgreSQLConnectionHolder>(connection, *pool);
}
if (pool->tryPop(connection, pool_wait_timeout))
{
return std::make_shared<PostgreSQLConnectionHolder>(connection, *pool);
}

View File

@ -14,17 +14,27 @@ namespace DB
class PostgreSQLReplicaConnection;
/// Connection pool of size POSTGRESQL_POOL_DEFAULT_SIZE = 16.
/// If it was not possible to fetch connection within a timeout, a new connection is made.
/// If it was not possible to put connection back into pool within a timeout, it is closed.
/// Connection pool size is defined by user with setting `postgresql_connection_pool_size` (default 16).
/// If pool is empty, it will block untill there are available connections.
/// If setting `connection_pool_wait_timeout` is defined, it will not block on empty pool and will
/// wait untill the timeout and then create a new connection. (only for storage/db engine)
class PostgreSQLConnectionPool
{
friend class PostgreSQLReplicaConnection;
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
public:
PostgreSQLConnectionPool(std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
PostgreSQLConnectionPool(
std::string dbname,
std::string host,
UInt16 port,
std::string user,
std::string password,
size_t pool_size_ = POSTGRESQL_POOL_DEFAULT_SIZE,
int64_t pool_wait_timeout_ = -1);
PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other);
@ -33,9 +43,6 @@ public:
PostgreSQLConnectionHolderPtr get();
private:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_MS = 50;
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
@ -46,6 +53,9 @@ private:
PoolPtr pool;
std::string connection_str, address;
size_t pool_size;
int64_t pool_wait_timeout;
bool block_on_empty_pool;
};
using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;

View File

@ -317,7 +317,9 @@ void registerStoragePostgreSQL(StorageFactory & factory)
parsed_host_port.first,
parsed_host_port.second,
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>(),
args.context.getSettingsRef().postgresql_connection_pool_size,
args.context.getSettingsRef().postgresql_connection_pool_wait_timeout);
return StoragePostgreSQL::create(
args.table_id, remote_table, connection_pool, args.columns, args.constraints, args.context, remote_table_schema);

View File

@ -1084,8 +1084,8 @@ class ClickHouseInstance:
def count_in_log(self, substring):
result = self.exec_in_container(
["bash", "-c", 'grep "{}" /var/log/clickhouse-server/clickhouse-server.log || true'.format(substring)])
return len(result)
["bash", "-c", 'grep "{}" /var/log/clickhouse-server/clickhouse-server.log | wc -l'.format(substring)])
return result
def wait_for_log_line(self, regexp, filename='/var/log/clickhouse-server/clickhouse-server.log', timeout=30, repetitions=1, look_behind_lines=100):
start_time = time.time()

View File

@ -166,79 +166,58 @@ def test_non_default_scema(started_cluster):
assert(result == expected)
def test_connection_pool(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query('''
CREATE TABLE test_table (key UInt32, value UInt32)
ENGINE = PostgreSQL('postgres1:5432', 'clickhouse', 'test_table', 'postgres', 'mysecretpassword')''')
cursor.execute('CREATE TABLE test_table (key integer, value integer)')
# Make sure connection pool is filled
def node_pool(_):
result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default')
busy_pool = Pool(16)
p = busy_pool.map_async(node_pool, range(16))
p.wait()
prev_count = node1.count_in_log('New connection to postgres*')
# Check connections do not open anymore
busy_pool = Pool(10)
p = busy_pool.map_async(node_pool, range(10))
p.wait()
count = node1.count_in_log('New connection to postgres*')
node1.query('DROP TABLE test_table;')
cursor.execute('DROP TABLE test_table;')
assert(count == prev_count)
def test_concurrent_queries(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query('''
CREATE TABLE test_table (key UInt32, value UInt32)
ENGINE = PostgreSQL('postgres1:5432', 'clickhouse', 'test_table', 'postgres', 'mysecretpassword')''')
cursor.execute('CREATE TABLE test_table (key integer, value integer)')
def node_insert(_):
prev_count = node1.count_in_log('New connection to postgres1:5432')
def node_select(_):
for i in range(20):
result = node1.query("SELECT * FROM test_table", user='default')
busy_pool = Pool(20)
p = busy_pool.map_async(node_select, range(20))
p.wait()
count = node1.count_in_log('New connection to postgres1:5432')
print(count, prev_count)
# 16 is default size for connection pool
assert(int(count) == int(prev_count) + 16)
def node_insert(_):
for i in range(5):
result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default')
busy_pool = Pool(10)
p = busy_pool.map_async(node_insert, range(10))
busy_pool = Pool(5)
p = busy_pool.map_async(node_insert, range(5))
p.wait()
result = node1.query("SELECT count() FROM test_table", user='default')
print(result)
assert(int(result) == 20 * 10 * 1000)
def node_select(_):
for i in range(5):
result = node1.query("SELECT * FROM test_table", user='default')
busy_pool = Pool(10)
p = busy_pool.map_async(node_select, range(10))
p.wait()
assert(int(result) == 5 * 5 * 1000)
def node_insert_select(_):
for i in range(20):
for i in range(5):
result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default')
result = node1.query("SELECT * FROM test_table LIMIT 100", user='default')
busy_pool = Pool(10)
p = busy_pool.map_async(node_insert_select, range(10))
busy_pool = Pool(5)
p = busy_pool.map_async(node_insert_select, range(5))
p.wait()
result = node1.query("SELECT count() FROM test_table", user='default')
print(result)
assert(int(result) == 20 * 10 * 1000 * 2)
assert(int(result) == 5 * 5 * 1000 * 2)
node1.query('DROP TABLE test_table;')
cursor.execute('DROP TABLE test_table;')
count = node1.count_in_log('New connection to postgres1:5432')
print(count, prev_count)
assert(int(count) == int(prev_count) + 16)
if __name__ == '__main__':
cluster.start()