This commit is contained in:
kssenii 2021-03-20 11:53:41 +00:00
parent 4e63b8e5dd
commit caffccd73e
8 changed files with 38 additions and 42 deletions

View File

@ -20,6 +20,7 @@ namespace DB
template<typename T>
class PostgreSQLBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLBlockInputStream(
std::shared_ptr<T> tx_,

View File

@ -87,7 +87,7 @@ void DatabaseMaterializePostgreSQL<Base>::startSynchronization()
remote_database_name,
connection->conn_str(),
metadata_path + METADATA_SUFFIX,
std::make_shared<Context>(global_context),
global_context,
settings->postgresql_replica_max_block_size.value,
settings->postgresql_replica_allow_minimal_ddl, true,
settings->postgresql_replica_tables_list.value);

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
}
MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
std::shared_ptr<Context> context_,
const Context & context_,
PostgreSQLConnectionPtr connection_,
const std::string & replication_slot_name_,
const std::string & publication_name_,
@ -358,10 +358,11 @@ void MaterializePostgreSQLConsumer::processReplicationMessage(const char * repli
}
case 'C': // Commit
{
readInt8(replication_message, pos, size); /// unused flags
readInt64(replication_message, pos, size); /// Int64 commit lsn
readInt64(replication_message, pos, size); /// Int64 transaction end lsn
readInt64(replication_message, pos, size); /// Int64 transaction commit timestamp
constexpr size_t unused_flags_len = 1;
constexpr size_t commit_lsn_len = 8;
constexpr size_t transaction_end_lsn_len = 8;
constexpr size_t transaction_commit_timestamp_len = 8;
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
final_lsn = current_lsn;
LOG_DEBUG(log, "Commit lsn: {}", getLSNValue(current_lsn)); /// Will be removed
@ -487,7 +488,7 @@ void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransact
insert->table_id = storage->getStorageID();
insert->columns = buffer.columnsAST;
auto insert_context(*context);
auto insert_context(context);
insert_context.makeQueryContext();
insert_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");

View File

@ -27,7 +27,7 @@ public:
using Storages = std::unordered_map<String, StoragePtr>;
MaterializePostgreSQLConsumer(
std::shared_ptr<Context> context_,
const Context & context_,
PostgreSQLConnectionPtr connection_,
const std::string & replication_slot_name_,
const std::string & publication_name_,
@ -100,7 +100,7 @@ private:
}
Poco::Logger * log;
std::shared_ptr<Context> context;
const Context & context;
const std::string replication_slot_name, publication_name;
MaterializePostgreSQLMetadata metadata;

View File

@ -15,14 +15,13 @@ namespace DB
{
static const auto reschedule_ms = 500;
static const auto max_thread_work_duration_ms = 60000;
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const std::string & database_name_,
const std::string & conn_str,
const std::string & metadata_path_,
std::shared_ptr<Context> context_,
const Context & context_,
const size_t max_block_size_,
bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_,
@ -41,11 +40,8 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
replication_slot = fmt::format("{}_ch_replication_slot", database_name);
publication_name = fmt::format("{}_ch_publication", database_name);
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
startup_task->deactivate();
consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
consumer_task->deactivate();
startup_task = context.getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
consumer_task = context.getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
}
@ -87,6 +83,7 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
/// Stops the handler: sets the stop_synchronization flag and deactivates both
/// background tasks (startup_task and consumer_task) obtained from the schedule pool.
void PostgreSQLReplicationHandler::shutdown()
{
/// Raise the flag first, before the tasks are deactivated.
/// NOTE(review): presumably checked by the task bodies (waitConnectionAndStart / consumerFunc) - confirm.
stop_synchronization.store(true);
/// Deactivate the task that runs waitConnectionAndStart().
startup_task->deactivate();
/// Deactivate the task that runs consumerFunc().
consumer_task->deactivate();
}
@ -97,18 +94,18 @@ void PostgreSQLReplicationHandler::startSynchronization()
auto replication_connection = std::make_shared<PostgreSQLConnection>(fmt::format("{} replication=database", connection->conn_str()));
replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'");
auto ntx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
auto tx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
std::string snapshot_name, start_lsn;
auto initial_sync = [&]()
{
createReplicationSlot(ntx, start_lsn, snapshot_name);
createReplicationSlot(tx, start_lsn, snapshot_name);
loadFromSnapshot(snapshot_name, storages);
};
/// Replication slot should be deleted with drop table only and created only once, reused after detach.
if (!isReplicationSlotExist(ntx, replication_slot))
if (!isReplicationSlotExist(tx, replication_slot))
{
initial_sync();
}
@ -117,7 +114,7 @@ void PostgreSQLReplicationHandler::startSynchronization()
/// In case of some failure, the following cases are possible (since publication and replication slot are reused):
/// 1. If replication slot exists and metadata file (where last synced version is written) does not exist, it is not ok.
/// 2. If a new publication was created while the replication slot already existed before the publication was created, it is not ok.
dropReplicationSlot(ntx);
dropReplicationSlot(tx);
initial_sync();
}
else
@ -136,7 +133,7 @@ void PostgreSQLReplicationHandler::startSynchronization()
}
}
ntx->commit();
tx->commit();
consumer = std::make_shared<MaterializePostgreSQLConsumer>(
context,
@ -151,8 +148,6 @@ void PostgreSQLReplicationHandler::startSynchronization()
nested_storages);
consumer_task->activateAndSchedule();
replication_connection->conn()->close();
}
@ -287,10 +282,10 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(
}
bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name)
bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr tx, std::string & slot_name)
{
std::string query_str = fmt::format("SELECT active, restart_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
pqxx::result result{ntx->exec(query_str)};
pqxx::result result{tx->exec(query_str)};
/// Replication slot does not exist
if (result.empty())
@ -305,7 +300,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr ntx,
void PostgreSQLReplicationHandler::createReplicationSlot(
NontransactionPtr ntx, std::string & start_lsn, std::string & snapshot_name, bool temporary)
NontransactionPtr tx, std::string & start_lsn, std::string & snapshot_name, bool temporary)
{
std::string query_str;
@ -319,7 +314,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
try
{
pqxx::result result{ntx->exec(query_str)};
pqxx::result result{tx->exec(query_str)};
start_lsn = result[0][1].as<std::string>();
snapshot_name = result[0][2].as<std::string>();
LOG_TRACE(log, "Created replication slot: {}, start lsn: {}", replication_slot, start_lsn);
@ -332,7 +327,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
}
void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, bool temporary)
void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr tx, bool temporary)
{
std::string slot_name;
if (temporary)
@ -342,15 +337,15 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, bo
std::string query_str = fmt::format("SELECT pg_drop_replication_slot('{}')", slot_name);
ntx->exec(query_str);
tx->exec(query_str);
LOG_TRACE(log, "Dropped replication slot: {}", slot_name);
}
void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr ntx)
void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr tx)
{
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
ntx->exec(query_str);
tx->exec(query_str);
}
@ -400,7 +395,7 @@ NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(PostgreSQLConne
PostgreSQLTableStructure PostgreSQLReplicationHandler::fetchTableStructure(
std::shared_ptr<pqxx::work> tx, const std::string & table_name)
{
auto use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
return fetchPostgreSQLTableStructure(tx, table_name, use_nulls, true);
}
@ -425,12 +420,12 @@ std::unordered_map<Int32, String> PostgreSQLReplicationHandler::reloadFromSnapsh
auto replication_connection = std::make_shared<PostgreSQLConnection>(fmt::format("{} replication=database", connection_str));
replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'");
auto ntx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
auto r_tx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
std::string snapshot_name, start_lsn;
createReplicationSlot(ntx, start_lsn, snapshot_name, true);
createReplicationSlot(r_tx, start_lsn, snapshot_name, true);
/// This snapshot is valid up to the end of the transaction, which exported it.
auto success_tables = loadFromSnapshot(snapshot_name, sync_storages);
ntx->commit();
r_tx->commit();
for (const auto & relation : relation_data)
{

View File

@ -24,12 +24,11 @@ class StorageMaterializePostgreSQL;
class PostgreSQLReplicationHandler
{
public:
friend class PGReplicaLSN;
PostgreSQLReplicationHandler(
const std::string & database_name_,
const std::string & conn_str_,
const std::string & metadata_path_,
std::shared_ptr<Context> context_,
const Context & context_,
const size_t max_block_size_,
bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_,
@ -76,7 +75,7 @@ private:
std::unordered_map<Int32, String> reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);
Poco::Logger * log;
std::shared_ptr<Context> context;
const Context & context;
const std::string database_name, connection_str, metadata_path;
const size_t max_block_size;
bool allow_minimal_ddl, is_postgresql_replica_database_engine;

View File

@ -46,7 +46,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_)
: IStorage(table_id_)
, remote_table_name(remote_table_name_)
, global_context(std::make_shared<Context>(context_.getGlobalContext()))
, global_context(context_.getGlobalContext())
, replication_settings(std::move(replication_settings_))
, is_postgresql_replica_database(
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL")
@ -71,7 +71,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
StoragePtr nested_storage_,
const Context & context_)
: IStorage(table_id_)
, global_context(std::make_shared<Context>(context_))
, global_context(context_)
, nested_storage(nested_storage_)
, is_postgresql_replica_database(
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL")
@ -267,7 +267,7 @@ void StorageMaterializePostgreSQL::createNestedIfNeeded(const std::function<Post
Context StorageMaterializePostgreSQL::makeNestedTableContext() const
{
auto context(*global_context);
auto context(global_context);
context.makeQueryContext();
context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");

View File

@ -90,7 +90,7 @@ private:
std::string getNestedTableName() const;
std::string remote_table_name;
std::shared_ptr<Context> global_context;
const Context global_context;
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings;
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;