Merge pull request #33187 from kssenii/materialized-postgresql-fix-cleanup

materialized postgresql make sure temporary replication slots are deleted
This commit is contained in:
Kseniia Sumarokova 2021-12-28 22:04:32 +03:00 committed by GitHub
commit 0c41b46e75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 63 deletions

View File

@ -20,6 +20,7 @@ namespace DB
static const auto RESCHEDULE_MS = 1000;
static const auto BACKOFF_TRESHOLD_MS = 10000;
static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min
namespace ErrorCodes
{
@ -28,6 +29,30 @@ namespace ErrorCodes
extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
}
class TemporaryReplicationSlot
{
public:
TemporaryReplicationSlot(
PostgreSQLReplicationHandler * handler_,
std::shared_ptr<pqxx::nontransaction> tx_,
String & start_lsn,
String & snapshot_name)
: handler(handler_), tx(tx_)
{
handler->createReplicationSlot(*tx, start_lsn, snapshot_name, /* temporary */true);
}
~TemporaryReplicationSlot()
{
handler->dropReplicationSlot(*tx, /* temporary */true);
}
private:
PostgreSQLReplicationHandler * handler;
std::shared_ptr<pqxx::nontransaction> tx;
};
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
@ -69,6 +94,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
cleanup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ cleanupFunc(); });
}
@ -148,6 +174,7 @@ void PostgreSQLReplicationHandler::shutdown()
stop_synchronization.store(true);
startup_task->deactivate();
consumer_task->deactivate();
cleanup_task->deactivate();
}
@ -268,6 +295,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
(is_materialized_postgresql_database ? postgres_database : postgres_database + '.' + tables_list));
consumer_task->activateAndSchedule();
cleanup_task->activateAndSchedule();
/// Do not rely anymore on saved storage pointers.
materialized_storages.clear();
@ -278,6 +306,7 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
{
postgres::Connection connection(connection_info);
pqxx::nontransaction tx(connection.getRef());
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true));
@ -330,6 +359,21 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
}
void PostgreSQLReplicationHandler::cleanupFunc()
{
/// It is very important to make sure temporary replication slots are removed!
/// So just in case every 30 minutes check if one still exists.
postgres::Connection connection(connection_info);
String last_committed_lsn;
connection.execWithRetry([&](pqxx::nontransaction & tx)
{
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
});
cleanup_task->scheduleAfter(CLEANUP_RESCHEDULE_MS);
}
void PostgreSQLReplicationHandler::consumerFunc()
{
std::vector<std::pair<Int32, String>> skipped_tables;
@ -774,10 +818,12 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
StoragePtr nested_storage;
{
pqxx::nontransaction tx(replication_connection.getRef());
if (isReplicationSlotExist(tx, start_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
auto tx = std::make_shared<pqxx::nontransaction>(replication_connection.getRef());
if (isReplicationSlotExist(*tx, start_lsn, /* temporary */true))
dropReplicationSlot(*tx, /* temporary */true);
TemporaryReplicationSlot temporary_slot(this, tx, start_lsn, snapshot_name);
/// Protect against deadlock.
auto nested = DatabaseCatalog::instance().tryGetTable(materialized_storage->getNestedStorageID(), materialized_storage->getNestedTableContext());
@ -848,14 +894,14 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
try
{
postgres::Connection replication_connection(connection_info, /* replication */true);
pqxx::nontransaction tx(replication_connection.getRef());
auto tx = std::make_shared<pqxx::nontransaction>(replication_connection.getRef());
{
String snapshot_name, start_lsn;
if (isReplicationSlotExist(*tx, start_lsn, /* temporary */true))
dropReplicationSlot(*tx, /* temporary */true);
if (isReplicationSlotExist(tx, start_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
TemporaryReplicationSlot temporary_slot(this, tx, start_lsn, snapshot_name);
postgres::Connection tmp_connection(connection_info);
for (const auto & [relation_id, table_name] : relation_data)
@ -920,9 +966,9 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
dropReplicationSlot(tx, /* temporary */true);
tx.commit();
tx->commit();
}
catch (...)
{

View File

@ -15,6 +15,8 @@ struct SettingChange;
class PostgreSQLReplicationHandler
{
friend class TemporaryReplicationSlot;
public:
PostgreSQLReplicationHandler(
const String & replication_identifier,
@ -52,6 +54,8 @@ public:
void setSetting(const SettingChange & setting);
void cleanupFunc();
private:
using MaterializedStorages = std::unordered_map<String, StorageMaterializedPostgreSQL *>;
@ -133,7 +137,9 @@ private:
/// Replication consumer. Manages decoding of replication stream and syncing into tables.
std::shared_ptr<MaterializedPostgreSQLConsumer> consumer;
BackgroundSchedulePool::TaskHolder startup_task, consumer_task;
BackgroundSchedulePool::TaskHolder startup_task;
BackgroundSchedulePool::TaskHolder consumer_task;
BackgroundSchedulePool::TaskHolder cleanup_task;
std::atomic<bool> stop_synchronization = false;