Add retry for postgres query

This commit is contained in:
kssenii 2021-12-27 11:03:04 +03:00
parent a7cf7b4d6f
commit 1bd4936961
8 changed files with 62 additions and 33 deletions

View File

@ -12,10 +12,7 @@ Connection::Connection(const ConnectionInfo & connection_info_, bool replication
, log(&Poco::Logger::get("PostgreSQLReplicaConnection"))
{
if (replication)
{
connection_info = std::make_pair(
fmt::format("{} replication=database", connection_info.first), connection_info.second);
}
connection_info = {fmt::format("{} replication=database", connection_info.connection_string), connection_info.host_port};
}
void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec)
@ -61,11 +58,14 @@ void Connection::updateConnection()
{
if (connection)
connection->close();
/// Always throws if there is no connection.
connection = std::make_unique<pqxx::connection>(connection_info.first);
connection = std::make_unique<pqxx::connection>(connection_info.connection_string);
if (replication)
connection->set_variable("default_transaction_isolation", "'repeatable read'");
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second);
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.host_port);
}
void Connection::connect()

View File

@ -8,19 +8,26 @@
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
/* Methods to work with PostgreSQL connection object.
/** Methods to work with PostgreSQL connection object.
* Should only be used in case there has to be a single connection object, which
* is long-lived and there are no concurrent connection queries.
* Now only use case - for replication handler for replication from PostgreSQL.
* In all other integration engine use pool with failover.
**/
*/
namespace Poco { class Logger; }
namespace pqxx
{
using ConnectionPtr = std::unique_ptr<pqxx::connection>;
}
namespace postgres
{
using ConnectionInfo = std::pair<String, String>;
using ConnectionPtr = std::unique_ptr<pqxx::connection>;
struct ConnectionInfo
{
String connection_string;
String host_port; /// For logs.
};
class Connection : private boost::noncopyable
{
@ -33,14 +40,17 @@ public:
void connect();
void updateConnection();
void tryUpdateConnection();
const ConnectionInfo & getConnectionInfo() { return connection_info; }
private:
void updateConnection();
String getInfoForLog() const { return connection_info.host_port; }
ConnectionPtr connection;
private:
pqxx::ConnectionPtr connection;
ConnectionInfo connection_info;
bool replication;
@ -48,6 +58,9 @@ private:
Poco::Logger * log;
};
using ConnectionPtr = std::unique_ptr<Connection>;
}
#endif

View File

@ -7,12 +7,12 @@
#include <pqxx/pqxx>
#include <Core/Types.h>
#include <base/BorrowedObjectPool.h>
#include "Connection.h"
namespace postgres
{
using ConnectionPtr = std::unique_ptr<pqxx::connection>;
using Pool = BorrowedObjectPool<ConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
@ -28,8 +28,12 @@ public:
pqxx::connection & get()
{
assert(connection != nullptr);
return *connection;
return connection->getRef();
}
void update()
{
connection->updateConnection();
}
private:

View File

@ -32,9 +32,9 @@ PoolWithFailover::PoolWithFailover(
{
for (const auto & replica_configuration : configurations)
{
auto connection_string = formatConnectionString(replica_configuration.database,
replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password).first;
replicas_with_priority[priority].emplace_back(connection_string, pool_size, getConnectionForLog(replica_configuration.host, replica_configuration.port));
auto connection_info = formatConnectionString(replica_configuration.database,
replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password);
replicas_with_priority[priority].emplace_back(connection_info, pool_size);
}
}
}
@ -52,8 +52,8 @@ PoolWithFailover::PoolWithFailover(
for (const auto & [host, port] : configuration.addresses)
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size, getConnectionForLog(host, port));
auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password);
replicas_with_priority[0].emplace_back(connection_string, pool_size);
}
}
@ -83,16 +83,18 @@ ConnectionHolderPtr PoolWithFailover::get()
try
{
/// Create a new connection or reopen an old connection if it became invalid.
if (!connection || !connection->is_open())
if (!connection)
{
connection = std::make_unique<pqxx::connection>(replica.connection_string);
LOG_DEBUG(log, "New connection to {}:{}", connection->hostname(), connection->port());
connection = std::make_unique<Connection>(replica.connection_info);
LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog());
}
connection->connect();
}
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.name_for_log << "` failed: " << pqxx_error.what() << "\n";
error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.connection_info.host_port << "` failed: " << pqxx_error.what() << "\n";
replica.pool->returnObject(std::move(connection));
continue;

View File

@ -44,12 +44,11 @@ public:
private:
struct PoolHolder
{
String connection_string;
ConnectionInfo connection_info;
PoolPtr pool;
String name_for_log;
PoolHolder(const String & connection_string_, size_t pool_size, const String & name_for_log_)
: connection_string(connection_string_), pool(std::make_shared<Pool>(pool_size)), name_for_log(name_for_log_) {}
PoolHolder(const ConnectionInfo & connection_info_, size_t pool_size)
: connection_info(connection_info_), pool(std::make_shared<Pool>(pool_size)) {}
};
/// Highest priority is 0, the bigger the number in map, the less the priority

View File

@ -17,7 +17,7 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
<< " user=" << DB::quote << user
<< " password=" << DB::quote << password
<< " connect_timeout=10";
return std::make_pair(out.str(), host + ':' + DB::toString(port));
return {out.str(), host + ':' + DB::toString(port)};
}
String getConnectionForLog(const String & host, UInt16 port)

View File

@ -74,7 +74,17 @@ template<typename T>
void PostgreSQLSource<T>::onStart()
{
if (!tx)
{
try
{
tx = std::make_shared<T>(connection_holder->get());
}
catch (const pqxx::broken_connection &)
{
connection_holder->update();
tx = std::make_shared<T>(connection_holder->get());
}
}
stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str));
}

View File

@ -50,6 +50,7 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr c
if (!columns)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
return ColumnsDescription{*columns};
}