Decode replication messages

This commit is contained in:
kssenii 2021-01-19 15:29:22 +00:00
parent 84ffd76853
commit a1bcc5fb39
4 changed files with 217 additions and 55 deletions

View File

@ -2,9 +2,19 @@
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <ext/range.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/FieldVisitors.h>
#include <Common/hex.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
const std::string & table_name_,
@ -23,55 +33,194 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
}
void PostgreSQLReplicaConsumer::readString(const char * message, size_t & pos, size_t size, String & result)
{
assert(size > pos + 2);
char current = unhex2(message + pos);
pos += 2;
while (pos < size && current != '\0')
{
result += current;
current = unhex2(message + pos);
pos += 2;
}
}
Int32 PostgreSQLReplicaConsumer::readInt32(const char * message, size_t & pos)
{
assert(size > pos + 8);
Int32 result = (UInt32(unhex2(message + pos)) << 24)
| (UInt32(unhex2(message + pos + 2)) << 16)
| (UInt32(unhex2(message + pos + 4)) << 8)
| (UInt32(unhex2(message + pos + 6)));
pos += 8;
return result;
}
Int16 PostgreSQLReplicaConsumer::readInt16(const char * message, size_t & pos)
{
assert(size > pos + 4);
Int16 result = (UInt32(unhex2(message + pos)) << 8)
| (UInt32(unhex2(message + pos + 2)));
pos += 4;
return result;
}
Int8 PostgreSQLReplicaConsumer::readInt8(const char * message, size_t & pos)
{
assert(size > pos + 2);
Int8 result = unhex2(message + pos);
pos += 2;
return result;
}
Int64 PostgreSQLReplicaConsumer::readInt64(const char * message, size_t & pos)
{
assert(size > pos + 16);
Int64 result = (UInt64(unhex4(message + pos)) << 48)
| (UInt64(unhex4(message + pos + 4)) << 32)
| (UInt64(unhex4(message + pos + 8)) << 16)
| (UInt64(unhex4(message + pos + 12)));
pos += 16;
return result;
}
void PostgreSQLReplicaConsumer::readTupleData(const char * message, size_t & pos, size_t /* size */)
{
Int16 num_columns = readInt16(message, pos);
/// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data
LOG_DEBUG(log, "num_columns {}", num_columns);
for (int k = 0; k < num_columns; ++k)
{
char identifier = readInt8(message, pos);
Int32 col_len = readInt32(message, pos);
String result;
for (int i = 0; i < col_len; ++i)
{
result += readInt8(message, pos);
}
LOG_DEBUG(log, "identifier {}, col_len {}, result {}", identifier, col_len, result);
}
//readString(message, pos, size, result);
}
void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replication_message, size_t size)
{
/// Skip '\x'
size_t pos = 2;
char type = readInt8(replication_message, pos);
LOG_TRACE(log, "TYPE: {}", type);
switch (type)
{
case 'B': // Begin
{
Int64 transaction_end_lsn = readInt64(replication_message, pos);
Int64 transaction_commit_timestamp = readInt64(replication_message, pos);
LOG_DEBUG(log, "transaction lsn {}, transaction commit timespamp {}",
transaction_end_lsn, transaction_commit_timestamp);
break;
}
case 'C': // Commit
{
readInt8(replication_message, pos);
Int64 commit_lsn = readInt64(replication_message, pos);
Int64 transaction_end_lsn = readInt64(replication_message, pos);
/// Since postgres epoch
Int64 transaction_commit_timestamp = readInt64(replication_message, pos);
LOG_DEBUG(log, "commit lsn {}, transaction lsn {}, transaction commit timestamp {}",
commit_lsn, transaction_end_lsn, transaction_commit_timestamp);
break;
}
case 'O': // Origin
break;
case 'R': // Relation
{
Int32 relation_id = readInt32(replication_message, pos);
String relation_namespace, relation_name;
readString(replication_message, pos, size, relation_namespace);
readString(replication_message, pos, size, relation_name);
Int8 replica_identity = readInt8(replication_message, pos);
Int16 num_columns = readInt16(replication_message, pos);
LOG_DEBUG(log,
"Replication message type 'R', relation_id: {}, namespace: {}, relation name {}, replica identity {}, columns number {}",
relation_id, relation_namespace, relation_name, replica_identity, num_columns);
Int8 key;
Int32 data_type_id, type_modifier;
for (uint16_t i = 0; i < num_columns; ++i)
{
String column_name;
key = readInt8(replication_message, pos);
readString(replication_message, pos, size, column_name);
data_type_id = readInt32(replication_message, pos);
type_modifier = readInt32(replication_message, pos);
LOG_DEBUG(log, "Key {}, column name {}, data type id {}, type modifier {}", key, column_name, data_type_id, type_modifier);
}
break;
}
case 'Y': // Type
break;
case 'I': // Insert
{
Int32 relation_id = readInt32(replication_message, pos);
Int8 new_tuple = readInt8(replication_message, pos);
LOG_DEBUG(log, "relationID {}, newTuple {}", relation_id, new_tuple);
readTupleData(replication_message, pos, size);
break;
}
case 'U': // Update
break;
case 'D': // Delete
break;
case 'T': // Truncate
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected byte1 value {} while parsing replication message", type);
}
}
void PostgreSQLReplicaConsumer::run()
{
auto options = fmt::format(" (\"proto_version\" '1', \"publication_names\" '{}')", publication_name);
startReplication(replication_slot_name, current_lsn.lsn, -1, options);
}
void PostgreSQLReplicaConsumer::startReplication(
const std::string & slot_name, const std::string start_lsn, const int64_t /* timeline */, const std::string & plugin_args)
{
std::string query_str = fmt::format("START_REPLICATION SLOT {} LOGICAL {}", slot_name, start_lsn);
if (!plugin_args.empty())
query_str += plugin_args;
auto tx = std::make_unique<pqxx::nontransaction>(*replication_connection->conn());
tx->exec(query_str);
/// up_to_lsn is set to NULL, up_to_n_changes is set to max_block_size.
std::string query_str = fmt::format(
"select data FROM pg_logical_slot_peek_binary_changes("
"'{}', NULL, NULL, 'publication_names', '{}', 'proto_version', '1')",
replication_slot_name, publication_name);
pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query_str));
//pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query_str));
//pqxx::result result{tx->exec(query_str)};
//pqxx::row row{result[0]};
//for (auto res : row)
//{
// if (std::size(res))
// LOG_TRACE(log, "GOT {}", res.as<std::string>());
// else
// LOG_TRACE(log, "GOT NULL");
//}
while (true)
{
const std::vector<pqxx::zview> * row{stream.read_row()};
// while (true)
// {
// const std::vector<pqxx::zview> * row{stream.read_row()};
if (!row)
{
LOG_TRACE(log, "STREAM REPLICATION END");
stream.complete();
tx->commit();
break;
}
// if (!row)
// {
// LOG_TRACE(log, "STREAM REPLICATION END");
// stream.complete();
// tx->commit();
// break;
// }
// LOG_TRACE(log, "STARTED REPLICATION. GOT ROW SIZE", row->size());
// for (const auto idx : ext::range(0, row->size()))
// {
// auto current = (*row)[idx];
// LOG_TRACE(log, "Started replication. GOT: {}", current);
// }
//}
for (const auto idx : ext::range(0, row->size()))
{
LOG_TRACE(log, "Replication message: {}", (*row)[idx]);
decodeReplicationMessage((*row)[idx].c_str(), (*row)[idx].size());
}
}
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/StoragePostgreSQL.h>
#include "PostgreSQLConnection.h"
#include "PostgreSQLReplicationHandler.h"
#include "pqxx/pqxx"
@ -18,10 +18,19 @@ public:
const LSNPosition & start_lsn);
void run();
void createSubscription();
private:
void readString(const char * message, size_t & pos, size_t size, String & result);
Int64 readInt64(const char * message, size_t & pos);
Int32 readInt32(const char * message, size_t & pos);
Int16 readInt16(const char * message, size_t & pos);
Int8 readInt8(const char * message, size_t & pos);
void readTupleData(const char * message, size_t & pos, size_t size);
void startReplication(
const std::string & slot_name, const std::string start_lsn, const int64_t timeline, const std::string & plugin_args);
void decodeReplicationMessage(const char * replication_message, size_t size);
Poco::Logger * log;
const std::string replication_slot_name;

View File

@ -35,11 +35,11 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
/// Used commands require a specific transaction isolation mode.
replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'");
/// Non temporary replication slot should be the same at restart.
/// Non temporary replication slot. Should be the same at restart.
if (replication_slot.empty())
replication_slot = fmt::format("{}_{}_ch_replication_slot", database_name, table_name);
/// Temporary replication slot is used to determine a start lsn position and to acquire a snapshot for initial table synchronization.
/// Temporary replication slot is used to acquire a snapshot for initial table synchronization and to determine starting lsn position.
temp_replication_slot = replication_slot + "_temp";
}
@ -51,6 +51,8 @@ void PostgreSQLReplicationHandler::startup()
{
publication_name = fmt::format("{}_{}_ch_publication", database_name, table_name);
/// Publication defines what tables are included into replication stream. Should be deleted only if MaterializePostgreSQL
/// table is dropped.
if (!isPublicationExist())
createPublication();
}
@ -70,6 +72,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist()
{
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
pqxx::result result{tx->exec(query_str)};
assert(!result.empty());
bool publication_exists = (result[0][0].as<std::string>() == "t");
if (publication_exists)
@ -81,11 +84,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist()
void PostgreSQLReplicationHandler::createPublication()
{
/* * It is also important that change replica identity for this table to be able to receive old values of updated rows:
* ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;
* * TRUNCATE and DDL are not included in PUBLICATION.
* * 'ONLY' means just a table, without descendants.
*/
/// 'ONLY' means just a table, without descendants.
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, table_name);
try
{
@ -96,6 +95,10 @@ void PostgreSQLReplicationHandler::createPublication()
{
throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
}
/// TODO: check replica identity
/// Requires changed replica identity for included table to be able to receive old values of updated rows.
/// (ALTER TABLE table_name REPLICA IDENTITY FULL)
}
@ -103,7 +106,7 @@ void PostgreSQLReplicationHandler::startReplication()
{
auto ntx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
/// But it should not actually exist. May exist if failed to drop it before.
/// Normally temporary replication slot should not exist.
if (isReplicationSlotExist(ntx, temp_replication_slot))
dropReplicationSlot(ntx, temp_replication_slot, true);
@ -116,10 +119,9 @@ void PostgreSQLReplicationHandler::startReplication()
/// Do not need this replication slot anymore (snapshot loaded and start lsn determined, will continue replication protocol
/// with another slot, which should be the same at restart (and reused) to minimize memory usage)
/// Non temporary replication slot should be deleted with drop table only.
LOG_DEBUG(log, "Dropping temporaty replication slot");
dropReplicationSlot(ntx, temp_replication_slot, true);
/// Non temporary replication slot should be deleted with drop table only.
if (!isReplicationSlotExist(ntx, replication_slot))
createReplicationSlot(ntx);
@ -203,6 +205,8 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, st
work.exec(query_str);
work.commit();
}
LOG_TRACE(log, "Replication slot {} is dropped", slot_name);
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <common/logger_useful.h>
#include <Storages/StoragePostgreSQL.h>
#include "PostgreSQLConnection.h"
#include "pqxx/pqxx"