Review fixes

This commit is contained in:
kssenii 2021-05-06 12:48:48 +00:00
parent b96f047db7
commit 660442a16c
4 changed files with 46 additions and 61 deletions

View File

@ -1,25 +1,21 @@
#include <Storages/PostgreSQL/ConnectionHolder.h>
#include <common/logger_useful.h>
#include <IO/Operators.h>
namespace postgres
{
String formatConnectionString(String dbname, String host, UInt16 port, String user, String password)
ConnectionHolder::ConnectionHolder(const String & connection_string_, PoolPtr pool_, size_t pool_wait_timeout)
: connection_string(connection_string_), pool(pool_)
{
DB::WriteBufferFromOwnString out;
out << "dbname=" << DB::quote << dbname
<< " host=" << DB::quote << host
<< " port=" << port
<< " user=" << DB::quote << user
<< " password=" << DB::quote << password;
return out.str();
pool->tryBorrowObject(connection, [&]()
{
auto new_connection = std::make_unique<pqxx::connection>(connection_string);
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"),
"New connection to {}:{}", new_connection->hostname(), new_connection->port());
return new_connection;
}, pool_wait_timeout);
}
ConnectionHolder::ConnectionHolder(const String & connection_string_, PoolPtr pool_, size_t pool_wait_timeout_)
: connection_string(connection_string_), pool(pool_), pool_wait_timeout(pool_wait_timeout_) {}
ConnectionHolder::~ConnectionHolder()
{
if (connection)
@ -28,43 +24,10 @@ ConnectionHolder::~ConnectionHolder()
pqxx::connection & ConnectionHolder::get()
{
if (!connection)
{
pool->tryBorrowObject(connection, [&]()
{
auto new_connection = std::make_unique<pqxx::connection>(connection_string);
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"),
"New connection to {}:{}", new_connection->hostname(), new_connection->port());
return new_connection;
}, pool_wait_timeout);
}
if (!connection->is_open())
{
connection = std::make_unique<pqxx::connection>(connection_string);
}
return *connection;
}
bool ConnectionHolder::isConnected()
{
try
{
get();
}
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(
&Poco::Logger::get("PostgreSQLConnection"),
"Connection error: {}", pqxx_error.what());
return false;
}
catch (...)
{
throw;
}
return true;
}
}

View File

@ -8,8 +8,6 @@
namespace postgres
{
String formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
using ConnectionPtr = std::unique_ptr<pqxx::connection>;
using Pool = BorrowedObjectPool<ConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
@ -24,16 +22,14 @@ public:
~ConnectionHolder();
/// Will throw if error is not pqxx::broken_connection.
bool isConnected();
/// Throw on no connection.
bool isValid() { return connection && connection->is_open(); }
pqxx::connection & get();
private:
String connection_string;
PoolPtr pool;
ConnectionPtr connection;
size_t pool_wait_timeout;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;

View File

@ -2,7 +2,7 @@
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <IO/Operators.h>
namespace DB
{
@ -15,6 +15,17 @@ namespace ErrorCodes
namespace postgres
{
String formatConnectionString(String dbname, String host, UInt16 port, String user, String password)
{
DB::WriteBufferFromOwnString out;
out << "dbname=" << DB::quote << dbname
<< " host=" << DB::quote << host
<< " port=" << port
<< " user=" << DB::quote << user
<< " password=" << DB::quote << password;
return out.str();
}
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix,
size_t pool_size, size_t pool_wait_timeout, size_t max_tries_)
@ -58,7 +69,6 @@ PoolWithFailover::PoolWithFailover(
}
}
PoolWithFailover::PoolWithFailover(
const std::string & database,
const RemoteDescription & addresses,
@ -78,14 +88,12 @@ PoolWithFailover::PoolWithFailover(
}
}
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: replicas_with_priority(other.replicas_with_priority)
, max_tries(other.max_tries)
{
}
ConnectionHolderPtr PoolWithFailover::get()
{
std::lock_guard lock(mutex);
@ -98,19 +106,34 @@ ConnectionHolderPtr PoolWithFailover::get()
for (size_t i = 0; i < replicas.size(); ++i)
{
const auto & pool_entry = replicas[i];
auto entry = std::make_unique<ConnectionHolder>(pool_entry.connection_string, pool_entry.pool, pool_entry.pool_wait_timeout);
if (entry->isConnected())
try
{
auto entry = std::make_unique<ConnectionHolder>(pool_entry.connection_string, pool_entry.pool, pool_entry.pool_wait_timeout);
if (!entry->isValid())
continue;
/// Move all traversed replicas to the end.
std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
if (replicas.size() > 1)
std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
return entry;
}
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(
&Poco::Logger::get("PostgreSQLConnection"),
"Connection error: {}", pqxx_error.what());
}
catch (...)
{
throw;
}
}
}
}
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
}
}

View File

@ -9,6 +9,8 @@
namespace postgres
{
String formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
class PoolWithFailover
{
@ -39,8 +41,9 @@ public:
ConnectionHolderPtr get();
private:
bool getAndValidateConnection();
struct PoolHolder
{
String connection_string;