Merge pull request #68288 from ClickHouse/try-fix-postgres-crash

Fix postgres crash
This commit is contained in:
Kseniia Sumarokova 2024-08-14 11:01:35 +00:00 committed by GitHub
commit 2b1fce007f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 11 deletions

View File

@ -35,9 +35,9 @@ PostgreSQLSource<T>::PostgreSQLSource(
const Block & sample_block, const Block & sample_block,
UInt64 max_block_size_) UInt64 max_block_size_)
: ISource(sample_block.cloneEmpty()) : ISource(sample_block.cloneEmpty())
, query_str(query_str_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, connection_holder(std::move(connection_holder_)) , connection_holder(std::move(connection_holder_))
, query_str(query_str_)
{ {
init(sample_block); init(sample_block);
} }
@ -51,10 +51,10 @@ PostgreSQLSource<T>::PostgreSQLSource(
UInt64 max_block_size_, UInt64 max_block_size_,
bool auto_commit_) bool auto_commit_)
: ISource(sample_block.cloneEmpty()) : ISource(sample_block.cloneEmpty())
, query_str(query_str_)
, tx(std::move(tx_))
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, auto_commit(auto_commit_) , auto_commit(auto_commit_)
, query_str(query_str_)
, tx(std::move(tx_))
{ {
init(sample_block); init(sample_block);
} }
@ -204,15 +204,15 @@ PostgreSQLSource<T>::~PostgreSQLSource()
*/ */
stream->close(); stream->close();
} }
stream.reset();
tx.reset();
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
stream.reset();
tx.reset();
if (connection_holder) if (connection_holder)
connection_holder->setBroken(); connection_holder->setBroken();
} }

View File

@ -38,14 +38,12 @@ protected:
UInt64 max_block_size_, UInt64 max_block_size_,
bool auto_commit_); bool auto_commit_);
String query_str;
std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream;
Status prepare() override; Status prepare() override;
void onStart();
Chunk generate() override; Chunk generate() override;
void onStart();
void onFinish(); void onFinish();
private: private:
@ -61,6 +59,12 @@ private:
postgres::ConnectionHolderPtr connection_holder; postgres::ConnectionHolderPtr connection_holder;
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info; std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
protected:
String query_str;
/// tx and stream must be destroyed before connection_holder.
std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream;
}; };