diff --git a/contrib/libpq b/contrib/libpq index 1f9c286dba6..c7624588ddd 160000 --- a/contrib/libpq +++ b/contrib/libpq @@ -1 +1 @@ -Subproject commit 1f9c286dba60809edb64e384d6727d80d269b6cf +Subproject commit c7624588ddd84f153dd5990e81b886e4568bddde diff --git a/src/DataStreams/PostgreSQLBlockInputStream.cpp b/src/DataStreams/PostgreSQLBlockInputStream.cpp index 6faa71a2d72..2257020385c 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.cpp +++ b/src/DataStreams/PostgreSQLBlockInputStream.cpp @@ -28,13 +28,13 @@ namespace ErrorCodes } PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( - WrappedPostgreSQLConnection connection_, + WrappedPostgreSQLConnectionPtr connection_, const std::string & query_str_, const Block & sample_block, const UInt64 max_block_size_) : query_str(query_str_) , max_block_size(max_block_size_) - , connection(connection_) + , connection(std::move(connection_)) { description.init(sample_block); for (const auto idx : ext::range(0, description.sample_block.columns())) @@ -48,7 +48,7 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( void PostgreSQLBlockInputStream::readPrefix() { - tx = std::make_unique(*connection); + tx = std::make_unique(connection->conn()); stream = std::make_unique(*tx, pqxx::from_query, std::string_view(query_str)); } diff --git a/src/DataStreams/PostgreSQLBlockInputStream.h b/src/DataStreams/PostgreSQLBlockInputStream.h index 13c78b89f88..acf4dce4620 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.h +++ b/src/DataStreams/PostgreSQLBlockInputStream.h @@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream { public: PostgreSQLBlockInputStream( - WrappedPostgreSQLConnection connection_, + WrappedPostgreSQLConnectionPtr connection_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_); @@ -46,7 +46,7 @@ private: const UInt64 max_block_size; ExternalResultDescription description; - WrappedPostgreSQLConnection connection; + WrappedPostgreSQLConnectionPtr connection; std::unique_ptr tx; std::unique_ptr stream; diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp index 5b07fa270a9..229c338d765 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp @@ -91,7 +91,7 @@ std::unordered_set DatabasePostgreSQL::fetchTablesList() const std::string query = "SELECT tablename FROM pg_catalog.pg_tables " "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'"; auto connection = connection_pool->get(); - pqxx::read_transaction tx(*connection); + pqxx::read_transaction tx(connection->conn()); for (auto table_name : tx.stream(query)) tables.insert(std::get<0>(table_name)); @@ -110,7 +110,7 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const } auto connection = connection_pool->get(); - pqxx::nontransaction tx(*connection); + pqxx::nontransaction tx(connection->conn()); try { diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index 48759eebb29..4e37346ee95 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -94,7 +94,7 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl std::shared_ptr fetchPostgreSQLTableStructure( - WrappedPostgreSQLConnection connection, const String & postgres_table_name, bool use_nulls) + WrappedPostgreSQLConnectionPtr connection, const String & postgres_table_name, bool use_nulls) { auto columns = NamesAndTypesList(); @@ -113,7 +113,7 @@ std::shared_ptr fetchPostgreSQLTableStructure( "AND NOT attisdropped AND attnum > 0", postgres_table_name); try { - pqxx::read_transaction tx(*connection); + pqxx::read_transaction tx(connection->conn()); pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query)); std::tuple row; @@ -133,7 +133,7 @@ std::shared_ptr fetchPostgreSQLTableStructure( { throw Exception(fmt::format( "PostgreSQL table {}.{} does not exist", - connection->dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE); + connection->conn().dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE); } catch (Exception & e) { diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h index 83924e8dfdf..a05ea993271 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h @@ -12,7 +12,7 @@ namespace DB { std::shared_ptr fetchPostgreSQLTableStructure( - WrappedPostgreSQLConnection connection, const String & postgres_table_name, bool use_nulls); + WrappedPostgreSQLConnectionPtr connection, const String & postgres_table_name, bool use_nulls); } diff --git a/src/Storages/PostgreSQL/PostgreSQLConnection.cpp b/src/Storages/PostgreSQL/PostgreSQLConnection.cpp index 7360d00fa1e..668360d2266 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnection.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLConnection.cpp @@ -20,13 +20,6 @@ PostgreSQLConnection::PostgreSQLConnection( } -PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other) - : connection_str(other.connection_str) - , address(other.address) -{ -} - - PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get() { connectIfNeeded(); diff --git a/src/Storages/PostgreSQL/PostgreSQLConnection.h b/src/Storages/PostgreSQL/PostgreSQLConnection.h index 17c3cc963b7..0ee094ba251 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnection.h +++ b/src/Storages/PostgreSQL/PostgreSQLConnection.h @@ -7,14 +7,19 @@ #if USE_LIBPQXX #include // Y_IGNORE #include +#include namespace DB { +class WrappedPostgreSQLConnection; + class PostgreSQLConnection { +friend class WrappedPostgreSQLConnection; + using ConnectionPtr = std::shared_ptr; public: @@ -22,7 +27,7 @@ public: const String & connection_str_, const String & address_); - PostgreSQLConnection(const PostgreSQLConnection & other); + PostgreSQLConnection(const PostgreSQLConnection & other) = delete; ConnectionPtr get(); @@ -30,11 +35,12 @@ public: bool isConnected() { return tryConnectIfNeeded(); } - bool available() { return ref_count.load() == 0; } + int32_t isAvailable() { return !locked.load(); } - void incrementRef() { ref_count++; } +protected: + void lock() { locked.store(true); } - void decrementRef() { ref_count--; } + void unlock() { locked.store(false); } private: void connectIfNeeded(); @@ -45,7 +51,7 @@ private: ConnectionPtr connection; std::string connection_str, address; - std::atomic ref_count{0}; + std::atomic locked{false}; }; using PostgreSQLConnectionPtr = std::shared_ptr; @@ -55,15 +61,13 @@ class WrappedPostgreSQLConnection { public: - WrappedPostgreSQLConnection(PostgreSQLConnectionPtr connection_) : connection(connection_) { connection->incrementRef(); } + WrappedPostgreSQLConnection(PostgreSQLConnectionPtr connection_) : connection(connection_) { connection->lock(); } - WrappedPostgreSQLConnection(const WrappedPostgreSQLConnection & other) : connection(other.connection) {} + WrappedPostgreSQLConnection(const WrappedPostgreSQLConnection & other) = delete; - ~WrappedPostgreSQLConnection() { connection->decrementRef(); } + ~WrappedPostgreSQLConnection() { connection->unlock(); } - pqxx::connection & operator*() const { return *connection->get(); } - - pqxx::connection * operator->() const { return connection->get().get(); } + pqxx::connection & conn() const { return *connection->get(); } bool isConnected() { return connection->isConnected(); } @@ -71,6 +75,8 @@ private: PostgreSQLConnectionPtr connection; }; +using WrappedPostgreSQLConnectionPtr = std::shared_ptr; + } diff --git a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp index a27eb3c8f50..6395cbd942e 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp @@ -7,6 +7,7 @@ #include #include "PostgreSQLConnectionPool.h" #include "PostgreSQLConnection.h" +#include namespace DB @@ -50,25 +51,25 @@ void PostgreSQLConnectionPool::initialize() } -WrappedPostgreSQLConnection PostgreSQLConnectionPool::get() +WrappedPostgreSQLConnectionPtr PostgreSQLConnectionPool::get() { std::lock_guard lock(mutex); for (const auto & connection : pool) { - if (connection->available()) - return WrappedPostgreSQLConnection(connection); + if (connection->isAvailable()) + return std::make_shared(connection); } auto connection = std::make_shared(connection_str, address); - return WrappedPostgreSQLConnection(connection); + return std::make_shared(connection); } bool PostgreSQLConnectionPool::isConnected() { auto connection = get(); - return connection.isConnected(); + return connection->isConnected(); } } diff --git a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h index df9e667b4ff..cfcbb4b72f7 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h +++ b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h @@ -25,7 +25,7 @@ public: PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete; - WrappedPostgreSQLConnection get(); + WrappedPostgreSQLConnectionPtr get(); protected: bool isConnected(); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp index f5ad91d9a7e..914c2643418 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp @@ -52,7 +52,7 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection( } -WrappedPostgreSQLConnection PostgreSQLReplicaConnection::get() +WrappedPostgreSQLConnectionPtr PostgreSQLReplicaConnection::get() { for (size_t i = 0; i < num_retries; ++i) { diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h index 611fcb9127d..15a2a2cf3b2 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h @@ -21,7 +21,7 @@ public: PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other) = default; - WrappedPostgreSQLConnection get(); + WrappedPostgreSQLConnectionPtr get(); private: diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index 13f56f9f531..2f350643139 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -97,10 +97,10 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream public: explicit PostgreSQLBlockOutputStream( const StorageMetadataPtr & metadata_snapshot_, - WrappedPostgreSQLConnection connection_, + WrappedPostgreSQLConnectionPtr connection_, const std::string & remote_table_name_) : metadata_snapshot(metadata_snapshot_) - , connection(connection_) + , connection(std::move(connection_)) , remote_table_name(remote_table_name_) { } @@ -110,7 +110,7 @@ public: void writePrefix() override { - work = std::make_unique(*connection); + work = std::make_unique(connection->conn()); } @@ -276,7 +276,7 @@ public: private: StorageMetadataPtr metadata_snapshot; - WrappedPostgreSQLConnection connection; + WrappedPostgreSQLConnectionPtr connection; std::string remote_table_name; std::unique_ptr work; diff --git a/tests/integration/test_storage_postgresql/configs/log_conf.xml b/tests/integration/test_storage_postgresql/configs/log_conf.xml new file mode 100644 index 00000000000..f9d15e572aa --- /dev/null +++ b/tests/integration/test_storage_postgresql/configs/log_conf.xml @@ -0,0 +1,11 @@ + + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + diff --git a/tests/integration/test_storage_postgresql/test.py b/tests/integration/test_storage_postgresql/test.py index 65f12451683..7d7a6888cc0 100644 --- a/tests/integration/test_storage_postgresql/test.py +++ b/tests/integration/test_storage_postgresql/test.py @@ -9,7 +9,7 @@ from helpers.test_tools import assert_eq_with_retry from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT cluster = ClickHouseCluster(__file__) -node1 = cluster.add_instance('node1', main_configs=[], with_postgres=True) +node1 = cluster.add_instance('node1', main_configs=["configs/log_conf.xml"], with_postgres=True) def get_postgres_conn(database=False): if database == True: @@ -176,15 +176,15 @@ def test_concurrent_queries(started_cluster): cursor.execute('CREATE TABLE test_table (key integer, value integer)') def node_insert(_): - for i in range(5): - result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(100)", user='default') + for i in range(20): + result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default') busy_pool = Pool(10) p = busy_pool.map_async(node_insert, range(10)) p.wait() result = node1.query("SELECT count() FROM test_table", user='default') print(result) - assert(int(result) == 10 * 5 * 100) + assert(int(result) == 20 * 10 * 1000) def node_select(_): for i in range(5): @@ -195,8 +195,8 @@ def test_concurrent_queries(started_cluster): p.wait() def node_insert_select(_): - for i in range(5): - result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(100)", user='default') + for i in range(20): + result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default') result = node1.query("SELECT * FROM test_table LIMIT 100", user='default') busy_pool = Pool(10) @@ -204,7 +204,7 @@ def test_concurrent_queries(started_cluster): p.wait() result = node1.query("SELECT count() FROM test_table", user='default') print(result) - assert(int(result) == 10 * 5 * 100 * 2) + assert(int(result) == 20 * 10 * 1000 * 2) if __name__ == '__main__':