Add namespace, simplify names

This commit is contained in:
kssenii 2021-03-27 20:14:02 +00:00
parent f40c582e7a
commit 22b515fbc9
21 changed files with 106 additions and 103 deletions

1
contrib/amqp vendored Submodule

@ -0,0 +1 @@
Subproject commit 03781aaff0f10ef41f902b8cf865fe0067180c10

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
}
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
PostgreSQLConnectionHolderPtr connection_,
postgres::ConnectionHolderPtr connection_,
const std::string & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)

View File

@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLBlockInputStream(
PostgreSQLConnectionHolderPtr connection_,
postgres::ConnectionHolderPtr connection_,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_);
@ -46,7 +46,7 @@ private:
const UInt64 max_block_size;
ExternalResultDescription description;
PostgreSQLConnectionHolderPtr connection;
postgres::ConnectionHolderPtr connection;
std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream;

View File

@ -248,7 +248,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
auto parsed_host_port = parseAddress(host_port, 5432);
/// no connection is made here
auto connection_pool = std::make_shared<PostgreSQLPoolWithFailover>(
auto connection_pool = std::make_shared<postgres::PoolWithFailover>(
postgres_database_name,
parsed_host_port.first, parsed_host_port.second,
username, password,

View File

@ -40,7 +40,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
const ASTStorage * database_engine_define_,
const String & dbname_,
const String & postgres_dbname,
PostgreSQLPoolWithFailoverPtr connection_pool_,
postgres::PoolWithFailoverPtr connection_pool_,
const bool cache_tables_)
: IDatabase(dbname_)
, global_context(context.getGlobalContext())

View File

@ -33,7 +33,7 @@ public:
const ASTStorage * database_engine_define,
const String & dbname_,
const String & postgres_dbname,
PostgreSQLPoolWithFailoverPtr connection_pool_,
postgres::PoolWithFailoverPtr connection_pool_,
const bool cache_tables_);
String getEngineName() const override { return "PostgreSQL"; }
@ -71,7 +71,7 @@ private:
String metadata_path;
ASTPtr database_engine_define;
String dbname;
PostgreSQLPoolWithFailoverPtr connection_pool;
postgres::PoolWithFailoverPtr connection_pool;
const bool cache_tables;
mutable Tables cached_tables;

View File

@ -94,7 +94,7 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
PostgreSQLConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls)
postgres::ConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();

View File

@ -12,7 +12,7 @@ namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
PostgreSQLConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls);
postgres::ConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls);
}

View File

@ -31,7 +31,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const Block & sample_block_)
: dict_struct{dict_struct_}
, sample_block(sample_block_)
, connection(std::make_shared<PostgreSQLPoolWithFailover>(config_, config_prefix))
, connection(std::make_shared<postgres::PoolWithFailover>(config_, config_prefix))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(config_.getString(fmt::format("{}.db", config_prefix), ""))
, table(config_.getString(fmt::format("{}.table", config_prefix), ""))

View File

@ -51,7 +51,7 @@ private:
const DictionaryStructure dict_struct;
Block sample_block;
PostgreSQLPoolWithFailoverPtr connection;
postgres::PoolWithFailoverPtr connection;
Poco::Logger * log;
const std::string db;

View File

@ -8,10 +8,10 @@
#include <IO/Operators.h>
namespace DB
namespace postgres
{
PostgreSQLConnection::PostgreSQLConnection(
Connection::Connection(
const String & connection_str_,
const String & address_)
: connection_str(connection_str_)
@ -20,14 +20,14 @@ PostgreSQLConnection::PostgreSQLConnection(
}
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get()
pqxx::ConnectionPtr Connection::get()
{
connectIfNeeded();
return connection;
}
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet()
pqxx::ConnectionPtr Connection::tryGet()
{
if (tryConnectIfNeeded())
return connection;
@ -35,17 +35,17 @@ PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet()
}
void PostgreSQLConnection::connectIfNeeded()
void Connection::connectIfNeeded()
{
if (!connection || !connection->is_open())
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", getAddress());
connection = std::make_shared<pqxx::connection>(connection_str);
connection = std::make_unique<pqxx::connection>(connection_str);
}
}
bool PostgreSQLConnection::tryConnectIfNeeded()
bool Connection::tryConnectIfNeeded()
{
try
{

View File

@ -10,24 +10,27 @@
#include <Common/ConcurrentBoundedQueue.h>
namespace DB
namespace pqxx
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
}
namespace postgres
{
class PostgreSQLConnection
class Connection
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
public:
PostgreSQLConnection(
Connection(
const String & connection_str_,
const String & address_);
PostgreSQLConnection(const PostgreSQLConnection & other) = delete;
Connection(const Connection & other) = delete;
ConnectionPtr get();
pqxx::ConnectionPtr get();
ConnectionPtr tryGet();
pqxx::ConnectionPtr tryGet();
bool isConnected() { return tryConnectIfNeeded(); }
@ -38,40 +41,40 @@ private:
const std::string & getAddress() { return address; }
ConnectionPtr connection;
pqxx::ConnectionPtr connection;
std::string connection_str, address;
};
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
using ConnectionPtr = std::shared_ptr<Connection>;
class PostgreSQLConnectionHolder
class ConnectionHolder
{
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>;
using Pool = ConcurrentBoundedQueue<ConnectionPtr>;
static constexpr inline auto POSTGRESQL_POOL_WAIT_MS = 50;
public:
PostgreSQLConnectionHolder(PostgreSQLConnectionPtr connection_, Pool & pool_)
ConnectionHolder(ConnectionPtr connection_, Pool & pool_)
: connection(std::move(connection_))
, pool(pool_)
{
}
PostgreSQLConnectionHolder(const PostgreSQLConnectionHolder & other) = delete;
ConnectionHolder(const ConnectionHolder & other) = delete;
~PostgreSQLConnectionHolder() { pool.tryPush(connection, POSTGRESQL_POOL_WAIT_MS); }
~ConnectionHolder() { pool.tryPush(connection, POSTGRESQL_POOL_WAIT_MS); }
pqxx::connection & conn() const { return *connection->get(); }
bool isConnected() { return connection->isConnected(); }
private:
PostgreSQLConnectionPtr connection;
ConnectionPtr connection;
Pool & pool;
};
using PostgreSQLConnectionHolderPtr = std::shared_ptr<PostgreSQLConnectionHolder>;
using ConnectionHolderPtr = std::shared_ptr<ConnectionHolder>;
}

View File

@ -10,10 +10,10 @@
#include <common/logger_useful.h>
namespace DB
namespace postgres
{
PostgreSQLConnectionPool::PostgreSQLConnectionPool(
ConnectionPool::ConnectionPool(
std::string dbname,
std::string host,
UInt16 port,
@ -37,7 +37,7 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool(
}
PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other)
ConnectionPool::ConnectionPool(const ConnectionPool & other)
: pool(std::make_shared<Pool>(other.pool_size))
, connection_str(other.connection_str)
, address(other.address)
@ -49,46 +49,46 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPoo
}
void PostgreSQLConnectionPool::initialize()
void ConnectionPool::initialize()
{
/// No connection is made, just fill pool with non-connected connection objects.
for (size_t i = 0; i < pool_size; ++i)
pool->push(std::make_shared<PostgreSQLConnection>(connection_str, address));
pool->push(std::make_shared<Connection>(connection_str, address));
}
std::string PostgreSQLConnectionPool::formatConnectionString(
std::string ConnectionPool::formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
{
WriteBufferFromOwnString out;
out << "dbname=" << quote << dbname
<< " host=" << quote << host
DB::WriteBufferFromOwnString out;
out << "dbname=" << DB::quote << dbname
<< " host=" << DB::quote << host
<< " port=" << port
<< " user=" << quote << user
<< " password=" << quote << password;
<< " user=" << DB::quote << user
<< " password=" << DB::quote << password;
return out.str();
}
PostgreSQLConnectionHolderPtr PostgreSQLConnectionPool::get()
ConnectionHolderPtr ConnectionPool::get()
{
PostgreSQLConnectionPtr connection;
ConnectionPtr connection;
/// Always blocks by default.
if (block_on_empty_pool)
{
/// pop to ConcurrentBoundedQueue will block until it is non-empty.
pool->pop(connection);
return std::make_shared<PostgreSQLConnectionHolder>(connection, *pool);
return std::make_shared<ConnectionHolder>(connection, *pool);
}
if (pool->tryPop(connection, pool_wait_timeout))
{
return std::make_shared<PostgreSQLConnectionHolder>(connection, *pool);
return std::make_shared<ConnectionHolder>(connection, *pool);
}
connection = std::make_shared<PostgreSQLConnection>(connection_str, address);
return std::make_shared<PostgreSQLConnectionHolder>(connection, *pool);
connection = std::make_shared<Connection>(connection_str, address);
return std::make_shared<ConnectionHolder>(connection, *pool);
}
}

View File

@ -8,42 +8,41 @@
#include "PostgreSQLConnection.h"
namespace DB
namespace postgres
{
class PostgreSQLPoolWithFailover;
class PoolWithFailover;
/// Connection pool size is defined by user with setting `postgresql_connection_pool_size` (default 16).
/// If pool is empty, it will block until there are available connections.
/// If setting `connection_pool_wait_timeout` is defined, it will not block on empty pool and will
/// wait until the timeout and then create a new connection. (only for storage/db engine)
class PostgreSQLConnectionPool
class ConnectionPool
{
friend class PostgreSQLPoolWithFailover;
friend class PoolWithFailover;
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
public:
PostgreSQLConnectionPool(
std::string dbname,
std::string host,
UInt16 port,
std::string user,
std::string password,
size_t pool_size_ = POSTGRESQL_POOL_DEFAULT_SIZE,
int64_t pool_wait_timeout_ = -1);
ConnectionPool(
std::string dbname,
std::string host,
UInt16 port,
std::string user,
std::string password,
size_t pool_size_ = POSTGRESQL_POOL_DEFAULT_SIZE,
int64_t pool_wait_timeout_ = -1);
PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other);
ConnectionPool(const ConnectionPool & other);
PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete;
ConnectionPool operator =(const ConnectionPool &) = delete;
PostgreSQLConnectionHolderPtr get();
ConnectionHolderPtr get();
private:
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>;
using Pool = ConcurrentBoundedQueue<ConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
static std::string formatConnectionString(
@ -58,7 +57,7 @@ private:
bool block_on_empty_pool;
};
using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;
using ConnectionPoolPtr = std::shared_ptr<ConnectionPool>;
}

View File

@ -7,16 +7,18 @@
namespace DB
{
namespace ErrorCodes
{
extern const int POSTGRESQL_CONNECTION_FAILURE;
}
}
namespace postgres
{
PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover(
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const std::string & config_prefix,
const size_t max_tries_)
: max_tries(max_tries_)
{
@ -43,18 +45,18 @@ PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover(
auto replica_user = config.getString(replica_name + ".user", user);
auto replica_password = config.getString(replica_name + ".password", password);
replicas_with_priority[priority].emplace_back(std::make_shared<PostgreSQLConnectionPool>(db, replica_host, replica_port, replica_user, replica_password));
replicas_with_priority[priority].emplace_back(std::make_shared<ConnectionPool>(db, replica_host, replica_port, replica_user, replica_password));
}
}
}
else
{
replicas_with_priority[0].emplace_back(std::make_shared<PostgreSQLConnectionPool>(db, host, port, user, password));
replicas_with_priority[0].emplace_back(std::make_shared<ConnectionPool>(db, host, port, user, password));
}
}
PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover(
PoolWithFailover::PoolWithFailover(
const std::string & database,
const std::string & hosts_pattern,
const uint16_t port,
@ -64,24 +66,24 @@ PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover(
const size_t max_addresses)
: max_tries(max_tries_)
{
auto hosts = parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses);
auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses);
for (const auto & host : hosts)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue after each fetch.
replicas_with_priority[0].emplace_back(std::make_shared<PostgreSQLConnectionPool>(database, host, port, user, password));
replicas_with_priority[0].emplace_back(std::make_shared<ConnectionPool>(database, host, port, user, password));
LOG_TRACE(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address {}:{} to pool", host, port);
}
}
PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover(const PostgreSQLPoolWithFailover & other)
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: replicas_with_priority(other.replicas_with_priority)
, max_tries(other.max_tries)
{
}
PostgreSQLConnectionHolderPtr PostgreSQLPoolWithFailover::get()
ConnectionHolderPtr PoolWithFailover::get()
{
std::lock_guard lock(mutex);
@ -103,7 +105,7 @@ PostgreSQLConnectionHolderPtr PostgreSQLPoolWithFailover::get()
}
}
throw Exception(ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
}
}

View File

@ -5,22 +5,22 @@
#include "PostgreSQLConnectionPool.h"
namespace DB
namespace postgres
{
class PostgreSQLPoolWithFailover
class PoolWithFailover
{
public:
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES = 5;
PostgreSQLPoolWithFailover(
PoolWithFailover(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const std::string & config_prefix,
const size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PostgreSQLPoolWithFailover(
PoolWithFailover(
const std::string & database,
const std::string & host_pattern,
uint16_t port,
@ -29,14 +29,14 @@ public:
const size_t max_tries = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
const size_t max_addresses = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES);
PostgreSQLPoolWithFailover(const PostgreSQLPoolWithFailover & other);
PoolWithFailover(const PoolWithFailover & other);
PostgreSQLConnectionHolderPtr get();
ConnectionHolderPtr get();
private:
/// Highest priority is 0, the bigger the number in map, the less the priority
using Replicas = std::vector<PostgreSQLConnectionPoolPtr>;
using Replicas = std::vector<ConnectionPoolPtr>;
using ReplicasWithPriority = std::map<size_t, Replicas>;
ReplicasWithPriority replicas_with_priority;
@ -44,6 +44,6 @@ private:
std::mutex mutex;
};
using PostgreSQLPoolWithFailoverPtr = std::shared_ptr<PostgreSQLPoolWithFailover>;
using PoolWithFailoverPtr = std::shared_ptr<PoolWithFailover>;
}

View File

@ -88,7 +88,7 @@ StorageExternalDistributed::StorageExternalDistributed(
}
else if (engine_name == "PostgreSQL")
{
PostgreSQLPoolWithFailover pool(
postgres::PoolWithFailover pool(
remote_database,
parsed_shard_description.first,
parsed_shard_description.second,

View File

@ -41,7 +41,7 @@ namespace ErrorCodes
StoragePostgreSQL::StoragePostgreSQL(
const StorageID & table_id_,
const PostgreSQLPoolWithFailover & pool_,
const postgres::PoolWithFailover & pool_,
const String & remote_table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
@ -51,7 +51,7 @@ StoragePostgreSQL::StoragePostgreSQL(
, remote_table_name(remote_table_name_)
, remote_table_schema(remote_table_schema_)
, global_context(context_)
, pool(std::make_shared<PostgreSQLPoolWithFailover>(pool_))
, pool(std::make_shared<postgres::PoolWithFailover>(pool_))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -97,7 +97,7 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
public:
explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_,
PostgreSQLConnectionHolderPtr connection_,
postgres::ConnectionHolderPtr connection_,
const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_)
, connection(std::move(connection_))
@ -276,7 +276,7 @@ public:
private:
StorageMetadataPtr metadata_snapshot;
PostgreSQLConnectionHolderPtr connection;
postgres::ConnectionHolderPtr connection;
std::string remote_table_name;
std::unique_ptr<pqxx::work> work;
@ -316,7 +316,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
if (engine_args.size() == 6)
remote_table_schema = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
PostgreSQLPoolWithFailover pool(
postgres::PoolWithFailover pool(
remote_database,
parsed_host_port.first,
parsed_host_port.second,

View File

@ -23,7 +23,7 @@ class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>
public:
StoragePostgreSQL(
const StorageID & table_id_,
const PostgreSQLPoolWithFailover & pool_,
const postgres::PoolWithFailover & pool_,
const String & remote_table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
@ -49,7 +49,7 @@ private:
String remote_table_name;
String remote_table_schema;
Context global_context;
PostgreSQLPoolWithFailoverPtr pool;
postgres::PoolWithFailoverPtr pool;
};
}

View File

@ -73,7 +73,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const
if (args.size() == 6)
remote_table_schema = args[5]->as<ASTLiteral &>().value.safeGet<String>();
connection_pool = std::make_shared<PostgreSQLPoolWithFailover>(
connection_pool = std::make_shared<postgres::PoolWithFailover>(
args[1]->as<ASTLiteral &>().value.safeGet<String>(),
parsed_host_port.first,
parsed_host_port.second,

View File

@ -5,14 +5,12 @@
#if USE_LIBPQXX
#include <TableFunctions/ITableFunction.h>
#include <Storages/PostgreSQL/PostgreSQLPoolWithFailover.h>
namespace DB
{
class PostgreSQLPoolWithFailover;
using PostgreSQLPoolWithFailoverPtr = std::shared_ptr<PostgreSQLPoolWithFailover>;
class TableFunctionPostgreSQL : public ITableFunction
{
public:
@ -31,7 +29,7 @@ private:
String connection_str;
String remote_table_name, remote_table_schema;
PostgreSQLPoolWithFailoverPtr connection_pool;
postgres::PoolWithFailoverPtr connection_pool;
};
}