From 49d33774f9ee1e066d323c799045b9c266c6aee9 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 25 Aug 2023 17:13:03 +0200 Subject: [PATCH] Fix --- contrib/libpqxx | 2 +- src/Core/PostgreSQL/Connection.h | 2 ++ src/Core/PostgreSQL/ConnectionHolder.h | 18 ++++++++++++++++++ src/Processors/Sources/PostgreSQLSource.cpp | 15 +++++++++++++-- src/Processors/Sources/PostgreSQLSource.h | 3 +++ 5 files changed, 37 insertions(+), 3 deletions(-) diff --git a/contrib/libpqxx b/contrib/libpqxx index bdd6540fb95..791d68fd899 160000 --- a/contrib/libpqxx +++ b/contrib/libpqxx @@ -1 +1 @@ -Subproject commit bdd6540fb95ff56c813691ceb5da5a3266cf235d +Subproject commit 791d68fd89902835133c50435e380ec7a73271b7 diff --git a/src/Core/PostgreSQL/Connection.h b/src/Core/PostgreSQL/Connection.h index 96cc19babea..efc10b6ed20 100644 --- a/src/Core/PostgreSQL/Connection.h +++ b/src/Core/PostgreSQL/Connection.h @@ -47,6 +47,8 @@ public: void tryUpdateConnection(); + bool isConnected() const { return connection != nullptr && connection->is_open(); } + const ConnectionInfo & getConnectionInfo() { return connection_info; } String getInfoForLog() const { return connection_info.host_port; } diff --git a/src/Core/PostgreSQL/ConnectionHolder.h b/src/Core/PostgreSQL/ConnectionHolder.h index 16803c823ba..43998c494c9 100644 --- a/src/Core/PostgreSQL/ConnectionHolder.h +++ b/src/Core/PostgreSQL/ConnectionHolder.h @@ -28,10 +28,27 @@ public: ConnectionHolder(const ConnectionHolder & other) = delete; + void setBroken() { is_broken = true; } + ~ConnectionHolder() { if (auto_close) + { connection.reset(); + } + else if (is_broken) + { + try + { + if (connection->isConnected()) + connection->getRef().reset(); + } + catch (...) + { + DB::tryLogCurrentException(__PRETTY_FUNCTION__); + connection.reset(); + } + } pool->returnObject(std::move(connection)); } @@ -49,6 +66,7 @@ private: PoolPtr pool; ConnectionPtr connection; bool auto_close; + bool is_broken = false; }; using ConnectionHolderPtr = std::unique_ptr; diff --git a/src/Processors/Sources/PostgreSQLSource.cpp b/src/Processors/Sources/PostgreSQLSource.cpp index 115e24d5740..f57d0fe9cc1 100644 --- a/src/Processors/Sources/PostgreSQLSource.cpp +++ b/src/Processors/Sources/PostgreSQLSource.cpp @@ -59,7 +59,6 @@ PostgreSQLSource::PostgreSQLSource( init(sample_block); } - template void PostgreSQLSource::init(const Block & sample_block) { @@ -82,7 +81,8 @@ void PostgreSQLSource::onStart() { try { - tx = std::make_shared(connection_holder->get()); + auto & conn = connection_holder->get(); + tx = std::make_shared(conn); } catch (const pqxx::broken_connection &) { @@ -180,6 +180,17 @@ void PostgreSQLSource::onFinish() if (tx && auto_commit) tx->commit(); + + is_completed = true; +} + +template +PostgreSQLSource::~PostgreSQLSource() +{ + if (!is_completed) + { + connection_holder->setBroken(); + } } template diff --git a/src/Processors/Sources/PostgreSQLSource.h b/src/Processors/Sources/PostgreSQLSource.h index 312e9f5fb18..8a648ae8bb5 100644 --- a/src/Processors/Sources/PostgreSQLSource.h +++ b/src/Processors/Sources/PostgreSQLSource.h @@ -28,6 +28,8 @@ public: String getName() const override { return "PostgreSQL"; } + ~PostgreSQLSource() override; + protected: PostgreSQLSource( std::shared_ptr tx_, @@ -54,6 +56,7 @@ private: ExternalResultDescription description; bool started = false; + bool is_completed = false; postgres::ConnectionHolderPtr connection_holder;