mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
Much better
This commit is contained in:
parent
f0be5c6938
commit
12f98e8b11
@ -1,9 +1,8 @@
|
||||
#include "PostgreSQLConnection.h"
|
||||
|
||||
#if USE_LIBPQXX
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -17,11 +16,41 @@ namespace ErrorCodes
|
||||
namespace postgres
|
||||
{
|
||||
|
||||
Connection::Connection(
|
||||
const String & connection_str_,
|
||||
const String & address_)
|
||||
: connection_str(connection_str_)
|
||||
, address(address_)
|
||||
ConnectionInfo formatConnectionString(
|
||||
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
|
||||
{
|
||||
DB::WriteBufferFromOwnString out;
|
||||
out << "dbname=" << DB::quote << dbname
|
||||
<< " host=" << DB::quote << host
|
||||
<< " port=" << port
|
||||
<< " user=" << DB::quote << user
|
||||
<< " password=" << DB::quote << password;
|
||||
return std::make_pair(out.str(), host + ':' + DB::toString(port));
|
||||
}
|
||||
|
||||
|
||||
ConnectionPtr createReplicationConnection(const ConnectionInfo & connection_info)
|
||||
{
|
||||
auto new_connection_info = std::make_pair(
|
||||
fmt::format("{} replication=database", connection_info.first),
|
||||
connection_info.second);
|
||||
|
||||
auto connection = std::make_shared<postgres::Connection>(new_connection_info);
|
||||
connection->get()->set_variable("default_transaction_isolation", "'repeatable read'");
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
std::shared_ptr<T> createTransaction(pqxx::connection & connection)
|
||||
{
|
||||
return std::make_shared<T>(connection);
|
||||
}
|
||||
|
||||
|
||||
Connection::Connection(const ConnectionInfo & connection_info_)
|
||||
: connection_info(connection_info_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -54,8 +83,8 @@ void Connection::connectIfNeeded()
|
||||
{
|
||||
if (!connection || !connection->is_open())
|
||||
{
|
||||
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", getAddress());
|
||||
connection = std::make_shared<pqxx::connection>(connection_str);
|
||||
connection = std::make_shared<pqxx::connection>(connection_info.first);
|
||||
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second);
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,8 +99,7 @@ bool Connection::tryConnectIfNeeded()
|
||||
{
|
||||
LOG_ERROR(
|
||||
&Poco::Logger::get("PostgreSQLConnection"),
|
||||
"Unable to setup connection to {}, reason: {}",
|
||||
getAddress(), pqxx_error.what());
|
||||
"Unable to setup connection to {}, reason: {}", connection_info.second, pqxx_error.what());
|
||||
return false;
|
||||
}
|
||||
catch (...)
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <pqxx/pqxx> // Y_IGNORE
|
||||
#include <Core/Types.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
|
||||
|
||||
namespace pqxx
|
||||
@ -20,13 +21,40 @@ namespace pqxx
|
||||
namespace postgres
|
||||
{
|
||||
|
||||
class Connection;
|
||||
using ConnectionPtr = std::shared_ptr<Connection>;
|
||||
|
||||
|
||||
/// Connection string and address without login/password (for error logs)
|
||||
using ConnectionInfo = std::pair<std::string, std::string>;
|
||||
|
||||
ConnectionInfo formatConnectionString(
|
||||
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
|
||||
|
||||
ConnectionPtr createReplicationConnection(const ConnectionInfo & connection_info);
|
||||
|
||||
|
||||
template <typename T>
|
||||
class Transaction
|
||||
{
|
||||
public:
|
||||
Transaction(pqxx::connection & connection) : transaction(connection) {}
|
||||
|
||||
~Transaction() { transaction.commit(); }
|
||||
|
||||
T & getRef() { return transaction; }
|
||||
|
||||
void exec(const String & query) { transaction.exec(query); }
|
||||
|
||||
private:
|
||||
T transaction;
|
||||
};
|
||||
|
||||
|
||||
class Connection
|
||||
{
|
||||
|
||||
public:
|
||||
Connection(
|
||||
const String & connection_str_,
|
||||
const String & address_);
|
||||
Connection(const ConnectionInfo & connection_info_);
|
||||
|
||||
Connection(const Connection & other) = delete;
|
||||
|
||||
@ -38,20 +66,17 @@ public:
|
||||
|
||||
bool isConnected() { return tryConnectIfNeeded(); }
|
||||
|
||||
const String & getConnectionString() { return connection_str; }
|
||||
const ConnectionInfo & getConnectionInfo() { return connection_info; }
|
||||
|
||||
private:
|
||||
void connectIfNeeded();
|
||||
|
||||
bool tryConnectIfNeeded();
|
||||
|
||||
const std::string & getAddress() { return address; }
|
||||
|
||||
pqxx::ConnectionPtr connection;
|
||||
std::string connection_str, address;
|
||||
ConnectionInfo connection_info;
|
||||
};
|
||||
|
||||
using ConnectionPtr = std::shared_ptr<Connection>;
|
||||
|
||||
class ConnectionHolder
|
||||
{
|
||||
|
@ -3,8 +3,6 @@
|
||||
#endif
|
||||
|
||||
#if USE_LIBPQXX
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include "PostgreSQLConnectionPool.h"
|
||||
#include "PostgreSQLConnection.h"
|
||||
#include <common/logger_useful.h>
|
||||
@ -31,16 +29,14 @@ ConnectionPool::ConnectionPool(
|
||||
"New connection pool. Size: {}, blocks on empty pool: {}",
|
||||
pool_size, block_on_empty_pool);
|
||||
|
||||
address = host + ':' + std::to_string(port);
|
||||
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
|
||||
connection_info = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
|
||||
initialize();
|
||||
}
|
||||
|
||||
|
||||
ConnectionPool::ConnectionPool(const ConnectionPool & other)
|
||||
: pool(std::make_shared<Pool>(other.pool_size))
|
||||
, connection_str(other.connection_str)
|
||||
, address(other.address)
|
||||
, connection_info(other.connection_info)
|
||||
, pool_size(other.pool_size)
|
||||
, pool_wait_timeout(other.pool_wait_timeout)
|
||||
, block_on_empty_pool(other.block_on_empty_pool)
|
||||
@ -53,20 +49,7 @@ void ConnectionPool::initialize()
|
||||
{
|
||||
/// No connection is made, just fill pool with non-connected connection objects.
|
||||
for (size_t i = 0; i < pool_size; ++i)
|
||||
pool->push(std::make_shared<Connection>(connection_str, address));
|
||||
}
|
||||
|
||||
|
||||
std::string ConnectionPool::formatConnectionString(
|
||||
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
|
||||
{
|
||||
DB::WriteBufferFromOwnString out;
|
||||
out << "dbname=" << DB::quote << dbname
|
||||
<< " host=" << DB::quote << host
|
||||
<< " port=" << port
|
||||
<< " user=" << DB::quote << user
|
||||
<< " password=" << DB::quote << password;
|
||||
return out.str();
|
||||
pool->push(std::make_shared<Connection>(connection_info));
|
||||
}
|
||||
|
||||
|
||||
@ -87,7 +70,7 @@ ConnectionHolderPtr ConnectionPool::get()
|
||||
return std::make_shared<ConnectionHolder>(connection, *pool);
|
||||
}
|
||||
|
||||
connection = std::make_shared<Connection>(connection_str, address);
|
||||
connection = std::make_shared<Connection>(connection_info);
|
||||
return std::make_shared<ConnectionHolder>(connection, *pool);
|
||||
}
|
||||
|
||||
|
@ -41,9 +41,6 @@ public:
|
||||
|
||||
ConnectionHolderPtr get();
|
||||
|
||||
static std::string formatConnectionString(
|
||||
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
|
||||
|
||||
private:
|
||||
using Pool = ConcurrentBoundedQueue<ConnectionPtr>;
|
||||
using PoolPtr = std::shared_ptr<Pool>;
|
||||
@ -51,7 +48,7 @@ private:
|
||||
void initialize();
|
||||
|
||||
PoolPtr pool;
|
||||
std::string connection_str, address;
|
||||
ConnectionInfo connection_info;
|
||||
size_t pool_size;
|
||||
int64_t pool_wait_timeout;
|
||||
bool block_on_empty_pool;
|
||||
|
@ -287,7 +287,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
|
||||
|
||||
auto parsed_host_port = parseAddress(host_port, 5432);
|
||||
auto connection_string = postgres::ConnectionPool::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
|
||||
auto connection_info = postgres::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
|
||||
|
||||
auto postgresql_replica_settings = std::make_unique<MaterializePostgreSQLSettings>();
|
||||
|
||||
@ -296,7 +296,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
|
||||
return std::make_shared<DatabaseMaterializePostgreSQL>(
|
||||
context, metadata_path, uuid, engine_define,
|
||||
database_name, postgres_database_name, connection_string,
|
||||
database_name, postgres_database_name, connection_info,
|
||||
std::move(postgresql_replica_settings));
|
||||
}
|
||||
|
||||
|
@ -41,12 +41,12 @@ DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
|
||||
const ASTStorage * database_engine_define_,
|
||||
const String & database_name_,
|
||||
const String & postgres_database_name,
|
||||
const String & connection_string,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
std::unique_ptr<MaterializePostgreSQLSettings> settings_)
|
||||
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializePostgreSQL<Atomic> (" + database_name_ + ")", context_)
|
||||
, database_engine_define(database_engine_define_->clone())
|
||||
, remote_database_name(postgres_database_name)
|
||||
, connection(std::make_shared<postgres::Connection>(connection_string, ""))
|
||||
, connection(std::make_shared<postgres::Connection>(connection_info))
|
||||
, settings(std::move(settings_))
|
||||
{
|
||||
}
|
||||
@ -56,7 +56,7 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
|
||||
{
|
||||
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
|
||||
remote_database_name,
|
||||
connection->getConnectionString(),
|
||||
connection->getConnectionInfo(),
|
||||
metadata_path + METADATA_SUFFIX,
|
||||
global_context,
|
||||
settings->postgresql_replica_max_block_size.value,
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
const ASTStorage * database_engine_define_,
|
||||
const String & database_name_,
|
||||
const String & postgres_database_name,
|
||||
const String & connection_string,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
std::unique_ptr<MaterializePostgreSQLSettings> settings_);
|
||||
|
||||
String getEngineName() const override { return "MaterializePostgreSQL"; }
|
||||
|
@ -19,7 +19,7 @@ static const auto reschedule_ms = 500;
|
||||
|
||||
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
|
||||
const std::string & database_name_,
|
||||
const std::string & conn_str,
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
const std::string & metadata_path_,
|
||||
const Context & context_,
|
||||
const size_t max_block_size_,
|
||||
@ -29,13 +29,13 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
|
||||
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
|
||||
, context(context_)
|
||||
, database_name(database_name_)
|
||||
, connection_str(conn_str)
|
||||
, metadata_path(metadata_path_)
|
||||
, connection_info(connection_info_)
|
||||
, max_block_size(max_block_size_)
|
||||
, allow_minimal_ddl(allow_minimal_ddl_)
|
||||
, is_postgresql_replica_database_engine(is_postgresql_replica_database_engine_)
|
||||
, tables_list(tables_list_)
|
||||
, connection(std::make_shared<postgres::Connection>(conn_str, ""))
|
||||
, connection(std::make_shared<postgres::Connection>(connection_info_))
|
||||
{
|
||||
replication_slot = fmt::format("{}_ch_replication_slot", database_name);
|
||||
publication_name = fmt::format("{}_ch_publication", database_name);
|
||||
@ -63,14 +63,11 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
|
||||
{
|
||||
/// Will throw pqxx::broken_connection if no connection at the moment
|
||||
connection->get();
|
||||
|
||||
startSynchronization();
|
||||
}
|
||||
catch (const pqxx::broken_connection & pqxx_error)
|
||||
{
|
||||
LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}",
|
||||
pqxx_error.what());
|
||||
|
||||
LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", pqxx_error.what());
|
||||
startup_task->scheduleAfter(reschedule_ms);
|
||||
}
|
||||
catch (...)
|
||||
@ -92,20 +89,19 @@ void PostgreSQLReplicationHandler::startSynchronization()
|
||||
{
|
||||
createPublicationIfNeeded(connection->getRef());
|
||||
|
||||
auto replication_connection = std::make_shared<postgres::Connection>(fmt::format("{} replication=database", connection->getConnectionString()), "");
|
||||
replication_connection->get()->set_variable("default_transaction_isolation", "'repeatable read'");
|
||||
auto tx = std::make_shared<pqxx::nontransaction>(replication_connection->getRef());
|
||||
auto replication_connection = postgres::createReplicationConnection(connection_info);
|
||||
postgres::Transaction<pqxx::nontransaction> tx(replication_connection->getRef());
|
||||
|
||||
std::string snapshot_name, start_lsn;
|
||||
|
||||
auto initial_sync = [&]()
|
||||
{
|
||||
createReplicationSlot(tx, start_lsn, snapshot_name);
|
||||
createReplicationSlot(tx.getRef(), start_lsn, snapshot_name);
|
||||
loadFromSnapshot(snapshot_name, storages);
|
||||
};
|
||||
|
||||
/// Replication slot should be deleted with drop table only and created only once, reused after detach.
|
||||
if (!isReplicationSlotExist(tx, replication_slot))
|
||||
if (!isReplicationSlotExist(tx.getRef(), replication_slot))
|
||||
{
|
||||
initial_sync();
|
||||
}
|
||||
@ -114,12 +110,12 @@ void PostgreSQLReplicationHandler::startSynchronization()
|
||||
/// In case of some failure, the following cases are possible (since publication and replication slot are reused):
|
||||
/// 1. If replication slot exists and metadata file (where last synced version is written) does not exist, it is not ok.
|
||||
/// 2. If created a new publication and replication slot existed before it was created, it is not ok.
|
||||
dropReplicationSlot(tx);
|
||||
dropReplicationSlot(tx.getRef());
|
||||
initial_sync();
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Restoring tables...");
|
||||
LOG_TRACE(log, "Restoring {} tables...", storages.size());
|
||||
for (const auto & [table_name, storage] : storages)
|
||||
{
|
||||
try
|
||||
@ -135,8 +131,6 @@ void PostgreSQLReplicationHandler::startSynchronization()
|
||||
}
|
||||
}
|
||||
|
||||
tx->commit();
|
||||
|
||||
consumer = std::make_shared<MaterializePostgreSQLConsumer>(
|
||||
context,
|
||||
connection,
|
||||
@ -226,10 +220,10 @@ void PostgreSQLReplicationHandler::consumerFunc()
|
||||
}
|
||||
|
||||
|
||||
bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr<pqxx::work> tx)
|
||||
bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
|
||||
{
|
||||
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
|
||||
pqxx::result result{tx->exec(query_str)};
|
||||
pqxx::result result{tx.exec(query_str)};
|
||||
assert(!result.empty());
|
||||
bool publication_exists = (result[0][0].as<std::string>() == "t");
|
||||
|
||||
@ -245,9 +239,9 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection &
|
||||
if (new_publication_created)
|
||||
return;
|
||||
|
||||
auto tx = std::make_shared<pqxx::work>(connection_);
|
||||
postgres::Transaction<pqxx::work> tx(connection_);
|
||||
|
||||
if (!isPublicationExist(tx))
|
||||
if (!isPublicationExist(tx.getRef()))
|
||||
{
|
||||
if (tables_list.empty())
|
||||
{
|
||||
@ -263,7 +257,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection &
|
||||
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, tables_list);
|
||||
try
|
||||
{
|
||||
tx->exec(query_str);
|
||||
tx.exec(query_str);
|
||||
new_publication_created = true;
|
||||
LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list);
|
||||
}
|
||||
@ -273,15 +267,13 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection &
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
|
||||
bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr tx, std::string & slot_name)
|
||||
bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name)
|
||||
{
|
||||
std::string query_str = fmt::format("SELECT active, restart_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
|
||||
pqxx::result result{tx->exec(query_str)};
|
||||
pqxx::result result{tx.exec(query_str)};
|
||||
|
||||
/// Replication slot does not exist
|
||||
if (result.empty())
|
||||
@ -296,7 +288,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr tx,
|
||||
|
||||
|
||||
void PostgreSQLReplicationHandler::createReplicationSlot(
|
||||
NontransactionPtr tx, std::string & start_lsn, std::string & snapshot_name, bool temporary)
|
||||
pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary)
|
||||
{
|
||||
std::string query_str;
|
||||
|
||||
@ -310,7 +302,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
|
||||
|
||||
try
|
||||
{
|
||||
pqxx::result result{tx->exec(query_str)};
|
||||
pqxx::result result{tx.exec(query_str)};
|
||||
start_lsn = result[0][1].as<std::string>();
|
||||
snapshot_name = result[0][2].as<std::string>();
|
||||
LOG_TRACE(log, "Created replication slot: {}, start lsn: {}", replication_slot, start_lsn);
|
||||
@ -323,7 +315,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
|
||||
}
|
||||
|
||||
|
||||
void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr tx, bool temporary)
|
||||
void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx, bool temporary)
|
||||
{
|
||||
std::string slot_name;
|
||||
if (temporary)
|
||||
@ -333,15 +325,15 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr tx, boo
|
||||
|
||||
std::string query_str = fmt::format("SELECT pg_drop_replication_slot('{}')", slot_name);
|
||||
|
||||
tx->exec(query_str);
|
||||
tx.exec(query_str);
|
||||
LOG_TRACE(log, "Dropped replication slot: {}", slot_name);
|
||||
}
|
||||
|
||||
|
||||
void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr tx)
|
||||
void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
|
||||
{
|
||||
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
|
||||
tx->exec(query_str);
|
||||
tx.exec(query_str);
|
||||
}
|
||||
|
||||
|
||||
@ -350,14 +342,12 @@ void PostgreSQLReplicationHandler::shutdownFinal()
|
||||
if (Poco::File(metadata_path).exists())
|
||||
Poco::File(metadata_path).remove();
|
||||
|
||||
connection = std::make_shared<postgres::Connection>(connection_str, "");
|
||||
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
|
||||
connection = std::make_shared<postgres::Connection>(connection_info);
|
||||
postgres::Transaction<pqxx::nontransaction> tx(connection->getRef());
|
||||
|
||||
dropPublication(tx);
|
||||
if (isReplicationSlotExist(tx, replication_slot))
|
||||
dropReplicationSlot(tx);
|
||||
|
||||
tx->commit();
|
||||
dropPublication(tx.getRef());
|
||||
if (isReplicationSlotExist(tx.getRef(), replication_slot))
|
||||
dropReplicationSlot(tx.getRef());
|
||||
}
|
||||
|
||||
|
||||
@ -379,9 +369,9 @@ NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::connectio
|
||||
{
|
||||
std::string query = fmt::format("SELECT tablename FROM pg_publication_tables WHERE pubname = '{}'", publication_name);
|
||||
std::unordered_set<std::string> tables;
|
||||
pqxx::read_transaction tx(connection_);
|
||||
postgres::Transaction<pqxx::read_transaction> tx(connection_);
|
||||
|
||||
for (auto table_name : tx.stream<std::string>(query))
|
||||
for (auto table_name : tx.getRef().stream<std::string>(query))
|
||||
tables.insert(std::get<0>(table_name));
|
||||
|
||||
return tables;
|
||||
@ -405,7 +395,6 @@ std::unordered_map<Int32, String> PostgreSQLReplicationHandler::reloadFromSnapsh
|
||||
std::unordered_map<Int32, String> tables_start_lsn;
|
||||
try
|
||||
{
|
||||
auto tx = std::make_shared<pqxx::work>(connection->getRef());
|
||||
Storages sync_storages;
|
||||
for (const auto & relation : relation_data)
|
||||
{
|
||||
@ -414,17 +403,14 @@ std::unordered_map<Int32, String> PostgreSQLReplicationHandler::reloadFromSnapsh
|
||||
sync_storages[table_name] = storage;
|
||||
storage->dropNested();
|
||||
}
|
||||
tx->commit();
|
||||
|
||||
auto replication_connection = std::make_shared<postgres::Connection>(fmt::format("{} replication=database", connection_str), "");
|
||||
replication_connection->get()->set_variable("default_transaction_isolation", "'repeatable read'");
|
||||
auto replication_connection = postgres::createReplicationConnection(connection_info);
|
||||
postgres::Transaction<pqxx::nontransaction> tx(replication_connection->getRef());
|
||||
|
||||
auto r_tx = std::make_shared<pqxx::nontransaction>(replication_connection->getRef());
|
||||
std::string snapshot_name, start_lsn;
|
||||
createReplicationSlot(r_tx, start_lsn, snapshot_name, true);
|
||||
createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true);
|
||||
/// This snapshot is valid up to the end of the transaction, which exported it.
|
||||
auto success_tables = loadFromSnapshot(snapshot_name, sync_storages);
|
||||
r_tx->commit();
|
||||
|
||||
for (const auto & relation : relation_data)
|
||||
{
|
||||
|
@ -28,7 +28,7 @@ class PostgreSQLReplicationHandler
|
||||
public:
|
||||
PostgreSQLReplicationHandler(
|
||||
const std::string & database_name_,
|
||||
const std::string & conn_str_,
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
const std::string & metadata_path_,
|
||||
const Context & context_,
|
||||
const size_t max_block_size_,
|
||||
@ -38,29 +38,31 @@ public:
|
||||
|
||||
void startup();
|
||||
|
||||
/// Stop replication without cleanup.
|
||||
void shutdown();
|
||||
|
||||
/// Clean up replication: remove publication and replication slots.
|
||||
void shutdownFinal();
|
||||
|
||||
void addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage);
|
||||
|
||||
/// Fetch list of tables which are going to be replicated. Used for database engine.
|
||||
NameSet fetchRequiredTables(pqxx::connection & connection_);
|
||||
|
||||
private:
|
||||
using NontransactionPtr = std::shared_ptr<pqxx::nontransaction>;
|
||||
using Storages = std::unordered_map<String, StorageMaterializePostgreSQL *>;
|
||||
|
||||
bool isPublicationExist(std::shared_ptr<pqxx::work> tx);
|
||||
|
||||
bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name);
|
||||
|
||||
void createPublicationIfNeeded(pqxx::connection & connection_);
|
||||
|
||||
void createReplicationSlot(NontransactionPtr ntx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false);
|
||||
bool isPublicationExist(pqxx::work & tx);
|
||||
|
||||
void dropReplicationSlot(NontransactionPtr tx, bool temporary = false);
|
||||
bool isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name);
|
||||
|
||||
void dropPublication(NontransactionPtr ntx);
|
||||
void createReplicationSlot(pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false);
|
||||
|
||||
void dropReplicationSlot(pqxx::nontransaction & tx, bool temporary = false);
|
||||
|
||||
void dropPublication(pqxx::nontransaction & ntx);
|
||||
|
||||
void waitConnectionAndStart();
|
||||
|
||||
@ -78,19 +80,48 @@ private:
|
||||
|
||||
Poco::Logger * log;
|
||||
const Context & context;
|
||||
const std::string database_name, connection_str, metadata_path;
|
||||
|
||||
/// Remote database name.
|
||||
const String database_name;
|
||||
|
||||
/// Path for replication metadata.
|
||||
const String metadata_path;
|
||||
|
||||
/// Connection string and address for logs.
|
||||
postgres::ConnectionInfo connection_info;
|
||||
|
||||
/// max_block_size for replication stream.
|
||||
const size_t max_block_size;
|
||||
bool allow_minimal_ddl, is_postgresql_replica_database_engine;
|
||||
std::string tables_list, replication_slot, publication_name;
|
||||
|
||||
/// Table structure changes are always tracked. By default, table with changed schema will get into a skip list.
|
||||
bool allow_minimal_ddl = false;
|
||||
|
||||
/// To distinguish whether current replication handler belongs to a MaterializePostgreSQL database engine or single storage.
|
||||
bool is_postgresql_replica_database_engine;
|
||||
|
||||
/// A coma-separated list of tables, which are going to be replicated for database engine. By default, a whole database is replicated.
|
||||
String tables_list;
|
||||
|
||||
String replication_slot, publication_name;
|
||||
|
||||
postgres::ConnectionPtr connection;
|
||||
|
||||
/// Replication consumer. Manages deconding of replication stream and syncing into tables.
|
||||
std::shared_ptr<MaterializePostgreSQLConsumer> consumer;
|
||||
|
||||
BackgroundSchedulePool::TaskHolder startup_task, consumer_task;
|
||||
std::atomic<bool> tables_loaded = false, stop_synchronization = false;
|
||||
|
||||
std::atomic<bool> stop_synchronization = false;
|
||||
|
||||
/// For database engine there are 2 places where it is checked for publication:
|
||||
/// 1. to fetch tables list from already created publication when database is loaded
|
||||
/// 2. at replication startup
|
||||
bool new_publication_created = false;
|
||||
|
||||
/// MaterializePostgreSQL tables. Used for managing all operations with its internal nested tables.
|
||||
Storages storages;
|
||||
|
||||
/// List of nested tables, which is passed to replication consumer.
|
||||
std::unordered_map<String, StoragePtr> nested_storages;
|
||||
};
|
||||
|
||||
|
@ -42,7 +42,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
|
||||
const StorageID & table_id_,
|
||||
const String & remote_database_name,
|
||||
const String & remote_table_name_,
|
||||
const String & connection_str,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
const StorageInMemoryMetadata & storage_metadata,
|
||||
const Context & context_,
|
||||
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_)
|
||||
@ -60,7 +60,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
|
||||
|
||||
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
|
||||
remote_database_name,
|
||||
connection_str,
|
||||
connection_info,
|
||||
metadata_path,
|
||||
global_context,
|
||||
replication_settings->postgresql_replica_max_block_size.value,
|
||||
@ -445,7 +445,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
|
||||
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
/// No connection is made here, see Storages/PostgreSQL/PostgreSQLConnection.cpp
|
||||
auto connection_string = postgres::ConnectionPool::formatConnectionString(
|
||||
auto connection_info = postgres::formatConnectionString(
|
||||
remote_database,
|
||||
parsed_host_port.first,
|
||||
parsed_host_port.second,
|
||||
@ -453,7 +453,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
|
||||
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
|
||||
|
||||
return StorageMaterializePostgreSQL::create(
|
||||
args.table_id, remote_database, remote_table, connection_string,
|
||||
args.table_id, remote_database, remote_table, connection_info,
|
||||
metadata, args.context,
|
||||
std::move(postgresql_replication_settings));
|
||||
};
|
||||
|
@ -76,7 +76,7 @@ protected:
|
||||
const StorageID & table_id_,
|
||||
const String & remote_database_name,
|
||||
const String & remote_table_name,
|
||||
const String & connection_str,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
const StorageInMemoryMetadata & storage_metadata,
|
||||
const Context & context_,
|
||||
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_);
|
||||
|
Loading…
Reference in New Issue
Block a user