Merge pull request #21710 from kssenii/replica-priorities

Support replicas priority for postgres dictionary source
This commit is contained in:
Nikita Mikhaylov 2021-03-17 16:02:00 +03:00 committed by GitHub
commit f51b41b7a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 335 additions and 85 deletions

View File

@ -11,3 +11,10 @@ services:
default:
aliases:
- postgre-sql.local
postgres2:
image: postgres
restart: always
environment:
POSTGRES_PASSWORD: mysecretpassword
ports:
- 5441:5432

View File

@ -548,6 +548,7 @@
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
M(579, INCORRECT_PART_TYPE) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -169,7 +169,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Conte
return StoragePtr{};
auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), table_name, std::make_shared<PostgreSQLConnection>(connection->conn_str()),
StorageID(database_name, table_name), table_name, std::make_shared<PostgreSQLConnection>(*connection),
ColumnsDescription{*columns}, ConstraintsDescription{}, context);
if (cache_tables)

View File

@ -8,7 +8,6 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include "readInvalidateQuery.h"
#endif
@ -29,11 +28,10 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
PostgreSQLConnectionPtr connection_,
const Block & sample_block_)
: dict_struct{dict_struct_}
, sample_block(sample_block_)
, connection(std::move(connection_))
, connection(std::make_shared<PostgreSQLReplicaConnection>(config_, config_prefix))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(config_.getString(fmt::format("{}.db", config_prefix), ""))
, table(config_.getString(fmt::format("{}.table", config_prefix), ""))
@ -50,7 +48,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other)
: dict_struct(other.dict_struct)
, sample_block(other.sample_block)
, connection(std::make_shared<PostgreSQLConnection>(other.connection->conn_str()))
, connection(other.connection)
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(other.db)
, table(other.table)
@ -68,8 +66,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar
BlockInputStreamPtr PostgreSQLDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return std::make_shared<PostgreSQLBlockInputStream>(
connection->conn(), load_all_query, sample_block, max_block_size);
return loadBase(load_all_query);
}
@ -77,23 +74,28 @@ BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll()
{
auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), load_update_query, sample_block, max_block_size);
return loadBase(load_update_query);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size);
return loadBase(query);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size);
return loadBase(query);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadBase(const String & query)
{
return std::make_shared<PostgreSQLBlockInputStream>(connection->get(), query, sample_block, max_block_size);
}
bool PostgreSQLDictionarySource::isModified() const
{
if (!invalidate_query.empty())
@ -112,7 +114,7 @@ std::string PostgreSQLDictionarySource::doInvalidateQuery(const std::string & re
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
PostgreSQLBlockInputStream block_input_stream(connection->conn(), request, invalidate_sample_block, 1);
PostgreSQLBlockInputStream block_input_stream(connection->get(), request, invalidate_sample_block, 1);
return readInvalidateQuery(block_input_stream);
}
@ -171,15 +173,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
{
#if USE_LIBPQXX
const auto config_prefix = root_config_prefix + ".postgresql";
auto connection = std::make_shared<PostgreSQLConnection>(
config.getString(fmt::format("{}.db", config_prefix), ""),
config.getString(fmt::format("{}.host", config_prefix), ""),
config.getUInt(fmt::format("{}.port", config_prefix), 0),
config.getString(fmt::format("{}.user", config_prefix), ""),
config.getString(fmt::format("{}.password", config_prefix), ""));
return std::make_unique<PostgreSQLDictionarySource>(
dict_struct, config, config_prefix, connection, sample_block);
dict_struct, config, config_prefix, sample_block);
#else
(void)dict_struct;
(void)config;

View File

@ -11,7 +11,7 @@
#include <Core/Block.h>
#include <common/LocalDateTime.h>
#include <common/logger_useful.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/PostgreSQL/PostgreSQLReplicaConnection.h>
#include <pqxx/pqxx>
@ -26,7 +26,6 @@ public:
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
PostgreSQLConnectionPtr connection_,
const Block & sample_block_);
/// copy-constructor is provided in order to support cloneability
@ -48,10 +47,11 @@ public:
private:
std::string getUpdateFieldAndDate();
std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const String & query);
const DictionaryStructure dict_struct;
Block sample_block;
PostgreSQLConnectionPtr connection;
PostgreSQLReplicaConnectionPtr connection;
Poco::Logger * log;
const std::string db;

View File

@ -6,23 +6,63 @@
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <common/logger_useful.h>
namespace DB
{
PostgreSQLConnection::PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
{
address = host + ':' + std::to_string(port);
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
}
PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other)
: connection_str(other.connection_str)
, address(other.address)
{
}
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::conn()
{
checkUpdateConnection();
connect();
return connection;
}
void PostgreSQLConnection::checkUpdateConnection()
void PostgreSQLConnection::connect()
{
if (!connection || !connection->is_open())
connection = std::make_unique<pqxx::connection>(connection_str);
}
bool PostgreSQLConnection::tryConnect()
{
try
{
connect();
}
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(
&Poco::Logger::get("PostgreSQLConnection"),
"Unable to setup connection to {}, reason: {}",
getAddress(), pqxx_error.what());
return false;
}
catch (...)
{
throw;
}
return true;
}
std::string PostgreSQLConnection::formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
{

View File

@ -16,29 +16,31 @@ namespace DB
/// Connection is not made until actually used.
class PostgreSQLConnection
{
public:
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
public:
PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
: connection_str(formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password))) {}
PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
PostgreSQLConnection(const std::string & connection_str_) : connection_str(connection_str_) {}
PostgreSQLConnection(const PostgreSQLConnection & other);
PostgreSQLConnection(const PostgreSQLConnection &) = delete;
PostgreSQLConnection operator =(const PostgreSQLConnection &) = delete;
bool tryConnect();
ConnectionPtr conn();
const std::string & getAddress() { return address; }
std::string & conn_str() { return connection_str; }
private:
ConnectionPtr connection;
std::string connection_str;
void connect();
static std::string formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
void checkUpdateConnection();
ConnectionPtr connection;
std::string connection_str, address;
};
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;

View File

@ -0,0 +1,77 @@
#include "PostgreSQLReplicaConnection.h"
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int POSTGRESQL_CONNECTION_FAILURE;
}
PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const size_t num_retries_)
: log(&Poco::Logger::get("PostgreSQLConnection"))
, num_retries(num_retries_)
{
auto db = config.getString(config_prefix + ".db", "");
auto host = config.getString(config_prefix + ".host", "");
auto port = config.getUInt(config_prefix + ".port", 0);
auto user = config.getString(config_prefix + ".user", "");
auto password = config.getString(config_prefix + ".password", "");
if (config.has(config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
std::string replica_name = config_prefix + "." + config_key;
size_t priority = config.getInt(replica_name + ".priority", 0);
auto replica_host = config.getString(replica_name + ".host", host);
auto replica_port = config.getUInt(replica_name + ".port", port);
auto replica_user = config.getString(replica_name + ".user", user);
auto replica_password = config.getString(replica_name + ".password", password);
replicas[priority] = std::make_shared<PostgreSQLConnection>(db, replica_host, replica_port, replica_user, replica_password);
}
}
}
else
{
replicas[0] = std::make_shared<PostgreSQLConnection>(db, host, port, user, password);
}
}
PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other)
: log(&Poco::Logger::get("PostgreSQLConnection"))
, replicas(other.replicas)
, num_retries(other.num_retries)
{
}
PostgreSQLConnection::ConnectionPtr PostgreSQLReplicaConnection::get()
{
for (size_t i = 0; i < num_retries; ++i)
{
for (auto & replica : replicas)
{
if (replica.second->tryConnect())
return replica.second->conn();
}
}
throw Exception(ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include "PostgreSQLConnection.h"
#include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
namespace DB
{
class PostgreSQLReplicaConnection
{
public:
static constexpr inline auto POSTGRESQL_CONNECTION_DEFAULT_RETRIES_NUM = 5;
PostgreSQLReplicaConnection(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const size_t num_retries_ = POSTGRESQL_CONNECTION_DEFAULT_RETRIES_NUM);
PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other);
PostgreSQLConnection::ConnectionPtr get();
private:
/// Highest priority is 0, the bigger the number in map, the less the priority
using ReplicasByPriority = std::map<size_t, PostgreSQLConnectionPtr>;
Poco::Logger * log;
ReplicasByPriority replicas;
size_t num_retries;
};
using PostgreSQLReplicaConnectionPtr = std::shared_ptr<PostgreSQLReplicaConnection>;
}

View File

@ -0,0 +1,83 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict0</name>
<source>
<postgresql>
<db>clickhouse</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test0</table>
<invalidate_query>SELECT value FROM test0 WHERE id = 0</invalidate_query>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>1</lifetime>
</dictionary>
<dictionary>
<name>dict1</name>
<source>
<postgresql>
<db>clickhouse</db>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test1</table>
<replica>
<host>postgres1</host>
<priority>3</priority>
<port>5432</port>
</replica>
<replica>
<host>postgres2</host>
<port>5433</port>
<priority>1</priority>
</replica>
<replica>
<host>postgres2</host>
<port>5432</port>
<priority>2</priority>
</replica>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>1</lifetime>
</dictionary>
</yandex>

View File

@ -1,37 +0,0 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict0</name>
<source>
<postgresql>
<db>clickhouse</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test0</table>
<invalidate_query>SELECT value FROM test0 WHERE id = 0</invalidate_query>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>1</lifetime>
</dictionary>
</yandex>

View File

@ -6,7 +6,10 @@ from helpers.cluster import ClickHouseCluster
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config.xml', 'configs/postgres_dict.xml', 'configs/log_conf.xml'], with_postgres=True)
node1 = cluster.add_instance('node1', main_configs=[
'configs/config.xml',
'configs/dictionaries/postgres_dict.xml',
'configs/log_conf.xml'], with_postgres=True)
postgres_dict_table_template = """
CREATE TABLE IF NOT EXISTS {} (
@ -18,11 +21,12 @@ click_dict_table_template = """
) ENGINE = Dictionary({})
"""
def get_postgres_conn(database=False):
def get_postgres_conn(port=5432, database=False):
if database == True:
conn_string = "host='localhost' dbname='clickhouse' user='postgres' password='mysecretpassword'"
conn_string = "host='localhost' port={} dbname='clickhouse' user='postgres' password='mysecretpassword'".format(port)
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn_string = "host='localhost' port={} user='postgres' password='mysecretpassword'".format(port)
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
@ -32,15 +36,13 @@ def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(conn, table_name):
cursor = conn.cursor()
def create_postgres_table(cursor, table_name):
cursor.execute(postgres_dict_table_template.format(table_name))
def create_and_fill_postgres_table(table_name):
conn = get_postgres_conn(True)
create_postgres_table(conn, table_name)
def create_and_fill_postgres_table(cursor, table_name, host='postgres1', port=5432):
create_postgres_table(cursor, table_name)
# Fill postgres table using clickhouse postgres table function and check
table_func = '''postgresql('postgres1:5432', 'clickhouse', '{}', 'postgres', 'mysecretpassword')'''.format(table_name)
table_func = '''postgresql('{}:{}', 'clickhouse', '{}', 'postgres', 'mysecretpassword')'''.format(host, port, table_name)
node1.query('''INSERT INTO TABLE FUNCTION {} SELECT number, number from numbers(10000)
'''.format(table_func, table_name))
result = node1.query("SELECT count() FROM {}".format(table_func))
@ -54,10 +56,16 @@ def create_dict(table_name, index=0):
def started_cluster():
try:
cluster.start()
postgres_conn = get_postgres_conn()
node1.query("CREATE DATABASE IF NOT EXISTS test")
print("postgres connected")
postgres_conn = get_postgres_conn(port=5432)
print("postgres1 connected")
create_postgres_db(postgres_conn, 'clickhouse')
postgres_conn = get_postgres_conn(port=5441)
print("postgres2 connected")
create_postgres_db(postgres_conn, 'clickhouse')
yield cluster
finally:
@ -65,10 +73,10 @@ def started_cluster():
def test_load_dictionaries(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(database=True)
cursor = conn.cursor()
table_name = 'test0'
create_and_fill_postgres_table(table_name)
create_and_fill_postgres_table(cursor, table_name)
create_dict(table_name)
dict_name = 'dict0'
@ -76,14 +84,17 @@ def test_load_dictionaries(started_cluster):
assert node1.query("SELECT count() FROM `test`.`dict_table_{}`".format(table_name)).rstrip() == '10000'
assert node1.query("SELECT dictGetUInt32('{}', 'id', toUInt64(0))".format(dict_name)) == '0\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)) == '9999\n'
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
def test_invalidate_query(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(database=True)
cursor = conn.cursor()
table_name = 'test0'
create_and_fill_postgres_table(table_name)
create_and_fill_postgres_table(cursor, table_name)
# invalidate query: SELECT value FROM test0 WHERE id = 0
dict_name = 'dict0'
@ -112,6 +123,39 @@ def test_invalidate_query(started_cluster):
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '2\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '2\n'
node1.query("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
def test_dictionary_with_replicas(started_cluster):
conn1 = get_postgres_conn(port=5432, database=True)
cursor1 = conn1.cursor()
conn2 = get_postgres_conn(port=5441, database=True)
cursor2 = conn2.cursor()
create_postgres_table(cursor1, 'test1')
create_postgres_table(cursor2, 'test1')
cursor1.execute('INSERT INTO test1 select i, i from generate_series(0, 99) as t(i);');
cursor2.execute('INSERT INTO test1 select i, i from generate_series(100, 199) as t(i);');
create_dict('test1', 1)
result = node1.query("SELECT * FROM `test`.`dict_table_test1` ORDER BY id")
# priority 0 - non running port
assert node1.contains_in_log('Unable to setup connection to postgres2:5433*')
# priority 1 - postgres2, table contains rows with values 100-200
# priority 2 - postgres1, table contains rows with values 0-100
expected = node1.query("SELECT number, number FROM numbers(100, 100)")
assert(result == expected)
cursor1.execute("DROP TABLE IF EXISTS test1")
cursor2.execute("DROP TABLE IF EXISTS test1")
node1.query("DROP TABLE IF EXISTS test1")
node1.query("DROP DICTIONARY IF EXISTS dict1")
if __name__ == '__main__':
cluster.start()