This commit is contained in:
kssenii 2021-03-16 13:44:20 +00:00
parent 51cb89973d
commit 3903d59d30
12 changed files with 87 additions and 116 deletions

View File

@ -28,14 +28,13 @@ namespace ErrorCodes
} }
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
PostgreSQLConnectionPoolPtr connection_pool_, WrappedPostgreSQLConnection connection_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_) const UInt64 max_block_size_)
: query_str(query_str_) : query_str(query_str_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, connection_pool(connection_pool_) , connection(connection_)
, connection(connection_pool->get())
{ {
description.init(sample_block); description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns())) for (const auto idx : ext::range(0, description.sample_block.columns()))
@ -112,9 +111,6 @@ void PostgreSQLBlockInputStream::readSuffix()
stream->complete(); stream->complete();
tx->commit(); tx->commit();
} }
if (connection->is_open())
connection_pool->put(connection);
} }

View File

@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream
{ {
public: public:
PostgreSQLBlockInputStream( PostgreSQLBlockInputStream(
PostgreSQLConnectionPoolPtr connection_pool_, WrappedPostgreSQLConnection connection_,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_); const UInt64 max_block_size_);
@ -46,8 +46,7 @@ private:
const UInt64 max_block_size; const UInt64 max_block_size;
ExternalResultDescription description; ExternalResultDescription description;
PostgreSQLConnectionPoolPtr connection_pool; WrappedPostgreSQLConnection connection;
PostgreSQLConnection::ConnectionPtr connection;
std::unique_ptr<pqxx::read_transaction> tx; std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream; std::unique_ptr<pqxx::stream_from> stream;

View File

@ -96,7 +96,6 @@ std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
for (auto table_name : tx.stream<std::string>(query)) for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name)); tables.insert(std::get<0>(table_name));
connection_pool->put(connection);
return tables; return tables;
} }
@ -132,7 +131,6 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
throw; throw;
} }
connection_pool->put(connection);
return true; return true;
} }
@ -167,7 +165,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Conte
return StoragePtr{}; return StoragePtr{};
auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls; auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchPostgreSQLTableStructure(connection_pool->get(), table_name, use_nulls); auto columns = fetchPostgreSQLTableStructure(connection_pool->get(), doubleQuoteString(table_name), use_nulls);
if (!columns) if (!columns)
return StoragePtr{}; return StoragePtr{};

View File

@ -94,7 +94,7 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure( std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::connection> connection, const String & postgres_table_name, bool use_nulls) WrappedPostgreSQLConnection connection, const String & postgres_table_name, bool use_nulls)
{ {
auto columns = NamesAndTypesList(); auto columns = NamesAndTypesList();

View File

@ -12,7 +12,7 @@ namespace DB
{ {
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure( std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::connection> connection, const String & postgres_table_name, bool use_nulls); WrappedPostgreSQLConnection connection, const String & postgres_table_name, bool use_nulls);
} }

View File

@ -20,17 +20,6 @@ PostgreSQLConnection::PostgreSQLConnection(
} }
PostgreSQLConnection::PostgreSQLConnection(
ConnectionPtr connection_,
const String & connection_str_,
const String & address_)
: connection(std::move(connection_))
, connection_str(connection_str_)
, address(address_)
{
}
PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other) PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other)
: connection_str(other.connection_str) : connection_str(other.connection_str)
, address(other.address) , address(other.address)
@ -40,31 +29,31 @@ PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other)
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get() PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get()
{ {
connect(); connectIfNeeded();
return connection; return connection;
} }
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet() PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet()
{ {
if (tryConnect()) if (tryConnectIfNeeded())
return connection; return connection;
return nullptr; return nullptr;
} }
void PostgreSQLConnection::connect() void PostgreSQLConnection::connectIfNeeded()
{ {
if (!connection || !connection->is_open()) if (!connection || !connection->is_open())
connection = std::make_shared<pqxx::connection>(connection_str); connection = std::make_shared<pqxx::connection>(connection_str);
} }
bool PostgreSQLConnection::tryConnect() bool PostgreSQLConnection::tryConnectIfNeeded()
{ {
try try
{ {
connect(); connectIfNeeded();
} }
catch (const pqxx::broken_connection & pqxx_error) catch (const pqxx::broken_connection & pqxx_error)
{ {

View File

@ -12,44 +12,65 @@
namespace DB namespace DB
{ {
class PostgreSQLConnection class PostgreSQLConnection
{ {
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
public: public:
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
PostgreSQLConnection( PostgreSQLConnection(
const String & connection_str_, const String & connection_str_,
const String & address_); const String & address_);
PostgreSQLConnection(
ConnectionPtr connection_,
const String & connection_str_,
const String & address_);
PostgreSQLConnection(const PostgreSQLConnection & other); PostgreSQLConnection(const PostgreSQLConnection & other);
const std::string & getAddress() { return address; }
ConnectionPtr get(); ConnectionPtr get();
ConnectionPtr tryGet(); ConnectionPtr tryGet();
bool connected() { return tryConnect(); } bool isConnected() { return tryConnectIfNeeded(); }
bool available() { return ref_count.load() == 0; }
void incrementRef() { ref_count++; }
void decrementRef() { ref_count--; }
private: private:
void connect(); void connectIfNeeded();
bool tryConnect(); bool tryConnectIfNeeded();
const std::string & getAddress() { return address; }
ConnectionPtr connection; ConnectionPtr connection;
std::string connection_str; std::string connection_str, address;
std::string address; std::atomic<uint8_t> ref_count;
}; };
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>; using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class WrappedPostgreSQLConnection
{
public:
WrappedPostgreSQLConnection(PostgreSQLConnectionPtr connection_) : connection(connection_) { connection->incrementRef(); }
WrappedPostgreSQLConnection(const WrappedPostgreSQLConnection & other) : connection(other.connection) {}
~WrappedPostgreSQLConnection() { connection->decrementRef(); }
pqxx::connection & operator*() const { return *connection->get(); }
pqxx::connection * operator->() const { return connection->get().get(); }
bool isConnected() { return connection->isConnected(); }
private:
PostgreSQLConnectionPtr connection;
};
} }

View File

@ -14,22 +14,18 @@ namespace DB
PostgreSQLConnectionPool::PostgreSQLConnectionPool( PostgreSQLConnectionPool::PostgreSQLConnectionPool(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password) std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
: pool(POSTGRESQL_POOL_DEFAULT_SIZE)
{ {
address = host + ':' + std::to_string(port); address = host + ':' + std::to_string(port);
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password)); connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
initialize();
/// No connection is made, just fill pool with non-connected connection objects.
for (size_t i = 0; i < POSTGRESQL_POOL_DEFAULT_SIZE; ++i)
pool.push(std::make_shared<PostgreSQLConnection>(connection_str, address));
} }
PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other) PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other)
: connection_str(other.connection_str) : connection_str(other.connection_str)
, address(other.address) , address(other.address)
, pool(POSTGRESQL_POOL_DEFAULT_SIZE)
{ {
initialize();
} }
@ -46,48 +42,34 @@ std::string PostgreSQLConnectionPool::formatConnectionString(
} }
PostgreSQLConnection::ConnectionPtr PostgreSQLConnectionPool::get() void PostgreSQLConnectionPool::initialize()
{ {
PostgreSQLConnectionPtr connection = popConnection(); /// No connection is made, just fill pool with non-connected connection objects.
return connection->get(); for (size_t i = 0; i < POSTGRESQL_POOL_DEFAULT_SIZE; ++i)
pool.emplace_back(std::make_shared<PostgreSQLConnection>(connection_str, address));
} }
PostgreSQLConnection::ConnectionPtr PostgreSQLConnectionPool::tryGet()
WrappedPostgreSQLConnection PostgreSQLConnectionPool::get()
{ {
PostgreSQLConnectionPtr connection = popConnection(); std::lock_guard lock(mutex);
return connection->tryGet();
for (const auto & connection : pool)
{
if (connection->available())
return WrappedPostgreSQLConnection(connection);
}
auto connection = std::make_shared<PostgreSQLConnection>(connection_str, address);
return WrappedPostgreSQLConnection(connection);
} }
PostgreSQLConnectionPtr PostgreSQLConnectionPool::popConnection() bool PostgreSQLConnectionPool::isConnected()
{ {
PostgreSQLConnectionPtr connection; auto connection = get();
if (pool.tryPop(connection, POSTGRESQL_POOL_WAIT_POP_PUSH_MS)) return connection.isConnected();
return connection;
return std::make_shared<PostgreSQLConnection>(connection_str, address);
}
void PostgreSQLConnectionPool::put(PostgreSQLConnection::ConnectionPtr connection)
{
pushConnection(std::make_shared<PostgreSQLConnection>(connection, connection_str, address));
}
void PostgreSQLConnectionPool::pushConnection(PostgreSQLConnectionPtr connection)
{
pool.tryPush(connection, POSTGRESQL_POOL_WAIT_POP_PUSH_MS);
}
bool PostgreSQLConnectionPool::connected()
{
PostgreSQLConnectionPtr connection = popConnection();
bool result = connection->connected();
pushConnection(connection);
return result;
} }
} }

View File

@ -7,13 +7,15 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include "PostgreSQLConnection.h" #include "PostgreSQLConnection.h"
#include <pqxx/pqxx> // Y_IGNORE
namespace DB namespace DB
{ {
class PostgreSQLReplicaConnection;
class PostgreSQLConnectionPool class PostgreSQLConnectionPool
{ {
friend class PostgreSQLReplicaConnection;
public: public:
@ -23,36 +25,25 @@ public:
PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete; PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete;
/// Will throw if unable to setup connection. WrappedPostgreSQLConnection get();
PostgreSQLConnection::ConnectionPtr get();
/// Will return nullptr if connection was not established. protected:
PostgreSQLConnection::ConnectionPtr tryGet(); bool isConnected();
void put(PostgreSQLConnection::ConnectionPtr connection);
bool connected();
std::string & conn_str() { return connection_str; }
private: private:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16; static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_POP_PUSH_MS = 100; static constexpr inline auto POSTGRESQL_POOL_WAIT_POP_PUSH_MS = 100;
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>; using Pool = std::vector<PostgreSQLConnectionPtr>;
static std::string formatConnectionString( static std::string formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password); std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
/// Try get connection from connection pool with timeout. void initialize();
/// If pool is empty after timeout, make a new connection.
PostgreSQLConnectionPtr popConnection();
/// Put connection object back into pool.
void pushConnection(PostgreSQLConnectionPtr connection);
std::string connection_str, address; std::string connection_str, address;
Pool pool; Pool pool;
std::mutex mutex;
}; };
using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>; using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;

View File

@ -59,14 +59,14 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(const PostgreSQLReplica
} }
PostgreSQLConnectionPoolPtr PostgreSQLReplicaConnection::get() WrappedPostgreSQLConnection PostgreSQLReplicaConnection::get()
{ {
for (size_t i = 0; i < num_retries; ++i) for (size_t i = 0; i < num_retries; ++i)
{ {
for (auto & replica : replicas) for (auto & replica : replicas)
{ {
if (replica.second->connected()) if (replica.second->isConnected())
return replica.second; return replica.second->get();
} }
} }

View File

@ -21,7 +21,7 @@ public:
PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other); PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other);
PostgreSQLConnectionPoolPtr get(); WrappedPostgreSQLConnection get();
private: private:

View File

@ -88,7 +88,7 @@ Pipe StoragePostgreSQL::read(
} }
return Pipe(std::make_shared<SourceFromInputStream>( return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<PostgreSQLBlockInputStream>(connection_pool, query, sample_block, max_block_size_))); std::make_shared<PostgreSQLBlockInputStream>(connection_pool->get(), query, sample_block, max_block_size_)));
} }
@ -97,11 +97,10 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
public: public:
explicit PostgreSQLBlockOutputStream( explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
PostgreSQLConnectionPoolPtr connection_pool_, WrappedPostgreSQLConnection connection_,
const std::string & remote_table_name_) const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_) : metadata_snapshot(metadata_snapshot_)
, connection_pool(connection_pool_) , connection(connection_)
, connection(connection_pool->get())
, remote_table_name(remote_table_name_) , remote_table_name(remote_table_name_)
{ {
} }
@ -167,9 +166,6 @@ public:
stream_inserter->complete(); stream_inserter->complete();
work->commit(); work->commit();
} }
if (connection->is_open())
connection_pool->put(connection);
} }
@ -280,8 +276,7 @@ public:
private: private:
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
PostgreSQLConnectionPoolPtr connection_pool; WrappedPostgreSQLConnection connection;
PostgreSQLConnection::ConnectionPtr connection;
std::string remote_table_name; std::string remote_table_name;
std::unique_ptr<pqxx::work> work; std::unique_ptr<pqxx::work> work;
@ -292,7 +287,7 @@ private:
BlockOutputStreamPtr StoragePostgreSQL::write( BlockOutputStreamPtr StoragePostgreSQL::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */) const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */)
{ {
return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, connection_pool, remote_table_name); return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, connection_pool->get(), remote_table_name);
} }