Better version

This commit is contained in:
kssenii 2021-03-19 08:11:36 +00:00
parent f1ef87d966
commit 9057aad798
10 changed files with 40 additions and 43 deletions

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
}
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
WrappedPostgreSQLConnectionPtr connection_,
PostgreSQLConnectionHolderPtr connection_,
const std::string & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)

View File

@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLBlockInputStream(
WrappedPostgreSQLConnectionPtr connection_,
PostgreSQLConnectionHolderPtr connection_,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_);
@ -46,7 +46,7 @@ private:
const UInt64 max_block_size;
ExternalResultDescription description;
WrappedPostgreSQLConnectionPtr connection;
PostgreSQLConnectionHolderPtr connection;
std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream;

View File

@ -94,7 +94,7 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
WrappedPostgreSQLConnectionPtr connection, const String & postgres_table_name, bool use_nulls)
PostgreSQLConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();

View File

@ -12,7 +12,7 @@ namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
WrappedPostgreSQLConnectionPtr connection, const String & postgres_table_name, bool use_nulls);
PostgreSQLConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls);
}

View File

@ -7,19 +7,15 @@
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h>
#include <common/logger_useful.h>
#include <Common/ConcurrentBoundedQueue.h>
namespace DB
{
class WrappedPostgreSQLConnection;
class PostgreSQLConnection
{
friend class WrappedPostgreSQLConnection;
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
public:
@ -35,13 +31,6 @@ public:
bool isConnected() { return tryConnectIfNeeded(); }
int32_t isAvailable() { return !locked.load(); }
protected:
void lock() { locked.store(true); }
void unlock() { locked.store(false); }
private:
void connectIfNeeded();
@ -51,21 +40,27 @@ private:
ConnectionPtr connection;
std::string connection_str, address;
std::atomic<bool> locked{false};
};
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class WrappedPostgreSQLConnection
class PostgreSQLConnectionHolder
{
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
public:
WrappedPostgreSQLConnection(PostgreSQLConnectionPtr connection_) : connection(connection_) { connection->lock(); }
PostgreSQLConnectionHolder(PostgreSQLConnectionPtr connection_, PoolPtr pool_)
: connection(std::move(connection_))
, pool(std::move(pool_))
{
}
WrappedPostgreSQLConnection(const WrappedPostgreSQLConnection & other) = delete;
PostgreSQLConnectionHolder(const PostgreSQLConnectionHolder & other) = delete;
~WrappedPostgreSQLConnection() { connection->unlock(); }
~PostgreSQLConnectionHolder() { pool->tryPush(connection); }
pqxx::connection & conn() const { return *connection->get(); }
@ -73,9 +68,10 @@ public:
private:
PostgreSQLConnectionPtr connection;
PoolPtr pool;
};
using WrappedPostgreSQLConnectionPtr = std::shared_ptr<WrappedPostgreSQLConnection>;
using PostgreSQLConnectionHolderPtr = std::shared_ptr<PostgreSQLConnectionHolder>;
}

View File

@ -15,6 +15,7 @@ namespace DB
PostgreSQLConnectionPool::PostgreSQLConnectionPool(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
: pool(std::make_shared<Pool>(POSTGRESQL_POOL_DEFAULT_SIZE))
{
address = host + ':' + std::to_string(port);
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
@ -23,7 +24,8 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool(
PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other)
: connection_str(other.connection_str)
: pool(std::make_shared<Pool>(POSTGRESQL_POOL_DEFAULT_SIZE))
, connection_str(other.connection_str)
, address(other.address)
{
initialize();
@ -47,22 +49,20 @@ void PostgreSQLConnectionPool::initialize()
{
/// No connection is made, just fill pool with non-connected connection objects.
for (size_t i = 0; i < POSTGRESQL_POOL_DEFAULT_SIZE; ++i)
pool.emplace_back(std::make_shared<PostgreSQLConnection>(connection_str, address));
pool->push(std::make_shared<PostgreSQLConnection>(connection_str, address));
}
WrappedPostgreSQLConnectionPtr PostgreSQLConnectionPool::get()
PostgreSQLConnectionHolderPtr PostgreSQLConnectionPool::get()
{
std::lock_guard lock(mutex);
for (const auto & connection : pool)
PostgreSQLConnectionPtr connection;
if (pool->tryPop(connection, POSTGRESQL_POOL_WAIT_MS))
{
if (connection->isAvailable())
return std::make_shared<WrappedPostgreSQLConnection>(connection);
return std::make_shared<PostgreSQLConnectionHolder>(connection, pool);
}
auto connection = std::make_shared<PostgreSQLConnection>(connection_str, address);
return std::make_shared<WrappedPostgreSQLConnection>(connection);
connection = std::make_shared<PostgreSQLConnection>(connection_str, address);
return std::make_shared<PostgreSQLConnectionHolder>(connection, pool);
}

View File

@ -5,16 +5,17 @@
#endif
#if USE_LIBPQXX
#include <Common/ConcurrentBoundedQueue.h>
#include "PostgreSQLConnection.h"
namespace DB
{
class PostgreSQLReplicaConnection;
class PostgreSQLConnectionPool
{
friend class PostgreSQLReplicaConnection;
public:
@ -25,25 +26,25 @@ public:
PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete;
WrappedPostgreSQLConnectionPtr get();
PostgreSQLConnectionHolderPtr get();
protected:
bool isConnected();
private:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_POP_PUSH_MS = 100;
static constexpr inline auto POSTGRESQL_POOL_WAIT_MS = 10;
using Pool = std::vector<PostgreSQLConnectionPtr>;
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
static std::string formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
void initialize();
PoolPtr pool;
std::string connection_str, address;
Pool pool;
std::mutex mutex;
};
using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;

View File

@ -52,7 +52,7 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(
}
WrappedPostgreSQLConnectionPtr PostgreSQLReplicaConnection::get()
PostgreSQLConnectionHolderPtr PostgreSQLReplicaConnection::get()
{
for (size_t i = 0; i < num_retries; ++i)
{

View File

@ -21,7 +21,7 @@ public:
PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other) = default;
WrappedPostgreSQLConnectionPtr get();
PostgreSQLConnectionHolderPtr get();
private:

View File

@ -97,7 +97,7 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
public:
explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_,
WrappedPostgreSQLConnectionPtr connection_,
PostgreSQLConnectionHolderPtr connection_,
const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_)
, connection(std::move(connection_))
@ -276,7 +276,7 @@ public:
private:
StorageMetadataPtr metadata_snapshot;
WrappedPostgreSQLConnectionPtr connection;
PostgreSQLConnectionHolderPtr connection;
std::string remote_table_name;
std::unique_ptr<pqxx::work> work;