diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp index a9b07a0c65b..049c17eaf8a 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp @@ -55,6 +55,7 @@ DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( : DatabaseOrdinary( database_name_, metadata_path_, "data/" + escapeForFileName(database_name_) + "/", "DatabasePostgreSQLReplica (" + database_name_ + ")", context) + , log(&Poco::Logger::get("PostgreSQLReplicaDatabaseEngine")) , global_context(context.getGlobalContext()) , metadata_path(metadata_path_) , database_engine_define(database_engine_define_->clone()) @@ -117,7 +118,7 @@ void DatabasePostgreSQLReplica::startSynchronization() } } - LOG_TRACE(&Poco::Logger::get("PostgreSQLReplicaDatabaseEngine"), "Loaded {} tables. Starting synchronization", tables.size()); + LOG_TRACE(log, "Loaded {} tables. Starting synchronization", tables.size()); replication_handler->startup(); } @@ -173,11 +174,20 @@ StoragePtr DatabasePostgreSQLReplica::tryGetTable(const String & name, con } -/// TODO: assert called from sync thread template void DatabasePostgreSQLReplica::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) { - Base::createTable(context, name, table, query); + if (context.hasQueryContext()) + { + auto storage_set = context.getQueryContext().getQueryFactoriesInfo().storages; + if (storage_set.find("ReplacingMergeTree") != storage_set.end()) + { + Base::createTable(context, name, table, query); + return; + } + } + + LOG_WARNING(log, "Create table query allowed only for ReplacingMergeTree engine and from synchronization thread"); } @@ -188,20 +198,6 @@ void DatabasePostgreSQLReplica::dropTable(const Context & context, const S } -template -void DatabasePostgreSQLReplica::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) -{ - Base::attachTable(name, table, relative_table_path); -} - - -template -StoragePtr DatabasePostgreSQLReplica::detachTable(const String & name) -{ - return Base::detachTable(name); -} - - template void DatabasePostgreSQLReplica::drop(const Context & context) { diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h b/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h index a73acd7b27b..5847f47ebef 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h +++ b/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h @@ -53,10 +53,6 @@ public: void dropTable(const Context & context, const String & name, bool no_delay) override; - void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override; - - StoragePtr detachTable(const String & name) override; - void drop(const Context & context) override; void shutdown() override; @@ -66,6 +62,7 @@ private: void startSynchronization(); StoragePtr getStorage(const String & name); + Poco::Logger * log; const Context global_context; String metadata_path; ASTPtr database_engine_define; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp index 3435abc1fa9..50896fa8394 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp @@ -1,4 +1,5 @@ #include "PostgreSQLReplicaConsumer.h" +#include "StoragePostgreSQLReplica.h" #include #include @@ -85,7 +86,6 @@ void PostgreSQLReplicaConsumer::stopSynchronization() void PostgreSQLReplicaConsumer::synchronizationStream() { auto start_time = std::chrono::steady_clock::now(); - LOG_TRACE(log, "Starting synchronization stream"); while (!stop_synchronization) { @@ -105,7 +105,6 @@ void PostgreSQLReplicaConsumer::synchronizationStream() void PostgreSQLReplicaConsumer::insertValue(BufferData & buffer, const std::string & value, size_t column_idx) { - LOG_TRACE(log, "INSERTING VALUE {}", value); const auto & sample = buffer.description.sample_block.getByPosition(column_idx); bool is_nullable = buffer.description.types[column_idx].second; @@ -198,21 +197,24 @@ void PostgreSQLReplicaConsumer::readTupleData( BufferData & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value) { Int16 num_columns = readInt16(message, pos, size); - /// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data - LOG_DEBUG(log, "num_columns {}", num_columns); + LOG_DEBUG(log, "number of columns {}", num_columns); + for (int column_idx = 0; column_idx < num_columns; ++column_idx) { + /// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data char identifier = readInt8(message, pos, size); Int32 col_len = readInt32(message, pos, size); String value; + for (int i = 0; i < col_len; ++i) { value += readInt8(message, pos, size); } + /// TODO: Check for null values and use insertDefaultValue insertValue(buffer, value, column_idx); - LOG_DEBUG(log, "identifier {}, col_len {}, value {}", identifier, col_len, value); + LOG_DEBUG(log, "Identifier: {}, column length: {}, value: {}", identifier, col_len, value); } switch (type) @@ -233,6 +235,9 @@ void PostgreSQLReplicaConsumer::readTupleData( } case PostgreSQLQuery::UPDATE: { + /// TODO: If table has primary key, skip old value and remove first insert with -1. + // Otherwise use replica identity full (with check) and use fisrt insert. + if (old_value) buffer.columns[num_columns]->insert(Int8(-1)); else @@ -245,33 +250,31 @@ void PostgreSQLReplicaConsumer::readTupleData( } } -/// test relation id can be shuffled ? + +/// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replication_message, size_t size) { /// Skip '\x' size_t pos = 2; char type = readInt8(replication_message, pos, size); - LOG_TRACE(log, "TYPE: {}", type); + LOG_DEBUG(log, "Type of replication message: {}", type); + switch (type) { case 'B': // Begin { - Int64 transaction_end_lsn = readInt64(replication_message, pos, size); - Int64 transaction_commit_timestamp = readInt64(replication_message, pos, size); - LOG_DEBUG(log, "transaction lsn {}, transaction commit timespamp {}", - transaction_end_lsn, transaction_commit_timestamp); + readInt64(replication_message, pos, size); /// Int64 transaction end lsn + readInt64(replication_message, pos, size); /// Int64 transaction commit timestamp break; } case 'C': // Commit { - readInt8(replication_message, pos, size); - Int64 commit_lsn = readInt64(replication_message, pos, size); - Int64 transaction_end_lsn = readInt64(replication_message, pos, size); - /// Since postgres epoch - Int64 transaction_commit_timestamp = readInt64(replication_message, pos, size); - LOG_DEBUG(log, "commit lsn {}, transaction lsn {}, transaction commit timestamp {}", - commit_lsn, transaction_end_lsn, transaction_commit_timestamp); + readInt8(replication_message, pos, size); /// unused flags + readInt64(replication_message, pos, size); /// Int64 commit lsn + readInt64(replication_message, pos, size); /// Int64 transaction end lsn + readInt64(replication_message, pos, size); /// Int64 transaction commit timestamp + final_lsn = current_lsn; break; } @@ -280,38 +283,49 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati case 'R': // Relation { Int32 relation_id = readInt32(replication_message, pos, size); - String relation_namespace, relation_name; - readString(replication_message, pos, size, relation_namespace); + String relation_namespace, relation_name; + + readString(replication_message, pos, size, relation_namespace); readString(replication_message, pos, size, relation_name); + + /// TODO: Save relation id (unique to tables) and check if they might be shuffled in current block. + /// If shuffled, store tables based on those id's and insert accordingly. table_to_insert = relation_name; tables_to_sync.insert(table_to_insert); - LOG_DEBUG(log, "INSERTING TABLE {}", table_to_insert); Int8 replica_identity = readInt8(replication_message, pos, size); Int16 num_columns = readInt16(replication_message, pos, size); + /// TODO: If replica identity is not full, check if there will be a full columns list. LOG_DEBUG(log, - "Replication message type 'R', relation_id: {}, namespace: {}, relation name {}, replica identity {}, columns number {}", + "INFO: relation id: {}, namespace: {}, relation name: {}, replica identity: {}, columns number: {}", relation_id, relation_namespace, relation_name, replica_identity, num_columns); Int8 key; Int32 data_type_id, type_modifier; + + /// TODO: Check here if table structure has changed and, if possible, change table structure and redump table. for (uint16_t i = 0; i < num_columns; ++i) { String column_name; key = readInt8(replication_message, pos, size); readString(replication_message, pos, size, column_name); + data_type_id = readInt32(replication_message, pos, size); type_modifier = readInt32(replication_message, pos, size); - LOG_DEBUG(log, "Key {}, column name {}, data type id {}, type modifier {}", key, column_name, data_type_id, type_modifier); + + LOG_DEBUG(log, + "Key: {}, column name: {}, data type id: {}, type modifier: {}", + key, column_name, data_type_id, type_modifier); } if (storages.find(table_to_insert) == storages.end()) { - throw Exception(ErrorCodes::UNKNOWN_TABLE, - "Table {} does not exist, but is included in replication stream", table_to_insert); + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Storage for table {} does not exist, but is included in replication stream", table_to_insert); } + [[maybe_unused]] auto buffer_iter = buffers.find(table_to_insert); assert(buffer_iter != buffers.end()); @@ -324,7 +338,8 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati Int32 relation_id = readInt32(replication_message, pos, size); Int8 new_tuple = readInt8(replication_message, pos, size); - LOG_DEBUG(log, "relationID {}, newTuple {} current insert tabel {}", relation_id, new_tuple, table_to_insert); + LOG_DEBUG(log, "relationID: {}, newTuple: {}, current insert table: {}", relation_id, new_tuple, table_to_insert); + auto buffer = buffers.find(table_to_insert); if (buffer == buffers.end()) { @@ -341,14 +356,17 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati LOG_DEBUG(log, "relationID {}, key {} current insert table {}", relation_id, primary_key_or_old_tuple_data, table_to_insert); + /// TODO: Two cases: based on primary keys and based on replica identity full. + /// Add first one and add a check for second one. + auto buffer = buffers.find(table_to_insert); readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE, true); if (pos + 1 < size) { Int8 new_tuple_data = readInt8(replication_message, pos, size); - LOG_DEBUG(log, "new tuple data {}", new_tuple_data); readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE); + LOG_DEBUG(log, "new tuple data: {}", new_tuple_data); } break; @@ -356,11 +374,9 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati case 'D': // Delete { Int32 relation_id = readInt32(replication_message, pos, size); - //Int8 index_replica_identity = readInt8(replication_message, pos); Int8 full_replica_identity = readInt8(replication_message, pos, size); - LOG_DEBUG(log, "relationID {}, full replica identity {}", - relation_id, full_replica_identity); + LOG_DEBUG(log, "relationID: {}, full replica identity: {}", relation_id, full_replica_identity); auto buffer = buffers.find(table_to_insert); readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE); @@ -377,38 +393,32 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr tx) { - LOG_TRACE(log, "AVAILABLE TABLES {}", tables_to_sync.size()); for (const auto & table_name : tables_to_sync) { try { - LOG_TRACE(log, "ATTEMPT SYNCING TABLE {}", table_name); auto & buffer = buffers.find(table_name)->second; Block result_rows = buffer.description.sample_block.cloneWithColumns(std::move(buffer.columns)); if (result_rows.rows()) { - LOG_TRACE(log, "SYNCING TABLE {} rows {} max_block_size {}", table_name, result_rows.rows(), max_block_size); - metadata.commitMetadata(final_lsn, [&]() { - Context insert_context(*context); + auto storage = storages[table_name]; + + auto insert = std::make_shared(); + insert->table_id = storage->getStorageID(); + + auto insert_context(*context); insert_context.makeQueryContext(); insert_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree"); - auto insert = std::make_shared(); - insert->table_id = storages[table_name]->getStorageID(); - InterpreterInsertQuery interpreter(insert, insert_context); auto block_io = interpreter.execute(); - - /// TODO: what if one block is not enough OneBlockInputStream input(result_rows); copyData(input, *block_io.out); - LOG_TRACE(log, "TABLE SYNC END"); - auto actual_lsn = advanceLSN(tx); buffer.columns = buffer.description.sample_block.cloneEmptyColumns(); @@ -422,6 +432,7 @@ void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr } } + LOG_DEBUG(log, "Table sync end for {} tables", tables_to_sync.size()); tables_to_sync.clear(); tx->commit(); } @@ -429,8 +440,6 @@ void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr tx) { - LOG_TRACE(log, "CURRENT LSN FROM TO {}", final_lsn); - std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn); pqxx::result result{tx->exec(query_str)}; @@ -468,7 +477,6 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot() if (!row) { - LOG_TRACE(log, "STREAM REPLICATION END"); stream.complete(); if (slot_empty) @@ -481,19 +489,17 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot() } slot_empty = false; - current_lsn = (*row)[0]; - LOG_TRACE(log, "Replication message: {}", (*row)[1]); + processReplicationMessage((*row)[1].c_str(), (*row)[1].size()); } } catch (const pqxx::sql_error & e) { - /// sql replication interface has the problem that it registers relcache + /// For now sql replication interface is used and it has the problem that it registers relcache /// callbacks on each pg_logical_slot_get_changes and there is no way to invalidate them: /// https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c#L1128 /// So at some point will get out of limit and then they will be cleaned. - std::string error_message = e.what(); if (error_message.find("out of relcache_callback_list slots") == std::string::npos) tryLogCurrentException(__PRETTY_FUNCTION__); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp index 9cd5f368a6d..a5ae25c3f53 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp @@ -27,7 +27,6 @@ PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadat void PostgreSQLReplicaMetadata::readMetadata() { - LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaMetadata"), "kssenii 1 {}", metadata_file); if (Poco::File(metadata_file).exists()) { ReadBufferFromFile in(metadata_file, DBMS_DEFAULT_BUFFER_SIZE); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index dd961186494..5a1ab778382 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -22,7 +22,7 @@ namespace ErrorCodes static const auto reschedule_ms = 500; -/// TODO: context should be const +/// TODO: add test for syncing only subset of databse tables PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( const std::string & database_name_, @@ -81,7 +81,6 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart() } catch (...) { - /// TODO: throw tryLogCurrentException(__PRETTY_FUNCTION__); } } @@ -101,7 +100,6 @@ bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr() == "t"); - /// TODO: check if publication is still valid? if (publication_exists) LOG_TRACE(log, "Publication {} already exists. Using existing version", publication_name); @@ -131,26 +129,26 @@ void PostgreSQLReplicationHandler::createPublication(std::shared_ptr e.addMessage("while creating pg_publication"); throw; } - - /// TODO: check replica identity? - /// Requires changed replica identity for included table to be able to receive old values of updated rows. } void PostgreSQLReplicationHandler::startSynchronization() { - /// used commands require a specific transaction isolation mode. + /// Used commands require a specific transaction isolation mode. replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'"); auto tx = std::make_shared(*replication_connection->conn()); + bool new_publication = false; + if (publication_name.empty()) { publication_name = fmt::format("{}_ch_publication", database_name); - /// Publication defines what tables are included into replication stream. Should be deleted only if MaterializePostgreSQL - /// table is dropped. if (!isPublicationExist(tx)) + { createPublication(tx); + new_publication = true; + } } else if (!isPublicationExist(tx)) { @@ -175,9 +173,11 @@ void PostgreSQLReplicationHandler::startSynchronization() { initial_sync(); } - else if (!Poco::File(metadata_path).exists()) + else if (!Poco::File(metadata_path).exists() || new_publication) { - /// If replication slot exists and metadata file (where last synced version is written) does not exist, it is not normal. + /// In case of some failure, the following cases are possible (since publication and replication slot are reused): + /// 1. If replication slot exists and metadata file (where last synced version is written) does not exist, it is not ok. + /// 2. If created a new publication and replication slot existed before it was created, it is not ok. dropReplicationSlot(ntx, replication_slot); initial_sync(); } @@ -210,8 +210,6 @@ void PostgreSQLReplicationHandler::startSynchronization() void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name) { - LOG_DEBUG(log, "Creating transaction snapshot"); - for (const auto & storage_data : storages) { try @@ -231,12 +229,9 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name) /// Already connected to needed database, no need to add it to query. query_str = fmt::format("SELECT * FROM {}", storage_data.first); - Context insert_context(*context); - insert_context.makeQueryContext(); - insert_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree"); - auto insert = std::make_shared(); insert->table_id = nested_storage->getStorageID(); + auto insert_context = storage_data.second->makeNestedTableContext(); InterpreterInsertQuery interpreter(insert, insert_context); auto block_io = interpreter.execute(); @@ -259,7 +254,7 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name) } } - LOG_DEBUG(log, "Done loading from snapshot"); + LOG_DEBUG(log, "Table dump end"); } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 5a44215a612..e36e1cd0490 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -40,12 +40,15 @@ private: using NontransactionPtr = std::shared_ptr; bool isPublicationExist(std::shared_ptr tx); + bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name); void createPublication(std::shared_ptr tx); + void createReplicationSlot(NontransactionPtr ntx, std::string & start_lsn, std::string & snapshot_name); void dropReplicationSlot(NontransactionPtr tx, std::string & slot_name); + void dropPublication(NontransactionPtr ntx); void waitConnectionAndStart(); diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp index 7def1a317be..e45d678ba0b 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp @@ -252,23 +252,23 @@ void StoragePostgreSQLReplica::createNestedIfNeeded(const std::functiondatabase = table_id.database_name; ast_drop->if_exists = true; - auto drop_context(*global_context); - drop_context.makeQueryContext(); - - auto interpreter = InterpreterDropQuery(ast_drop, drop_context); + auto context = makeNestedTableContext(); + auto interpreter = InterpreterDropQuery(ast_drop, context); interpreter.execute(); } diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h index e8de30afeb2..4059cb744e6 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h @@ -58,7 +58,10 @@ public: /// Throw if impossible to get StoragePtr getNested(); + Context makeNestedTableContext() const; + void setNestedLoaded() { nested_loaded.store(true); } + bool isNestedLoaded() { return nested_loaded.load(); } protected: @@ -81,8 +84,6 @@ private: std::string getNestedTableName() const; - Context makeGetNestedTableContext() const; - void dropNested(); std::string remote_table_name;