This commit is contained in:
kssenii 2023-10-12 16:32:56 +02:00
parent d1f6888119
commit 1917a882b7
7 changed files with 153 additions and 46 deletions

View File

@ -63,20 +63,11 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
if (shutdown_called)
return;
String replication_identifier;
if (settings->materialized_postgresql_use_unique_replication_consumer_identifier)
{
replication_identifier = fmt::format("{}_{}", getUUID(), TSA_SUPPRESS_WARNING_FOR_READ(database_name));
}
else
{
replication_identifier = TSA_SUPPRESS_WARNING_FOR_READ(database_name);
}
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
replication_identifier,
remote_database_name,
/* table_name */"",
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
toString(getUUID()),
connection_info,
getContext(),
is_attach,

View File

@ -574,6 +574,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
void MaterializedPostgreSQLConsumer::syncTables()
{
size_t synced_tables = 0;
while (!tables_to_sync.empty())
{
auto table_name = *tables_to_sync.begin();
@ -604,6 +605,7 @@ void MaterializedPostgreSQLConsumer::syncTables()
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
++synced_tables;
}
}
catch (...)
@ -616,7 +618,8 @@ void MaterializedPostgreSQLConsumer::syncTables()
tables_to_sync.erase(tables_to_sync.begin());
}
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})",
synced_tables, current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
updateLsn();
}

View File

@ -17,12 +17,14 @@
#include <Interpreters/Context.h>
#include <Databases/DatabaseOnDisk.h>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/String.h>
namespace DB
{
static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min
static constexpr size_t replication_slot_name_max_size = 64;
namespace ErrorCodes
{
@ -56,10 +58,70 @@ private:
};
namespace
{
/// There can be several replication slots per publication, but one publication per table/database replication.
/// Replication slot might be unique (contain uuid) to allow have multiple replicas for the same PostgreSQL table/database.
String getPublicationName(const String & postgres_database, const String & postgres_table)
{
return fmt::format(
"{}_ch_publication",
postgres_table.empty() ? postgres_database : fmt::format("{}_{}", postgres_database, postgres_table));
}
void checkReplicationSlot(String name)
{
for (const auto & c : name)
{
const bool ok = (std::isalpha(c) && std::islower(c)) || std::isdigit(c) || c == '_';
if (!ok)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Replication slot can contain lower-case letters, numbers, and the underscore character. "
"Got: {}", name);
}
}
if (name.size() > replication_slot_name_max_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Too big replication slot size: {}", name);
}
String normalizeReplicationSlot(String name)
{
name = Poco::toLower(name);
for (auto & c : name)
if (c == '-')
c = '_';
return name;
}
String getReplicationSlotName(
const String & postgres_database,
const String & postgres_table,
const String & clickhouse_uuid,
const MaterializedPostgreSQLSettings & replication_settings)
{
String slot_name = replication_settings.materialized_postgresql_replication_slot;
if (slot_name.empty())
{
if (replication_settings.materialized_postgresql_use_unique_replication_consumer_identifier)
slot_name = clickhouse_uuid;
else
slot_name = postgres_table.empty() ? postgres_database : fmt::format("{}_{}_ch_replication_slot", postgres_database, postgres_table);
slot_name = normalizeReplicationSlot(slot_name);
}
return slot_name;
}
}
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
const String & current_database_name_,
const String & postgres_table_,
const String & clickhouse_database_,
const String & clickhouse_uuid_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
@ -70,14 +132,18 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, is_attach(is_attach_)
, postgres_database(postgres_database_)
, postgres_schema(replication_settings.materialized_postgresql_schema)
, current_database_name(current_database_name_)
, current_database_name(clickhouse_database_)
, connection_info(connection_info_)
, max_block_size(replication_settings.materialized_postgresql_max_block_size)
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(replication_settings.materialized_postgresql_tables_list)
, schema_list(replication_settings.materialized_postgresql_schema_list)
, schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema)
, user_managed_slot(!replication_settings.materialized_postgresql_replication_slot.value.empty())
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, replication_slot(getReplicationSlotName(postgres_database_, postgres_table_, clickhouse_uuid_, replication_settings))
, tmp_replication_slot(replication_slot + "_tmp")
, publication_name(getPublicationName(postgres_database_, postgres_table_))
, reschedule_backoff_min_ms(replication_settings.materialized_postgresql_backoff_min_ms)
, reschedule_backoff_max_ms(replication_settings.materialized_postgresql_backoff_max_ms)
, reschedule_backoff_factor(replication_settings.materialized_postgresql_backoff_factor)
@ -89,13 +155,9 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
if (!schema_list.empty() && !postgres_schema.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and common schema at the same time");
replication_slot = replication_settings.materialized_postgresql_replication_slot;
if (replication_slot.empty())
{
user_managed_slot = false;
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
}
publication_name = fmt::format("{}_ch_publication", replication_identifier);
checkReplicationSlot(replication_slot);
LOG_INFO(log, "Using replication slot {} and publication {}", replication_slot, publication_name);
startup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
@ -496,7 +558,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table found to be replicated");
/// 'ONLY' means just a table, without descendants.
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, tables_list);
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", doubleQuoteString(publication_name), tables_list);
try
{
tx.exec(query_str);
@ -519,7 +581,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
{
String slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;
@ -546,11 +608,11 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
String query_str, slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;
query_str = fmt::format("CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", slot_name);
query_str = fmt::format("CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", doubleQuoteString(slot_name));
try
{
@ -573,7 +635,7 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx
std::string slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

View File

@ -21,9 +21,10 @@ public:
using ConsumerPtr = std::shared_ptr<MaterializedPostgreSQLConsumer>;
PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
const String & current_database_name_,
const String & postgres_table_,
const String & clickhouse_database_,
const String & clickhouse_uuid_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
@ -128,10 +129,11 @@ private:
/// This is possible to allow replicating tables from multiple schemas in the same MaterializedPostgreSQL database engine.
mutable bool schema_as_a_part_of_table_name = false;
bool user_managed_slot = true;
String user_provided_snapshot;
String replication_slot, publication_name;
const bool user_managed_slot;
const String user_provided_snapshot;
const String replication_slot;
const String tmp_replication_slot;
const String publication_name;
/// Replication consumer. Manages decoding of replication stream and syncing into tables.
ConsumerPtr consumer;

View File

@ -74,22 +74,13 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
setInMemoryMetadata(storage_metadata);
String replication_identifier;
if (replication_settings->materialized_postgresql_use_unique_replication_consumer_identifier)
{
replication_identifier = fmt::format("{}_{}_{}", table_id_.uuid, remote_database_name, remote_table_name_);
}
else
{
replication_identifier = fmt::format("{}_{}", remote_database_name, remote_table_name_);
}
replication_settings->materialized_postgresql_tables_list = remote_table_name_;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
replication_identifier,
remote_database_name,
remote_table_name_,
table_id_.database_name,
toString(table_id_.uuid),
connection_info,
getContext(),
is_attach,

View File

@ -113,11 +113,19 @@ class PostgresManager:
self.created_materialized_postgres_db_list = set()
self.created_ch_postgres_db_list = set()
def init(self, instance, ip, port, default_database="postgres_database"):
def init(
self,
instance,
ip,
port,
default_database="postgres_database",
postgres_db_exists=False,
):
self.instance = instance
self.ip = ip
self.port = port
self.default_database = default_database
self.postgres_db_exists = postgres_db_exists
self.prepare()
def get_default_database(self):
@ -138,7 +146,8 @@ class PostgresManager:
self.conn = get_postgres_conn(ip=self.ip, port=self.port)
self.cursor = self.conn.cursor()
if self.default_database != "":
self.create_postgres_db(self.default_database)
if not self.postgres_db_exists:
self.create_postgres_db(self.default_database)
self.conn = get_postgres_conn(
ip=self.ip,
port=self.port,
@ -364,6 +373,12 @@ def check_tables_are_synchronized(
time.sleep(1)
result = instance.query(result_query)
if result != expected:
count = int(instance.query(f"select count() from {table_path}"))
expected_count = int(
instance.query(f"select count() from {postgres_database}.{table_name}")
)
print(f"Having {count}, expected {expected_count}")
assert result == expected

View File

@ -719,6 +719,49 @@ def test_too_many_parts(started_cluster):
pg_manager2.drop_materialized_db()
def test_replica_consumer(started_cluster):
table = "test_replica_consumer"
pg_manager_replica = PostgresManager()
pg_manager_replica.init(
instance2,
cluster.postgres_ip,
cluster.postgres_port,
default_database="postgres_database",
postgres_db_exists=True
)
for pm in [pg_manager, pg_manager_replica]:
pm.create_and_fill_postgres_table(table)
pm.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
"materialized_postgresql_use_unique_replication_consumer_identifier = 1"
],
)
assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
instance.query(f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)")
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_replica.get_default_database()
)
assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
pg_manager_replica.clear()
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")