This commit is contained in:
kssenii 2024-08-13 16:25:07 +02:00
parent 99282e526a
commit 5a6090ad05
2 changed files with 15 additions and 11 deletions

View File

@ -35,9 +35,9 @@ PostgreSQLSource<T>::PostgreSQLSource(
const Block & sample_block,
UInt64 max_block_size_)
: ISource(sample_block.cloneEmpty())
, query_str(query_str_)
, max_block_size(max_block_size_)
, connection_holder(std::move(connection_holder_))
, query_str(query_str_)
{
init(sample_block);
}
@ -51,10 +51,10 @@ PostgreSQLSource<T>::PostgreSQLSource(
UInt64 max_block_size_,
bool auto_commit_)
: ISource(sample_block.cloneEmpty())
, query_str(query_str_)
, tx(std::move(tx_))
, max_block_size(max_block_size_)
, auto_commit(auto_commit_)
, query_str(query_str_)
, tx(std::move(tx_))
{
init(sample_block);
}
@ -204,15 +204,15 @@ PostgreSQLSource<T>::~PostgreSQLSource()
*/
stream->close();
}
stream.reset();
tx.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
stream.reset();
tx.reset();
if (connection_holder)
connection_holder->setBroken();
}

View File

@ -38,14 +38,12 @@ protected:
UInt64 max_block_size_,
bool auto_commit_);
String query_str;
std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream;
Status prepare() override;
void onStart();
Chunk generate() override;
void onStart();
void onFinish();
private:
@ -61,6 +59,12 @@ private:
postgres::ConnectionHolderPtr connection_holder;
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
protected:
String query_str;
/// tx and stream must be destroyed before connection_holder.
std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream;
};