Slightly better

kssenii 2021-02-12 15:48:01 +00:00
parent 010a640ed8
commit 219dece1d0
8 changed files with 99 additions and 104 deletions

View File

@@ -55,6 +55,7 @@ DatabasePostgreSQLReplica<DatabaseOrdinary>::DatabasePostgreSQLReplica(
: DatabaseOrdinary(
database_name_, metadata_path_, "data/" + escapeForFileName(database_name_) + "/",
"DatabasePostgreSQLReplica<Ordinary> (" + database_name_ + ")", context)
, log(&Poco::Logger::get("PostgreSQLReplicaDatabaseEngine"))
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
@@ -117,7 +118,7 @@ void DatabasePostgreSQLReplica<Base>::startSynchronization()
}
}
LOG_TRACE(&Poco::Logger::get("PostgreSQLReplicaDatabaseEngine"), "Loaded {} tables. Starting synchronization", tables.size());
LOG_TRACE(log, "Loaded {} tables. Starting synchronization", tables.size());
replication_handler->startup();
}
@@ -173,11 +174,20 @@ StoragePtr DatabasePostgreSQLReplica<Base>::tryGetTable(const String & name, con
}
/// TODO: assert called from sync thread
template<typename Base>
void DatabasePostgreSQLReplica<Base>::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
Base::createTable(context, name, table, query);
if (context.hasQueryContext())
{
auto storage_set = context.getQueryContext().getQueryFactoriesInfo().storages;
if (storage_set.find("ReplacingMergeTree") != storage_set.end())
{
Base::createTable(context, name, table, query);
return;
}
}
LOG_WARNING(log, "Create table query allowed only for ReplacingMergeTree engine and from synchronization thread");
}
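The check above is one half of a handshake with the synchronization thread: a CREATE is accepted only when the query context was tagged with the "ReplacingMergeTree" storage factory, which syncTables() and makeNestedTableContext() (later in this commit) do explicitly. A minimal sketch of the tagging side, using only Context calls that appear elsewhere in this diff:

/// Sketch: how the sync thread marks its context so that the check in
/// DatabasePostgreSQLReplica::createTable() above lets the CREATE through.
Context sync_context(*global_context);
sync_context.makeQueryContext();
sync_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
/// createTable() then finds "ReplacingMergeTree" in
/// sync_context.getQueryContext().getQueryFactoriesInfo().storages and proceeds.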
@@ -188,20 +198,6 @@ void DatabasePostgreSQLReplica<Base>::dropTable(const Context & context, const S
}
template<typename Base>
void DatabasePostgreSQLReplica<Base>::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
Base::attachTable(name, table, relative_table_path);
}
template<typename Base>
StoragePtr DatabasePostgreSQLReplica<Base>::detachTable(const String & name)
{
return Base::detachTable(name);
}
template<typename Base>
void DatabasePostgreSQLReplica<Base>::drop(const Context & context)
{

View File

@@ -53,10 +53,6 @@ public:
void dropTable(const Context & context, const String & name, bool no_delay) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & name) override;
void drop(const Context & context) override;
void shutdown() override;
@@ -66,6 +62,7 @@ private:
void startSynchronization();
StoragePtr getStorage(const String & name);
Poco::Logger * log;
const Context global_context;
String metadata_path;
ASTPtr database_engine_define;

View File

@@ -1,4 +1,5 @@
#include "PostgreSQLReplicaConsumer.h"
#include "StoragePostgreSQLReplica.h"
#include <Columns/ColumnNullable.h>
#include <Common/hex.h>
@@ -85,7 +86,6 @@ void PostgreSQLReplicaConsumer::stopSynchronization()
void PostgreSQLReplicaConsumer::synchronizationStream()
{
auto start_time = std::chrono::steady_clock::now();
LOG_TRACE(log, "Starting synchronization stream");
while (!stop_synchronization)
{
@@ -105,7 +105,6 @@ void PostgreSQLReplicaConsumer::synchronizationStream()
void PostgreSQLReplicaConsumer::insertValue(BufferData & buffer, const std::string & value, size_t column_idx)
{
LOG_TRACE(log, "INSERTING VALUE {}", value);
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second;
@@ -198,21 +197,24 @@ void PostgreSQLReplicaConsumer::readTupleData(
BufferData & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
{
Int16 num_columns = readInt16(message, pos, size);
/// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data
LOG_DEBUG(log, "num_columns {}", num_columns);
LOG_DEBUG(log, "number of columns {}", num_columns);
for (int column_idx = 0; column_idx < num_columns; ++column_idx)
{
/// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data
char identifier = readInt8(message, pos, size);
Int32 col_len = readInt32(message, pos, size);
String value;
for (int i = 0; i < col_len; ++i)
{
value += readInt8(message, pos, size);
}
/// TODO: Check for null values and use insertDefaultValue
insertValue(buffer, value, column_idx);
LOG_DEBUG(log, "identifier {}, col_len {}, value {}", identifier, col_len, value);
LOG_DEBUG(log, "Identifier: {}, column length: {}, value: {}", identifier, col_len, value);
}
switch (type)
@@ -233,6 +235,9 @@ void PostgreSQLReplicaConsumer::readTupleData(
}
case PostgreSQLQuery::UPDATE:
{
/// TODO: If table has primary key, skip old value and remove first insert with -1.
// Otherwise use replica identity full (with check) and use first insert.
if (old_value)
buffer.columns[num_columns]->insert(Int8(-1));
else
@@ -245,33 +250,31 @@
}
}
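Per the TupleData layout in the protocol documentation linked below, the identifier read above distinguishes 'n' (null), 'u' (unchanged TOASTed value, not sent) and 't' (text-formatted data), and only 't' is followed by a length and payload. A hedged sketch of the null handling the TODO above asks for, assuming a hypothetical insertDefaultValue(buffer, column_idx) helper:

switch (identifier)
{
    case 'n': /// Null value: no length or payload follows.
    case 'u': /// Unchanged TOASTed value: not sent with the message.
        insertDefaultValue(buffer, column_idx); /// hypothetical helper from the TODO above
        break;
    case 't': /// Text-formatted data: Int32 length, then the value bytes.
    {
        Int32 col_len = readInt32(message, pos, size);
        String value;
        for (Int32 i = 0; i < col_len; ++i)
            value += readInt8(message, pos, size);
        insertValue(buffer, value, column_idx);
        break;
    }
}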
/// test relation id can be shuffled ?
/// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html
void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replication_message, size_t size)
{
/// Skip '\x'
size_t pos = 2;
char type = readInt8(replication_message, pos, size);
LOG_TRACE(log, "TYPE: {}", type);
LOG_DEBUG(log, "Type of replication message: {}", type);
switch (type)
{
case 'B': // Begin
{
Int64 transaction_end_lsn = readInt64(replication_message, pos, size);
Int64 transaction_commit_timestamp = readInt64(replication_message, pos, size);
LOG_DEBUG(log, "transaction lsn {}, transaction commit timespamp {}",
transaction_end_lsn, transaction_commit_timestamp);
readInt64(replication_message, pos, size); /// Int64 transaction end lsn
readInt64(replication_message, pos, size); /// Int64 transaction commit timestamp
break;
}
case 'C': // Commit
{
readInt8(replication_message, pos, size);
Int64 commit_lsn = readInt64(replication_message, pos, size);
Int64 transaction_end_lsn = readInt64(replication_message, pos, size);
/// Since postgres epoch
Int64 transaction_commit_timestamp = readInt64(replication_message, pos, size);
LOG_DEBUG(log, "commit lsn {}, transaction lsn {}, transaction commit timestamp {}",
commit_lsn, transaction_end_lsn, transaction_commit_timestamp);
readInt8(replication_message, pos, size); /// unused flags
readInt64(replication_message, pos, size); /// Int64 commit lsn
readInt64(replication_message, pos, size); /// Int64 transaction end lsn
readInt64(replication_message, pos, size); /// Int64 transaction commit timestamp
final_lsn = current_lsn;
break;
}
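For reference, the layouts of the two messages handled above, as given in the logical replication protocol documentation linked below; the Begin message also ends with an Int32 transaction id, which the parser leaves unread before breaking out:

/// Begin:  Byte1('B'), Int64 final LSN of the transaction,
///         Int64 commit timestamp (microseconds since the PostgreSQL epoch), Int32 xid.
/// Commit: Byte1('C'), Int8 flags (currently unused),
///         Int64 commit LSN, Int64 end LSN, Int64 commit timestamp.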
@@ -280,38 +283,49 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati
case 'R': // Relation
{
Int32 relation_id = readInt32(replication_message, pos, size);
String relation_namespace, relation_name;
readString(replication_message, pos, size, relation_namespace);
String relation_namespace, relation_name;
readString(replication_message, pos, size, relation_namespace);
readString(replication_message, pos, size, relation_name);
/// TODO: Save relation id (unique to tables) and check if they might be shuffled in current block.
/// If shuffled, store tables based on those ids and insert accordingly.
table_to_insert = relation_name;
tables_to_sync.insert(table_to_insert);
LOG_DEBUG(log, "INSERTING TABLE {}", table_to_insert);
Int8 replica_identity = readInt8(replication_message, pos, size);
Int16 num_columns = readInt16(replication_message, pos, size);
/// TODO: If replica identity is not full, check if there will be a full column list.
LOG_DEBUG(log,
"Replication message type 'R', relation_id: {}, namespace: {}, relation name {}, replica identity {}, columns number {}",
"INFO: relation id: {}, namespace: {}, relation name: {}, replica identity: {}, columns number: {}",
relation_id, relation_namespace, relation_name, replica_identity, num_columns);
Int8 key;
Int32 data_type_id, type_modifier;
/// TODO: Check here if table structure has changed and, if possible, change table structure and redump table.
for (uint16_t i = 0; i < num_columns; ++i)
{
String column_name;
key = readInt8(replication_message, pos, size);
readString(replication_message, pos, size, column_name);
data_type_id = readInt32(replication_message, pos, size);
type_modifier = readInt32(replication_message, pos, size);
LOG_DEBUG(log, "Key {}, column name {}, data type id {}, type modifier {}", key, column_name, data_type_id, type_modifier);
LOG_DEBUG(log,
"Key: {}, column name: {}, data type id: {}, type modifier: {}",
key, column_name, data_type_id, type_modifier);
}
if (storages.find(table_to_insert) == storages.end())
{
throw Exception(ErrorCodes::UNKNOWN_TABLE,
"Table {} does not exist, but is included in replication stream", table_to_insert);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Storage for table {} does not exist, but is included in replication stream", table_to_insert);
}
[[maybe_unused]] auto buffer_iter = buffers.find(table_to_insert);
assert(buffer_iter != buffers.end());
@@ -324,7 +338,8 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati
Int32 relation_id = readInt32(replication_message, pos, size);
Int8 new_tuple = readInt8(replication_message, pos, size);
LOG_DEBUG(log, "relationID {}, newTuple {} current insert tabel {}", relation_id, new_tuple, table_to_insert);
LOG_DEBUG(log, "relationID: {}, newTuple: {}, current insert table: {}", relation_id, new_tuple, table_to_insert);
auto buffer = buffers.find(table_to_insert);
if (buffer == buffers.end())
{
@@ -341,14 +356,17 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati
LOG_DEBUG(log, "relationID {}, key {} current insert table {}", relation_id, primary_key_or_old_tuple_data, table_to_insert);
/// TODO: Two cases: based on primary keys and based on replica identity full.
/// Add first one and add a check for second one.
auto buffer = buffers.find(table_to_insert);
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE, true);
if (pos + 1 < size)
{
Int8 new_tuple_data = readInt8(replication_message, pos, size);
LOG_DEBUG(log, "new tuple data {}", new_tuple_data);
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE);
LOG_DEBUG(log, "new tuple data: {}", new_tuple_data);
}
break;
@@ -356,11 +374,9 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati
case 'D': // Delete
{
Int32 relation_id = readInt32(replication_message, pos, size);
//Int8 index_replica_identity = readInt8(replication_message, pos);
Int8 full_replica_identity = readInt8(replication_message, pos, size);
LOG_DEBUG(log, "relationID {}, full replica identity {}",
relation_id, full_replica_identity);
LOG_DEBUG(log, "relationID: {}, full replica identity: {}", relation_id, full_replica_identity);
auto buffer = buffers.find(table_to_insert);
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE);
@@ -377,38 +393,32 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati
void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr<pqxx::nontransaction> tx)
{
LOG_TRACE(log, "AVAILABLE TABLES {}", tables_to_sync.size());
for (const auto & table_name : tables_to_sync)
{
try
{
LOG_TRACE(log, "ATTEMPT SYNCING TABLE {}", table_name);
auto & buffer = buffers.find(table_name)->second;
Block result_rows = buffer.description.sample_block.cloneWithColumns(std::move(buffer.columns));
if (result_rows.rows())
{
LOG_TRACE(log, "SYNCING TABLE {} rows {} max_block_size {}", table_name, result_rows.rows(), max_block_size);
metadata.commitMetadata(final_lsn, [&]()
{
Context insert_context(*context);
auto storage = storages[table_name];
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
auto insert_context(*context);
insert_context.makeQueryContext();
insert_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storages[table_name]->getStorageID();
InterpreterInsertQuery interpreter(insert, insert_context);
auto block_io = interpreter.execute();
/// TODO: what if one block is not enough
OneBlockInputStream input(result_rows);
copyData(input, *block_io.out);
LOG_TRACE(log, "TABLE SYNC END");
auto actual_lsn = advanceLSN(tx);
buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
@@ -422,6 +432,7 @@ void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr<pqxx::nontransaction>
}
}
LOG_DEBUG(log, "Table sync end for {} tables", tables_to_sync.size());
tables_to_sync.clear();
tx->commit();
}
@@ -429,8 +440,6 @@ void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr<pqxx::nontransaction>
String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
{
LOG_TRACE(log, "CURRENT LSN FROM TO {}", final_lsn);
std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn);
pqxx::result result{tx->exec(query_str)};
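pg_replication_slot_advance moves the slot's confirmed position forward so that already-synced WAL is not streamed again. The rest of the function falls outside this hunk; a hypothetical continuation extracting the selected end_lsn with the pqxx accessors used elsewhere in this file:

/// Hypothetical continuation (the actual lines are not part of this hunk):
if (!result.empty())
    return result[0][0].as<std::string>(); /// the end_lsn column selected above
return final_lsn;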
@@ -468,7 +477,6 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot()
if (!row)
{
LOG_TRACE(log, "STREAM REPLICATION END");
stream.complete();
if (slot_empty)
@@ -481,19 +489,17 @@
}
slot_empty = false;
current_lsn = (*row)[0];
LOG_TRACE(log, "Replication message: {}", (*row)[1]);
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
}
}
catch (const pqxx::sql_error & e)
{
/// sql replication interface has the problem that it registers relcache
/// For now the SQL replication interface is used, and it has the problem that it registers relcache
/// callbacks on each pg_logical_slot_get_changes and there is no way to invalidate them:
/// https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c#L1128
/// So at some point the callback slots will run out and then they will be cleaned.
std::string error_message = e.what();
if (error_message.find("out of relcache_callback_list slots") == std::string::npos)
tryLogCurrentException(__PRETTY_FUNCTION__);

View File

@@ -27,7 +27,6 @@ PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadat
void PostgreSQLReplicaMetadata::readMetadata()
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaMetadata"), "kssenii 1 {}", metadata_file);
if (Poco::File(metadata_file).exists())
{
ReadBufferFromFile in(metadata_file, DBMS_DEFAULT_BUFFER_SIZE);

View File

@@ -22,7 +22,7 @@ namespace ErrorCodes
static const auto reschedule_ms = 500;
/// TODO: context should be const
/// TODO: add test for syncing only a subset of database tables
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const std::string & database_name_,
@@ -81,7 +81,6 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
}
catch (...)
{
/// TODO: throw
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@@ -101,7 +100,6 @@ bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr<pqxx::work
assert(!result.empty());
bool publication_exists = (result[0][0].as<std::string>() == "t");
/// TODO: check if publication is still valid?
if (publication_exists)
LOG_TRACE(log, "Publication {} already exists. Using existing version", publication_name);
@@ -131,26 +129,26 @@ void PostgreSQLReplicationHandler::createPublication(std::shared_ptr<pqxx::work>
e.addMessage("while creating pg_publication");
throw;
}
/// TODO: check replica identity?
/// Receiving old values of updated rows requires the replica identity of the included tables to be changed.
}
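The replica identity requirement noted above can be inspected or enforced with plain SQL. A hypothetical sketch (not part of this commit), in the same pqxx style as above, with table_name standing in for one of the replicated tables:

/// Receiving old values of updated/deleted rows requires REPLICA IDENTITY FULL
/// (or an index-based identity) on the source table.
tx->exec(fmt::format("ALTER TABLE {} REPLICA IDENTITY FULL", table_name));
/// The current setting is visible in pg_class:
/// 'd' = default (primary key), 'f' = full, 'i' = index, 'n' = nothing.
pqxx::result res{tx->exec(fmt::format(
    "SELECT relreplident FROM pg_class WHERE relname = '{}'", table_name))};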
void PostgreSQLReplicationHandler::startSynchronization()
{
/// used commands require a specific transaction isolation mode.
/// Used commands require a specific transaction isolation mode.
replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'");
auto tx = std::make_shared<pqxx::work>(*replication_connection->conn());
bool new_publication = false;
if (publication_name.empty())
{
publication_name = fmt::format("{}_ch_publication", database_name);
/// Publication defines which tables are included in the replication stream. It should be deleted only if the MaterializePostgreSQL
/// table is dropped.
if (!isPublicationExist(tx))
{
createPublication(tx);
new_publication = true;
}
}
else if (!isPublicationExist(tx))
{
@@ -175,9 +173,11 @@ void PostgreSQLReplicationHandler::startSynchronization()
{
initial_sync();
}
else if (!Poco::File(metadata_path).exists())
else if (!Poco::File(metadata_path).exists() || new_publication)
{
/// If replication slot exists and metadata file (where last synced version is written) does not exist, it is not normal.
/// In case of some failure, the following cases are possible (since publication and replication slot are reused):
/// 1. If the replication slot exists but the metadata file (where the last synced version is written) does not, it is not ok.
/// 2. If a new publication was created while the replication slot already existed, it is not ok.
dropReplicationSlot(ntx, replication_slot);
initial_sync();
}
@@ -210,8 +210,6 @@ void PostgreSQLReplicationHandler::startSynchronization()
void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name)
{
LOG_DEBUG(log, "Creating transaction snapshot");
for (const auto & storage_data : storages)
{
try
@@ -231,12 +229,9 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name)
/// Already connected to the needed database, no need to add it to the query.
query_str = fmt::format("SELECT * FROM {}", storage_data.first);
Context insert_context(*context);
insert_context.makeQueryContext();
insert_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = nested_storage->getStorageID();
auto insert_context = storage_data.second->makeNestedTableContext();
InterpreterInsertQuery interpreter(insert, insert_context);
auto block_io = interpreter.execute();
@@ -259,7 +254,7 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name)
}
}
LOG_DEBUG(log, "Done loading from snapshot");
LOG_DEBUG(log, "Table dump end");
}
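loadFromSnapshot relies on the snapshot exported when the replication slot is created: importing it gives the dump transaction exactly the table state at the slot's starting LSN, so no row is lost or duplicated between the dump and the subsequent stream. A hedged sketch of the import step (the surrounding transaction setup is outside this hunk), assuming a connection wrapper like the one used above and a default isolation of repeatable read as set in startSynchronization():

auto tx = std::make_shared<pqxx::work>(*connection->conn());
/// Must be the first statement of a repeatable-read transaction.
tx->exec(fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name));
/// From here, "SELECT * FROM table" (as above) sees the slot-creation state.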

View File

@@ -40,12 +40,15 @@ private:
using NontransactionPtr = std::shared_ptr<pqxx::nontransaction>;
bool isPublicationExist(std::shared_ptr<pqxx::work> tx);
bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name);
void createPublication(std::shared_ptr<pqxx::work> tx);
void createReplicationSlot(NontransactionPtr ntx, std::string & start_lsn, std::string & snapshot_name);
void dropReplicationSlot(NontransactionPtr tx, std::string & slot_name);
void dropPublication(NontransactionPtr ntx);
void waitConnectionAndStart();

View File

@@ -252,23 +252,23 @@ void StoragePostgreSQLReplica::createNestedIfNeeded(const std::function<PostgreS
if (nested_storage)
return;
Context context_copy(*global_context);
auto context = makeNestedTableContext();
const auto ast_create = getCreateNestedTableQuery(fetch_table_structure);
InterpreterCreateQuery interpreter(ast_create, context_copy);
InterpreterCreateQuery interpreter(ast_create, context);
interpreter.execute();
nested_storage = getNested();
}
Context StoragePostgreSQLReplica::makeGetNestedTableContext() const
Context StoragePostgreSQLReplica::makeNestedTableContext() const
{
auto get_context(*global_context);
get_context.makeQueryContext();
get_context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
auto context(*global_context);
context.makeQueryContext();
context.addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
return get_context;
return context;
}
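Factoring the tagged-context construction into makeNestedTableContext() gives a single point where every nested-table operation (create, get, drop, and the inserts in the replication handler) acquires the "ReplacingMergeTree" factories info that the database engine's createTable() check expects. Usage, mirroring the call sites in this commit:

auto context = makeNestedTableContext();
InterpreterCreateQuery interpreter(ast_create, context);
interpreter.execute();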
@@ -277,7 +277,7 @@ StoragePtr StoragePostgreSQLReplica::getNested()
if (nested_storage)
return nested_storage;
auto context = makeGetNestedTableContext();
auto context = makeNestedTableContext();
nested_storage = DatabaseCatalog::instance().getTable(
StorageID(getStorageID().database_name, getNestedTableName()), context);
@@ -290,7 +290,7 @@ StoragePtr StoragePostgreSQLReplica::tryGetNested()
if (nested_storage)
return nested_storage;
auto context = makeGetNestedTableContext();
auto context = makeNestedTableContext();
nested_storage = DatabaseCatalog::instance().tryGetTable(
StorageID(getStorageID().database_name, getNestedTableName()), context);
@@ -338,10 +338,8 @@ void StoragePostgreSQLReplica::dropNested()
ast_drop->database = table_id.database_name;
ast_drop->if_exists = true;
auto drop_context(*global_context);
drop_context.makeQueryContext();
auto interpreter = InterpreterDropQuery(ast_drop, drop_context);
auto context = makeNestedTableContext();
auto interpreter = InterpreterDropQuery(ast_drop, context);
interpreter.execute();
}

View File

@@ -58,7 +58,10 @@ public:
/// Throw if impossible to get
StoragePtr getNested();
Context makeNestedTableContext() const;
void setNestedLoaded() { nested_loaded.store(true); }
bool isNestedLoaded() { return nested_loaded.load(); }
protected:
@@ -81,8 +84,6 @@ private:
std::string getNestedTableName() const;
Context makeGetNestedTableContext() const;
void dropNested();
std::string remote_table_name;