This commit is contained in:
kssenii 2022-01-08 00:27:29 +03:00
parent 2a58048868
commit 90972e3752
3 changed files with 8 additions and 1 deletions

View File

@ -50,13 +50,17 @@ DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
, settings(std::move(settings_))
, startup_task(getContext()->getSchedulePool().createTask("MaterializedPostgreSQLDatabaseStartup", [this]{ startSynchronization(); }))
{
startup_task = getContext()->getSchedulePool().createTask("MaterializedPostgreSQLDatabaseStartup", [this]{ startSynchronization(); });
}
void DatabaseMaterializedPostgreSQL::startSynchronization()
{
std::lock_guard lock(handler_mutex);
if (shutdown_called)
return;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
remote_database_name,
@ -379,6 +383,7 @@ void DatabaseMaterializedPostgreSQL::stopReplication()
if (replication_handler)
replication_handler->shutdown();
shutdown_called = true;
/// Clear wrappers over nested, all access is not done to nested tables directly.
materialized_tables.clear();
}

View File

@ -88,6 +88,7 @@ private:
mutable std::mutex handler_mutex;
BackgroundSchedulePool::TaskHolder startup_task;
bool shutdown_called = false;
};
}

View File

@ -153,6 +153,7 @@ void PostgreSQLReplicationHandler::shutdown()
stop_synchronization.store(true);
startup_task->deactivate();
consumer_task->deactivate();
consumer.reset(); /// Clear shared pointers to inner storages.
}