diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp index 017613500ad..5010f574555 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp @@ -2,9 +2,19 @@ #include #include +#include +#include +#include +#include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( const std::string & table_name_, @@ -23,55 +33,194 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( } +void PostgreSQLReplicaConsumer::readString(const char * message, size_t & pos, size_t size, String & result) +{ + assert(size > pos + 2); + char current = unhex2(message + pos); + pos += 2; + while (pos < size && current != '\0') + { + result += current; + current = unhex2(message + pos); + pos += 2; + } +} + + +Int32 PostgreSQLReplicaConsumer::readInt32(const char * message, size_t & pos) +{ + assert(size > pos + 8); + Int32 result = (UInt32(unhex2(message + pos)) << 24) + | (UInt32(unhex2(message + pos + 2)) << 16) + | (UInt32(unhex2(message + pos + 4)) << 8) + | (UInt32(unhex2(message + pos + 6))); + pos += 8; + return result; +} + + +Int16 PostgreSQLReplicaConsumer::readInt16(const char * message, size_t & pos) +{ + assert(size > pos + 4); + Int16 result = (UInt32(unhex2(message + pos)) << 8) + | (UInt32(unhex2(message + pos + 2))); + pos += 4; + return result; +} + + +Int8 PostgreSQLReplicaConsumer::readInt8(const char * message, size_t & pos) +{ + assert(size > pos + 2); + Int8 result = unhex2(message + pos); + pos += 2; + return result; +} + + +Int64 PostgreSQLReplicaConsumer::readInt64(const char * message, size_t & pos) +{ + assert(size > pos + 16); + Int64 result = (UInt64(unhex4(message + pos)) << 48) + | (UInt64(unhex4(message + pos + 4)) << 32) + | (UInt64(unhex4(message + pos + 8)) << 16) + | (UInt64(unhex4(message + pos + 12))); + pos += 16; + return result; +} + + +void PostgreSQLReplicaConsumer::readTupleData(const char * message, size_t & pos, size_t /* size */) +{ + Int16 num_columns = readInt16(message, pos); + /// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data + LOG_DEBUG(log, "num_columns {}", num_columns); + for (int k = 0; k < num_columns; ++k) + { + char identifier = readInt8(message, pos); + Int32 col_len = readInt32(message, pos); + String result; + for (int i = 0; i < col_len; ++i) + { + result += readInt8(message, pos); + } + LOG_DEBUG(log, "identifier {}, col_len {}, result {}", identifier, col_len, result); + } + //readString(message, pos, size, result); +} + + +void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replication_message, size_t size) +{ + /// Skip '\x' + size_t pos = 2; + char type = readInt8(replication_message, pos); + + LOG_TRACE(log, "TYPE: {}", type); + switch (type) + { + case 'B': // Begin + { + Int64 transaction_end_lsn = readInt64(replication_message, pos); + Int64 transaction_commit_timestamp = readInt64(replication_message, pos); + LOG_DEBUG(log, "transaction lsn {}, transaction commit timespamp {}", + transaction_end_lsn, transaction_commit_timestamp); + break; + } + case 'C': // Commit + { + readInt8(replication_message, pos); + Int64 commit_lsn = readInt64(replication_message, pos); + Int64 transaction_end_lsn = readInt64(replication_message, pos); + /// Since postgres epoch + Int64 transaction_commit_timestamp = readInt64(replication_message, pos); + LOG_DEBUG(log, "commit lsn {}, transaction lsn {}, transaction commit timestamp {}", + commit_lsn, transaction_end_lsn, transaction_commit_timestamp); + break; + } + case 'O': // Origin + break; + case 'R': // Relation + { + Int32 relation_id = readInt32(replication_message, pos); + String relation_namespace, relation_name; + readString(replication_message, pos, size, relation_namespace); + readString(replication_message, pos, size, relation_name); + Int8 replica_identity = readInt8(replication_message, pos); + Int16 num_columns = readInt16(replication_message, pos); + + LOG_DEBUG(log, + "Replication message type 'R', relation_id: {}, namespace: {}, relation name {}, replica identity {}, columns number {}", + relation_id, relation_namespace, relation_name, replica_identity, num_columns); + + Int8 key; + Int32 data_type_id, type_modifier; + for (uint16_t i = 0; i < num_columns; ++i) + { + String column_name; + key = readInt8(replication_message, pos); + readString(replication_message, pos, size, column_name); + data_type_id = readInt32(replication_message, pos); + type_modifier = readInt32(replication_message, pos); + LOG_DEBUG(log, "Key {}, column name {}, data type id {}, type modifier {}", key, column_name, data_type_id, type_modifier); + } + + break; + } + case 'Y': // Type + break; + case 'I': // Insert + { + Int32 relation_id = readInt32(replication_message, pos); + Int8 new_tuple = readInt8(replication_message, pos); + LOG_DEBUG(log, "relationID {}, newTuple {}", relation_id, new_tuple); + readTupleData(replication_message, pos, size); + break; + } + case 'U': // Update + break; + case 'D': // Delete + break; + case 'T': // Truncate + break; + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Unexpected byte1 value {} while parsing replication message", type); + } +} + + void PostgreSQLReplicaConsumer::run() { - auto options = fmt::format(" (\"proto_version\" '1', \"publication_names\" '{}')", publication_name); - startReplication(replication_slot_name, current_lsn.lsn, -1, options); -} - - -void PostgreSQLReplicaConsumer::startReplication( - const std::string & slot_name, const std::string start_lsn, const int64_t /* timeline */, const std::string & plugin_args) -{ - std::string query_str = fmt::format("START_REPLICATION SLOT {} LOGICAL {}", slot_name, start_lsn); - - if (!plugin_args.empty()) - query_str += plugin_args; - auto tx = std::make_unique(*replication_connection->conn()); - tx->exec(query_str); + /// up_to_lsn is set to NULL, up_to_n_changes is set to max_block_size. + std::string query_str = fmt::format( + "select data FROM pg_logical_slot_peek_binary_changes(" + "'{}', NULL, NULL, 'publication_names', '{}', 'proto_version', '1')", + replication_slot_name, publication_name); + pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query_str)); - //pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query_str)); - //pqxx::result result{tx->exec(query_str)}; - //pqxx::row row{result[0]}; - //for (auto res : row) - //{ - // if (std::size(res)) - // LOG_TRACE(log, "GOT {}", res.as()); - // else - // LOG_TRACE(log, "GOT NULL"); - //} + while (true) + { + const std::vector * row{stream.read_row()}; - // while (true) - // { - // const std::vector * row{stream.read_row()}; + if (!row) + { + LOG_TRACE(log, "STREAM REPLICATION END"); + stream.complete(); + tx->commit(); + break; + } - // if (!row) - // { - // LOG_TRACE(log, "STREAM REPLICATION END"); - // stream.complete(); - // tx->commit(); - // break; - // } - // LOG_TRACE(log, "STARTED REPLICATION. GOT ROW SIZE", row->size()); - - // for (const auto idx : ext::range(0, row->size())) - // { - // auto current = (*row)[idx]; - // LOG_TRACE(log, "Started replication. GOT: {}", current); - // } - - //} + for (const auto idx : ext::range(0, row->size())) + { + LOG_TRACE(log, "Replication message: {}", (*row)[idx]); + decodeReplicationMessage((*row)[idx].c_str(), (*row)[idx].size()); + } + } } + } + + diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h index 800f444765a..e2833676412 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h @@ -1,6 +1,6 @@ #pragma once -#include +#include "PostgreSQLConnection.h" #include "PostgreSQLReplicationHandler.h" #include "pqxx/pqxx" @@ -18,10 +18,19 @@ public: const LSNPosition & start_lsn); void run(); + void createSubscription(); private: + void readString(const char * message, size_t & pos, size_t size, String & result); + Int64 readInt64(const char * message, size_t & pos); + Int32 readInt32(const char * message, size_t & pos); + Int16 readInt16(const char * message, size_t & pos); + Int8 readInt8(const char * message, size_t & pos); + void readTupleData(const char * message, size_t & pos, size_t size); + void startReplication( const std::string & slot_name, const std::string start_lsn, const int64_t timeline, const std::string & plugin_args); + void decodeReplicationMessage(const char * replication_message, size_t size); Poco::Logger * log; const std::string replication_slot_name; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 7fbcf1d9ff2..befaf17db11 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -35,11 +35,11 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( /// Used commands require a specific transaction isolation mode. replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'"); - /// Non temporary replication slot should be the same at restart. + /// Non temporary replication slot. Should be the same at restart. if (replication_slot.empty()) replication_slot = fmt::format("{}_{}_ch_replication_slot", database_name, table_name); - /// Temporary replication slot is used to determine a start lsn position and to acquire a snapshot for initial table synchronization. + /// Temporary replication slot is used to acquire a snapshot for initial table synchronization and to determine starting lsn position. temp_replication_slot = replication_slot + "_temp"; } @@ -51,6 +51,8 @@ void PostgreSQLReplicationHandler::startup() { publication_name = fmt::format("{}_{}_ch_publication", database_name, table_name); + /// Publication defines what tables are included into replication stream. Should be deleted only if MaterializePostgreSQL + /// table is dropped. if (!isPublicationExist()) createPublication(); } @@ -70,6 +72,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist() { std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name); pqxx::result result{tx->exec(query_str)}; + assert(!result.empty()); bool publication_exists = (result[0][0].as() == "t"); if (publication_exists) @@ -81,11 +84,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist() void PostgreSQLReplicationHandler::createPublication() { - /* * It is also important that change replica identity for this table to be able to receive old values of updated rows: - * ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL; - * * TRUNCATE and DDL are not included in PUBLICATION. - * * 'ONLY' means just a table, without descendants. - */ + /// 'ONLY' means just a table, without descendants. std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, table_name); try { @@ -96,6 +95,10 @@ void PostgreSQLReplicationHandler::createPublication() { throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE); } + + /// TODO: check replica identity + /// Requires changed replica identity for included table to be able to receive old values of updated rows. + /// (ALTER TABLE table_name REPLICA IDENTITY FULL) } @@ -103,7 +106,7 @@ void PostgreSQLReplicationHandler::startReplication() { auto ntx = std::make_shared(*replication_connection->conn()); - /// But it should not actually exist. May exist if failed to drop it before. + /// Normally temporary replication slot should not exist. if (isReplicationSlotExist(ntx, temp_replication_slot)) dropReplicationSlot(ntx, temp_replication_slot, true); @@ -116,10 +119,9 @@ void PostgreSQLReplicationHandler::startReplication() /// Do not need this replication slot anymore (snapshot loaded and start lsn determined, will continue replication protocol /// with another slot, which should be the same at restart (and reused) to minimize memory usage) - /// Non temporary replication slot should be deleted with drop table only. - LOG_DEBUG(log, "Dropping temporaty replication slot"); dropReplicationSlot(ntx, temp_replication_slot, true); + /// Non temporary replication slot should be deleted with drop table only. if (!isReplicationSlotExist(ntx, replication_slot)) createReplicationSlot(ntx); @@ -203,6 +205,8 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, st work.exec(query_str); work.commit(); } + + LOG_TRACE(log, "Replication slot {} is dropped", slot_name); } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 06379f2ad4a..3dc94b0c776 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include "PostgreSQLConnection.h" #include "pqxx/pqxx"