This commit is contained in:
kssenii 2021-03-18 20:04:54 +00:00
parent b135b814fb
commit f1ef87d966
15 changed files with 60 additions and 49 deletions

2
contrib/libpq vendored

@ -1 +1 @@
Subproject commit 1f9c286dba60809edb64e384d6727d80d269b6cf Subproject commit c7624588ddd84f153dd5990e81b886e4568bddde

View File

@ -28,13 +28,13 @@ namespace ErrorCodes
} }
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
WrappedPostgreSQLConnection connection_, WrappedPostgreSQLConnectionPtr connection_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_) const UInt64 max_block_size_)
: query_str(query_str_) : query_str(query_str_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, connection(connection_) , connection(std::move(connection_))
{ {
description.init(sample_block); description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns())) for (const auto idx : ext::range(0, description.sample_block.columns()))
@ -48,7 +48,7 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
void PostgreSQLBlockInputStream::readPrefix() void PostgreSQLBlockInputStream::readPrefix()
{ {
tx = std::make_unique<pqxx::read_transaction>(*connection); tx = std::make_unique<pqxx::read_transaction>(connection->conn());
stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str)); stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str));
} }

View File

@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream
{ {
public: public:
PostgreSQLBlockInputStream( PostgreSQLBlockInputStream(
WrappedPostgreSQLConnection connection_, WrappedPostgreSQLConnectionPtr connection_,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_); const UInt64 max_block_size_);
@ -46,7 +46,7 @@ private:
const UInt64 max_block_size; const UInt64 max_block_size;
ExternalResultDescription description; ExternalResultDescription description;
WrappedPostgreSQLConnection connection; WrappedPostgreSQLConnectionPtr connection;
std::unique_ptr<pqxx::read_transaction> tx; std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream; std::unique_ptr<pqxx::stream_from> stream;

View File

@ -91,7 +91,7 @@ std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
std::string query = "SELECT tablename FROM pg_catalog.pg_tables " std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'"; "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
auto connection = connection_pool->get(); auto connection = connection_pool->get();
pqxx::read_transaction tx(*connection); pqxx::read_transaction tx(connection->conn());
for (auto table_name : tx.stream<std::string>(query)) for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name)); tables.insert(std::get<0>(table_name));
@ -110,7 +110,7 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
} }
auto connection = connection_pool->get(); auto connection = connection_pool->get();
pqxx::nontransaction tx(*connection); pqxx::nontransaction tx(connection->conn());
try try
{ {

View File

@ -94,7 +94,7 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure( std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
WrappedPostgreSQLConnection connection, const String & postgres_table_name, bool use_nulls) WrappedPostgreSQLConnectionPtr connection, const String & postgres_table_name, bool use_nulls)
{ {
auto columns = NamesAndTypesList(); auto columns = NamesAndTypesList();
@ -113,7 +113,7 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
"AND NOT attisdropped AND attnum > 0", postgres_table_name); "AND NOT attisdropped AND attnum > 0", postgres_table_name);
try try
{ {
pqxx::read_transaction tx(*connection); pqxx::read_transaction tx(connection->conn());
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query)); pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row; std::tuple<std::string, std::string, std::string, uint16_t> row;
@ -133,7 +133,7 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
{ {
throw Exception(fmt::format( throw Exception(fmt::format(
"PostgreSQL table {}.{} does not exist", "PostgreSQL table {}.{} does not exist",
connection->dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE); connection->conn().dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE);
} }
catch (Exception & e) catch (Exception & e)
{ {

View File

@ -12,7 +12,7 @@ namespace DB
{ {
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure( std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
WrappedPostgreSQLConnection connection, const String & postgres_table_name, bool use_nulls); WrappedPostgreSQLConnectionPtr connection, const String & postgres_table_name, bool use_nulls);
} }

View File

@ -20,13 +20,6 @@ PostgreSQLConnection::PostgreSQLConnection(
} }
PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other)
: connection_str(other.connection_str)
, address(other.address)
{
}
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get() PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get()
{ {
connectIfNeeded(); connectIfNeeded();

View File

@ -7,14 +7,19 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE #include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h> #include <Core/Types.h>
#include <common/logger_useful.h>
namespace DB namespace DB
{ {
class WrappedPostgreSQLConnection;
class PostgreSQLConnection class PostgreSQLConnection
{ {
friend class WrappedPostgreSQLConnection;
using ConnectionPtr = std::shared_ptr<pqxx::connection>; using ConnectionPtr = std::shared_ptr<pqxx::connection>;
public: public:
@ -22,7 +27,7 @@ public:
const String & connection_str_, const String & connection_str_,
const String & address_); const String & address_);
PostgreSQLConnection(const PostgreSQLConnection & other); PostgreSQLConnection(const PostgreSQLConnection & other) = delete;
ConnectionPtr get(); ConnectionPtr get();
@ -30,11 +35,12 @@ public:
bool isConnected() { return tryConnectIfNeeded(); } bool isConnected() { return tryConnectIfNeeded(); }
bool available() { return ref_count.load() == 0; } int32_t isAvailable() { return !locked.load(); }
void incrementRef() { ref_count++; } protected:
void lock() { locked.store(true); }
void decrementRef() { ref_count--; } void unlock() { locked.store(false); }
private: private:
void connectIfNeeded(); void connectIfNeeded();
@ -45,7 +51,7 @@ private:
ConnectionPtr connection; ConnectionPtr connection;
std::string connection_str, address; std::string connection_str, address;
std::atomic<uint8_t> ref_count{0}; std::atomic<bool> locked{false};
}; };
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>; using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
@ -55,15 +61,13 @@ class WrappedPostgreSQLConnection
{ {
public: public:
WrappedPostgreSQLConnection(PostgreSQLConnectionPtr connection_) : connection(connection_) { connection->incrementRef(); } WrappedPostgreSQLConnection(PostgreSQLConnectionPtr connection_) : connection(connection_) { connection->lock(); }
WrappedPostgreSQLConnection(const WrappedPostgreSQLConnection & other) : connection(other.connection) {} WrappedPostgreSQLConnection(const WrappedPostgreSQLConnection & other) = delete;
~WrappedPostgreSQLConnection() { connection->decrementRef(); } ~WrappedPostgreSQLConnection() { connection->unlock(); }
pqxx::connection & operator*() const { return *connection->get(); } pqxx::connection & conn() const { return *connection->get(); }
pqxx::connection * operator->() const { return connection->get().get(); }
bool isConnected() { return connection->isConnected(); } bool isConnected() { return connection->isConnected(); }
@ -71,6 +75,8 @@ private:
PostgreSQLConnectionPtr connection; PostgreSQLConnectionPtr connection;
}; };
using WrappedPostgreSQLConnectionPtr = std::shared_ptr<WrappedPostgreSQLConnection>;
} }

View File

@ -7,6 +7,7 @@
#include <IO/Operators.h> #include <IO/Operators.h>
#include "PostgreSQLConnectionPool.h" #include "PostgreSQLConnectionPool.h"
#include "PostgreSQLConnection.h" #include "PostgreSQLConnection.h"
#include <common/logger_useful.h>
namespace DB namespace DB
@ -50,25 +51,25 @@ void PostgreSQLConnectionPool::initialize()
} }
WrappedPostgreSQLConnection PostgreSQLConnectionPool::get() WrappedPostgreSQLConnectionPtr PostgreSQLConnectionPool::get()
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
for (const auto & connection : pool) for (const auto & connection : pool)
{ {
if (connection->available()) if (connection->isAvailable())
return WrappedPostgreSQLConnection(connection); return std::make_shared<WrappedPostgreSQLConnection>(connection);
} }
auto connection = std::make_shared<PostgreSQLConnection>(connection_str, address); auto connection = std::make_shared<PostgreSQLConnection>(connection_str, address);
return WrappedPostgreSQLConnection(connection); return std::make_shared<WrappedPostgreSQLConnection>(connection);
} }
bool PostgreSQLConnectionPool::isConnected() bool PostgreSQLConnectionPool::isConnected()
{ {
auto connection = get(); auto connection = get();
return connection.isConnected(); return connection->isConnected();
} }
} }

View File

@ -25,7 +25,7 @@ public:
PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete; PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete;
WrappedPostgreSQLConnection get(); WrappedPostgreSQLConnectionPtr get();
protected: protected:
bool isConnected(); bool isConnected();

View File

@ -52,7 +52,7 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(
} }
WrappedPostgreSQLConnection PostgreSQLReplicaConnection::get() WrappedPostgreSQLConnectionPtr PostgreSQLReplicaConnection::get()
{ {
for (size_t i = 0; i < num_retries; ++i) for (size_t i = 0; i < num_retries; ++i)
{ {

View File

@ -21,7 +21,7 @@ public:
PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other) = default; PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other) = default;
WrappedPostgreSQLConnection get(); WrappedPostgreSQLConnectionPtr get();
private: private:

View File

@ -97,10 +97,10 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
public: public:
explicit PostgreSQLBlockOutputStream( explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
WrappedPostgreSQLConnection connection_, WrappedPostgreSQLConnectionPtr connection_,
const std::string & remote_table_name_) const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_) : metadata_snapshot(metadata_snapshot_)
, connection(connection_) , connection(std::move(connection_))
, remote_table_name(remote_table_name_) , remote_table_name(remote_table_name_)
{ {
} }
@ -110,7 +110,7 @@ public:
void writePrefix() override void writePrefix() override
{ {
work = std::make_unique<pqxx::work>(*connection); work = std::make_unique<pqxx::work>(connection->conn());
} }
@ -276,7 +276,7 @@ public:
private: private:
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
WrappedPostgreSQLConnection connection; WrappedPostgreSQLConnectionPtr connection;
std::string remote_table_name; std::string remote_table_name;
std::unique_ptr<pqxx::work> work; std::unique_ptr<pqxx::work> work;

View File

@ -0,0 +1,11 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -9,7 +9,7 @@ from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=[], with_postgres=True) node1 = cluster.add_instance('node1', main_configs=["configs/log_conf.xml"], with_postgres=True)
def get_postgres_conn(database=False): def get_postgres_conn(database=False):
if database == True: if database == True:
@ -176,15 +176,15 @@ def test_concurrent_queries(started_cluster):
cursor.execute('CREATE TABLE test_table (key integer, value integer)') cursor.execute('CREATE TABLE test_table (key integer, value integer)')
def node_insert(_): def node_insert(_):
for i in range(5): for i in range(20):
result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(100)", user='default') result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default')
busy_pool = Pool(10) busy_pool = Pool(10)
p = busy_pool.map_async(node_insert, range(10)) p = busy_pool.map_async(node_insert, range(10))
p.wait() p.wait()
result = node1.query("SELECT count() FROM test_table", user='default') result = node1.query("SELECT count() FROM test_table", user='default')
print(result) print(result)
assert(int(result) == 10 * 5 * 100) assert(int(result) == 20 * 10 * 1000)
def node_select(_): def node_select(_):
for i in range(5): for i in range(5):
@ -195,8 +195,8 @@ def test_concurrent_queries(started_cluster):
p.wait() p.wait()
def node_insert_select(_): def node_insert_select(_):
for i in range(5): for i in range(20):
result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(100)", user='default') result = node1.query("INSERT INTO test_table SELECT number, number FROM numbers(1000)", user='default')
result = node1.query("SELECT * FROM test_table LIMIT 100", user='default') result = node1.query("SELECT * FROM test_table LIMIT 100", user='default')
busy_pool = Pool(10) busy_pool = Pool(10)
@ -204,7 +204,7 @@ def test_concurrent_queries(started_cluster):
p.wait() p.wait()
result = node1.query("SELECT count() FROM test_table", user='default') result = node1.query("SELECT count() FROM test_table", user='default')
print(result) print(result)
assert(int(result) == 10 * 5 * 100 * 2) assert(int(result) == 20 * 10 * 1000 * 2)
if __name__ == '__main__': if __name__ == '__main__':