This commit is contained in:
kssenii 2023-08-25 17:13:03 +02:00
parent b884fdb867
commit 49d33774f9
5 changed files with 37 additions and 3 deletions

2
contrib/libpqxx vendored

@ -1 +1 @@
Subproject commit bdd6540fb95ff56c813691ceb5da5a3266cf235d
Subproject commit 791d68fd89902835133c50435e380ec7a73271b7

View File

@ -47,6 +47,8 @@ public:
void tryUpdateConnection();
bool isConnected() const { return connection != nullptr && connection->is_open(); }
const ConnectionInfo & getConnectionInfo() { return connection_info; }
String getInfoForLog() const { return connection_info.host_port; }

View File

@ -28,10 +28,27 @@ public:
ConnectionHolder(const ConnectionHolder & other) = delete;
void setBroken() { is_broken = true; }
~ConnectionHolder()
{
if (auto_close)
{
connection.reset();
}
else if (is_broken)
{
try
{
if (connection->isConnected())
connection->getRef().reset();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
connection.reset();
}
}
pool->returnObject(std::move(connection));
}
@ -49,6 +66,7 @@ private:
PoolPtr pool;
ConnectionPtr connection;
bool auto_close;
bool is_broken = false;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;

View File

@ -59,7 +59,6 @@ PostgreSQLSource<T>::PostgreSQLSource(
init(sample_block);
}
template<typename T>
void PostgreSQLSource<T>::init(const Block & sample_block)
{
@ -82,7 +81,8 @@ void PostgreSQLSource<T>::onStart()
{
try
{
tx = std::make_shared<T>(connection_holder->get());
auto & conn = connection_holder->get();
tx = std::make_shared<T>(conn);
}
catch (const pqxx::broken_connection &)
{
@ -180,6 +180,17 @@ void PostgreSQLSource<T>::onFinish()
if (tx && auto_commit)
tx->commit();
is_completed = true;
}
template<typename T>
PostgreSQLSource<T>::~PostgreSQLSource()
{
if (!is_completed)
{
connection_holder->setBroken();
}
}
template

View File

@ -28,6 +28,8 @@ public:
String getName() const override { return "PostgreSQL"; }
~PostgreSQLSource() override;
protected:
PostgreSQLSource(
std::shared_ptr<T> tx_,
@ -54,6 +56,7 @@ private:
ExternalResultDescription description;
bool started = false;
bool is_completed = false;
postgres::ConnectionHolderPtr connection_holder;