From 1bd493696148ab5985fe7edf082e35063b96f1e3 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 27 Dec 2021 11:03:04 +0300 Subject: [PATCH] Add retry for postgres query --- src/Core/PostgreSQL/Connection.cpp | 12 +++---- src/Core/PostgreSQL/Connection.h | 31 +++++++++++++------ src/Core/PostgreSQL/ConnectionHolder.h | 10 ++++-- src/Core/PostgreSQL/PoolWithFailover.cpp | 20 ++++++------ src/Core/PostgreSQL/PoolWithFailover.h | 7 ++--- src/Core/PostgreSQL/Utils.cpp | 2 +- .../Transforms/PostgreSQLSource.cpp | 12 ++++++- .../TableFunctionPostgreSQL.cpp | 1 + 8 files changed, 62 insertions(+), 33 deletions(-) diff --git a/src/Core/PostgreSQL/Connection.cpp b/src/Core/PostgreSQL/Connection.cpp index 75786a51d92..f97a35a9e92 100644 --- a/src/Core/PostgreSQL/Connection.cpp +++ b/src/Core/PostgreSQL/Connection.cpp @@ -12,10 +12,7 @@ Connection::Connection(const ConnectionInfo & connection_info_, bool replication , log(&Poco::Logger::get("PostgreSQLReplicaConnection")) { if (replication) - { - connection_info = std::make_pair( - fmt::format("{} replication=database", connection_info.first), connection_info.second); - } + connection_info = {fmt::format("{} replication=database", connection_info.connection_string), connection_info.host_port}; } void Connection::execWithRetry(const std::function & exec) @@ -61,11 +58,14 @@ void Connection::updateConnection() { if (connection) connection->close(); + /// Always throws if there is no connection. - connection = std::make_unique(connection_info.first); + connection = std::make_unique(connection_info.connection_string); + if (replication) connection->set_variable("default_transaction_isolation", "'repeatable read'"); - LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second); + + LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.host_port); } void Connection::connect() diff --git a/src/Core/PostgreSQL/Connection.h b/src/Core/PostgreSQL/Connection.h index d65c38643c1..8c5609dc66b 100644 --- a/src/Core/PostgreSQL/Connection.h +++ b/src/Core/PostgreSQL/Connection.h @@ -8,19 +8,26 @@ #include #include -/* Methods to work with PostgreSQL connection object. +/** Methods to work with PostgreSQL connection object. * Should only be used in case there has to be a single connection object, which * is long-lived and there are no concurrent connection queries. - * Now only use case - for replication handler for replication from PostgreSQL. - * In all other integration engine use pool with failover. - **/ + */ namespace Poco { class Logger; } +namespace pqxx +{ + using ConnectionPtr = std::unique_ptr; +} + namespace postgres { -using ConnectionInfo = std::pair; -using ConnectionPtr = std::unique_ptr; + +struct ConnectionInfo +{ + String connection_string; + String host_port; /// For logs. +}; class Connection : private boost::noncopyable { @@ -33,14 +40,17 @@ public: void connect(); + void updateConnection(); + void tryUpdateConnection(); const ConnectionInfo & getConnectionInfo() { return connection_info; } -private: - void updateConnection(); + String getInfoForLog() const { return connection_info.host_port; } - ConnectionPtr connection; +private: + + pqxx::ConnectionPtr connection; ConnectionInfo connection_info; bool replication; @@ -48,6 +58,9 @@ private: Poco::Logger * log; }; + +using ConnectionPtr = std::unique_ptr; + } #endif diff --git a/src/Core/PostgreSQL/ConnectionHolder.h b/src/Core/PostgreSQL/ConnectionHolder.h index d0d64935e91..38e321e222c 100644 --- a/src/Core/PostgreSQL/ConnectionHolder.h +++ b/src/Core/PostgreSQL/ConnectionHolder.h @@ -7,12 +7,12 @@ #include #include #include +#include "Connection.h" namespace postgres { -using ConnectionPtr = std::unique_ptr; using Pool = BorrowedObjectPool; using PoolPtr = std::shared_ptr; @@ -28,8 +28,12 @@ public: pqxx::connection & get() { - assert(connection != nullptr); - return *connection; + return connection->getRef(); + } + + void update() + { + connection->updateConnection(); } private: diff --git a/src/Core/PostgreSQL/PoolWithFailover.cpp b/src/Core/PostgreSQL/PoolWithFailover.cpp index 3addb511c3b..844c60087e0 100644 --- a/src/Core/PostgreSQL/PoolWithFailover.cpp +++ b/src/Core/PostgreSQL/PoolWithFailover.cpp @@ -32,9 +32,9 @@ PoolWithFailover::PoolWithFailover( { for (const auto & replica_configuration : configurations) { - auto connection_string = formatConnectionString(replica_configuration.database, - replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password).first; - replicas_with_priority[priority].emplace_back(connection_string, pool_size, getConnectionForLog(replica_configuration.host, replica_configuration.port)); + auto connection_info = formatConnectionString(replica_configuration.database, + replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password); + replicas_with_priority[priority].emplace_back(connection_info, pool_size); } } } @@ -52,8 +52,8 @@ PoolWithFailover::PoolWithFailover( for (const auto & [host, port] : configuration.addresses) { LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port); - auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password).first; - replicas_with_priority[0].emplace_back(connection_string, pool_size, getConnectionForLog(host, port)); + auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password); + replicas_with_priority[0].emplace_back(connection_string, pool_size); } } @@ -83,16 +83,18 @@ ConnectionHolderPtr PoolWithFailover::get() try { /// Create a new connection or reopen an old connection if it became invalid. - if (!connection || !connection->is_open()) + if (!connection) { - connection = std::make_unique(replica.connection_string); - LOG_DEBUG(log, "New connection to {}:{}", connection->hostname(), connection->port()); + connection = std::make_unique(replica.connection_info); + LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog()); } + + connection->connect(); } catch (const pqxx::broken_connection & pqxx_error) { LOG_ERROR(log, "Connection error: {}", pqxx_error.what()); - error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.name_for_log << "` failed: " << pqxx_error.what() << "\n"; + error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.connection_info.host_port << "` failed: " << pqxx_error.what() << "\n"; replica.pool->returnObject(std::move(connection)); continue; diff --git a/src/Core/PostgreSQL/PoolWithFailover.h b/src/Core/PostgreSQL/PoolWithFailover.h index c59010a5d43..e6f691ed2dd 100644 --- a/src/Core/PostgreSQL/PoolWithFailover.h +++ b/src/Core/PostgreSQL/PoolWithFailover.h @@ -44,12 +44,11 @@ public: private: struct PoolHolder { - String connection_string; + ConnectionInfo connection_info; PoolPtr pool; - String name_for_log; - PoolHolder(const String & connection_string_, size_t pool_size, const String & name_for_log_) - : connection_string(connection_string_), pool(std::make_shared(pool_size)), name_for_log(name_for_log_) {} + PoolHolder(const ConnectionInfo & connection_info_, size_t pool_size) + : connection_info(connection_info_), pool(std::make_shared(pool_size)) {} }; /// Highest priority is 0, the bigger the number in map, the less the priority diff --git a/src/Core/PostgreSQL/Utils.cpp b/src/Core/PostgreSQL/Utils.cpp index 60b13218202..b4ad19c819a 100644 --- a/src/Core/PostgreSQL/Utils.cpp +++ b/src/Core/PostgreSQL/Utils.cpp @@ -17,7 +17,7 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S << " user=" << DB::quote << user << " password=" << DB::quote << password << " connect_timeout=10"; - return std::make_pair(out.str(), host + ':' + DB::toString(port)); + return {out.str(), host + ':' + DB::toString(port)}; } String getConnectionForLog(const String & host, UInt16 port) diff --git a/src/Processors/Transforms/PostgreSQLSource.cpp b/src/Processors/Transforms/PostgreSQLSource.cpp index ac8408d8338..88f092a2533 100644 --- a/src/Processors/Transforms/PostgreSQLSource.cpp +++ b/src/Processors/Transforms/PostgreSQLSource.cpp @@ -74,7 +74,17 @@ template void PostgreSQLSource::onStart() { if (!tx) - tx = std::make_shared(connection_holder->get()); + { + try + { + tx = std::make_shared(connection_holder->get()); + } + catch (const pqxx::broken_connection &) + { + connection_holder->update(); + tx = std::make_shared(connection_holder->get()); + } + } stream = std::make_unique(*tx, pqxx::from_query, std::string_view(query_str)); } diff --git a/src/TableFunctions/TableFunctionPostgreSQL.cpp b/src/TableFunctions/TableFunctionPostgreSQL.cpp index bcfe8d5444c..d948f40588f 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.cpp +++ b/src/TableFunctions/TableFunctionPostgreSQL.cpp @@ -50,6 +50,7 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr c if (!columns) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned"); + return ColumnsDescription{*columns}; }