Get rid of metadata file, rely only on lsn

kssenii 2021-05-10 09:10:02 +00:00
parent 4ac023e511
commit ae1191d0c0
8 changed files with 54 additions and 247 deletions
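In short: instead of persisting a separately maintained version counter and the last committed LSN in a .materialize_postgresql_metadata file, the consumer now uses the WAL LSN of the current replication message directly as the version written into the nested ReplacingMergeTree tables, so there is no extra file to keep in sync with the replication slot. A minimal sketch of the idea, assuming the usual `upper/lower` hex text form of a PostgreSQL LSN (the consumer's real helper is getLSNValue, which is not shown in this diff):

#include <cstdint>
#include <cstdio>
#include <string>

/// Map a textual LSN such as "0/16B6C50" onto a single, monotonically increasing
/// 64-bit value (upper and lower 32-bit halves in hex), suitable as a row version.
uint64_t lsnToVersion(const std::string & lsn)
{
    unsigned int upper = 0, lower = 0;
    std::sscanf(lsn.c_str(), "%X/%X", &upper, &lower);
    return (static_cast<uint64_t>(upper) << 32) + lower;
}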


@ -31,8 +31,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
static const auto METADATA_SUFFIX = ".materialize_postgresql_metadata";
DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
ContextPtr context_,
const String & metadata_path_,
@ -57,7 +55,6 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
remote_database_name,
database_name,
connection_info,
metadata_path + METADATA_SUFFIX,
getContext(),
settings->materialize_postgresql_max_block_size.value,
settings->materialize_postgresql_allow_automatic_update,
@ -189,12 +186,6 @@ void DatabaseMaterializePostgreSQL::drop(ContextPtr local_context)
if (replication_handler)
replication_handler->shutdownFinal();
/// Remove metadata
Poco::File metadata(getMetadataPath() + METADATA_SUFFIX);
if (metadata.exists())
metadata.remove(false);
DatabaseAtomic::drop(StorageMaterializePostgreSQL::makeNestedTableContext(local_context));
}


@ -1,8 +1,6 @@
#include "MaterializePostgreSQLConsumer.h"
#if USE_LIBPQXX
#include "StorageMaterializePostgreSQL.h"
#include <Columns/ColumnNullable.h>
#include <Common/hex.h>
#include <DataStreams/copyData.h>
@ -27,7 +25,6 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
std::shared_ptr<postgres::Connection> connection_,
const std::string & replication_slot_name_,
const std::string & publication_name_,
const std::string & metadata_path,
const std::string & start_lsn,
const size_t max_block_size_,
bool allow_automatic_update_,
@ -36,7 +33,6 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
, context(context_)
, replication_slot_name(replication_slot_name_)
, publication_name(publication_name_)
, metadata(metadata_path)
, connection(connection_)
, current_lsn(start_lsn)
, max_block_size(max_block_size_)
@ -80,27 +76,6 @@ void MaterializePostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage
}
void MaterializePostgreSQLConsumer::readMetadata()
{
try
{
metadata.readMetadata();
if (!metadata.lsn().empty())
{
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
final_lsn = metadata.lsn();
final_lsn = advanceLSN(tx);
tx->commit();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MaterializePostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
@ -241,14 +216,14 @@ void MaterializePostgreSQLConsumer::readTupleData(
case PostgreSQLQuery::INSERT:
{
buffer.columns[num_columns]->insert(Int8(1));
- buffer.columns[num_columns + 1]->insert(UInt64(metadata.getAndIncrementVersion()));
+ buffer.columns[num_columns + 1]->insert(lsn_value);
break;
}
case PostgreSQLQuery::DELETE:
{
buffer.columns[num_columns]->insert(Int8(-1));
- buffer.columns[num_columns + 1]->insert(UInt64(metadata.getAndIncrementVersion()));
+ buffer.columns[num_columns + 1]->insert(lsn_value);
break;
}
@ -260,7 +235,7 @@ void MaterializePostgreSQLConsumer::readTupleData(
else
buffer.columns[num_columns]->insert(Int8(1));
- buffer.columns[num_columns + 1]->insert(UInt64(metadata.getAndIncrementVersion()));
+ buffer.columns[num_columns + 1]->insert(lsn_value);
break;
}
@ -488,30 +463,27 @@ void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransact
if (result_rows.rows())
{
- metadata.commitMetadata(final_lsn, [&]()
- {
-     auto storage = storages[table_name];
+ auto storage = storages[table_name];
-     auto insert = std::make_shared<ASTInsertQuery>();
-     insert->table_id = storage->getStorageID();
-     insert->columns = buffer.columnsAST;
+ auto insert = std::make_shared<ASTInsertQuery>();
+ insert->table_id = storage->getStorageID();
+ insert->columns = buffer.columnsAST;
-     auto insert_context = Context::createCopy(context);
-     insert_context->makeQueryContext();
-     insert_context->addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
+ auto insert_context = Context::createCopy(context);
+ insert_context->makeQueryContext();
+ insert_context->addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
-     InterpreterInsertQuery interpreter(insert, insert_context, true);
-     auto block_io = interpreter.execute();
-     OneBlockInputStream input(result_rows);
+ InterpreterInsertQuery interpreter(insert, insert_context, true);
+ auto block_io = interpreter.execute();
+ OneBlockInputStream input(result_rows);
-     assertBlocksHaveEqualStructure(input.getHeader(), block_io.out->getHeader(), "postgresql replica table sync");
-     copyData(input, *block_io.out);
+ assertBlocksHaveEqualStructure(input.getHeader(), block_io.out->getHeader(), "postgresql replica table sync");
+ copyData(input, *block_io.out);
-     auto actual_lsn = advanceLSN(tx);
-     buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
-     return actual_lsn;
- });
+ /// The next attempt to read data will start with actual_lsn, returned from advanceLSN. current_lsn acts as
+ /// a version for rows in ReplacingMergeTree table.
+ current_lsn = advanceLSN(tx);
+ buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
}
}
catch (...)
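advanceLSN itself is outside these hunks. As a hedged sketch of what such a call can look like (assuming PostgreSQL's pg_replication_slot_advance() function and the pqxx/fmt APIs already used in this file; the name advanceSlot is illustrative):

#include <string>
#include <fmt/format.h>
#include <pqxx/pqxx>

/// Sketch only: ask the server to advance the logical replication slot up to
/// `final_lsn` and return the position it actually confirmed (the "actual_lsn").
std::string advanceSlot(pqxx::nontransaction & tx, const std::string & slot, const std::string & final_lsn)
{
    pqxx::result result{tx.exec(fmt::format(
        "SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", slot, final_lsn))};
    return result[0][0].as<std::string>();
}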
@ -632,6 +604,8 @@ bool MaterializePostgreSQLConsumer::readFromReplicationSlot()
slot_empty = false;
current_lsn = (*row)[0];
lsn_value = getLSNValue(current_lsn);
LOG_DEBUG(log, "Current lsn: {}, value: {}", current_lsn, lsn_value);
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
}
@ -704,7 +678,4 @@ void MaterializePostgreSQLConsumer::updateNested(const String & table_name, Stor
skip_list[table_id] = table_start_lsn;
}
}
#endif


@ -1,11 +1,5 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include "MaterializePostgreSQLMetadata.h"
#include <Core/PostgreSQL/Connection.h>
#include <Core/PostgreSQL/insertPostgreSQLValue.h>
@ -15,7 +9,6 @@
#include <Storages/IStorage.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Parsers/ASTExpressionList.h>
#include "pqxx/pqxx" // Y_IGNORE
namespace DB
@ -29,16 +22,13 @@ public:
MaterializePostgreSQLConsumer(
ContextPtr context_,
std::shared_ptr<postgres::Connection> connection_,
- const std::string & replication_slot_name_,
- const std::string & publication_name_,
- const std::string & metadata_path,
- const std::string & start_lsn,
+ const String & replication_slot_name_,
+ const String & publication_name_,
+ const String & start_lsn,
const size_t max_block_size_,
bool allow_automatic_update_,
Storages storages_);
void readMetadata();
bool consume(std::vector<std::pair<Int32, String>> & skipped_tables);
/// Called from reloadFromSnapshot by replication handler. This method is needed to move a table back into synchronization
@ -105,14 +95,17 @@ private:
ContextPtr context;
const std::string replication_slot_name, publication_name;
MaterializePostgreSQLMetadata metadata;
std::shared_ptr<postgres::Connection> connection;
std::string current_lsn, final_lsn;
/// current_lsn, converted from String to UInt64 via getLSNValue().
UInt64 lsn_value;
const size_t max_block_size;
bool allow_automatic_update;
- std::string table_to_insert;
+ String table_to_insert;
/// List of tables which need to be synced after last replication stream.
std::unordered_set<std::string> tables_to_sync;
@ -147,7 +140,4 @@ private:
/// i.e. we will not miss the first start_lsn position for reloaded table.
std::unordered_map<Int32, String> skip_list;
};
}
#endif
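The skip_list logic referenced in the comment above is only partly visible in these hunks. A rough, illustrative sketch of the intended check (names and details are assumptions, not the file's actual code; lsnToVersion is the conversion sketch near the top of this page):

#include <string>
#include <unordered_map>

uint64_t lsnToVersion(const std::string & lsn);   /// see the sketch near the top of this page

/// Illustrative only: a reloaded table stays in skip_list together with the LSN at which its
/// new snapshot was taken; replication messages for it are ignored until the stream reaches
/// that LSN, so changes already contained in the new snapshot are not applied a second time.
bool syncAllowed(const std::unordered_map<int, std::string> & skip_list,
                 int relation_id, const std::string & current_lsn)
{
    auto it = skip_list.find(relation_id);
    if (it == skip_list.end())
        return true;          /// table is not being reloaded, sync as usual
    if (it->second.empty())
        return false;         /// reload started, target LSN not known yet
    return lsnToVersion(current_lsn) >= lsnToVersion(it->second);
}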


@ -1,100 +0,0 @@
#include "MaterializePostgreSQLMetadata.h"
#if USE_LIBPQXX
#include <Poco/File.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
namespace DB
{
MaterializePostgreSQLMetadata::MaterializePostgreSQLMetadata(const std::string & metadata_file_path)
: metadata_file(metadata_file_path)
, tmp_metadata_file(metadata_file_path + ".tmp")
, last_version(1)
{
}
void MaterializePostgreSQLMetadata::readMetadata()
{
if (Poco::File(metadata_file).exists())
{
ReadBufferFromFile in(metadata_file, DBMS_DEFAULT_BUFFER_SIZE);
assertString("\nLast version:\t", in);
readIntText(last_version, in);
assertString("\nLast LSN:\t", in);
readString(last_lsn, in);
if (checkString("\nActual LSN:\t", in))
{
std::string actual_lsn;
readString(actual_lsn, in);
if (!actual_lsn.empty())
last_lsn = actual_lsn;
}
LOG_DEBUG(&Poco::Logger::get("MaterializePostgreSQLMetadata"),
"Last written version is {}. (From metadata file {})", last_version, metadata_file);
}
}
void MaterializePostgreSQLMetadata::writeMetadata(bool append_metadata)
{
WriteBufferFromFile out(tmp_metadata_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
if (append_metadata)
{
writeString("\nActual LSN:\t" + toString(last_lsn), out);
}
else
{
writeString("\nLast version:\t" + toString(last_version), out);
writeString("\nLast LSN:\t" + toString(last_lsn), out);
}
out.next();
out.sync();
out.close();
}
/// While data is received, version is updated. Before table sync, write last version to tmp file.
/// Then sync data to table and rename tmp to non-tmp.
void MaterializePostgreSQLMetadata::commitMetadata(std::string & lsn, const std::function<String()> & finalizeStreamFunc)
{
std::string actual_lsn;
last_lsn = lsn;
writeMetadata();
try
{
actual_lsn = finalizeStreamFunc();
/// This is not supposed to happen
if (actual_lsn != last_lsn)
{
writeMetadata(true);
LOG_WARNING(&Poco::Logger::get("MaterializePostgreSQLMetadata"),
"Last written LSN {} is not equal to actual LSN {}", last_lsn, actual_lsn);
}
Poco::File(tmp_metadata_file).renameTo(metadata_file);
}
catch (...)
{
Poco::File(tmp_metadata_file).remove();
throw;
}
}
}
#endif


@ -1,31 +0,0 @@
#pragma once
#include <Interpreters/Context.h>
namespace DB
{
class MaterializePostgreSQLMetadata
{
public:
MaterializePostgreSQLMetadata(const std::string & metadata_file_path);
void commitMetadata(std::string & lsn, const std::function<String()> & finalizeStreamFunc);
void readMetadata();
size_t getAndIncrementVersion() { return last_version++; }
std::string lsn() { return last_lsn; }
private:
void writeMetadata(bool append_metadata = false);
const std::string metadata_file;
const std::string tmp_metadata_file;
uint64_t last_version;
std::string last_lsn;
};
}


@ -1,6 +1,5 @@
#include "PostgreSQLReplicationHandler.h"
#if USE_LIBPQXX
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
@ -8,8 +7,8 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
#include <DataStreams/copyData.h>
#include <filesystem>
namespace DB
@ -26,7 +25,6 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & remote_database_name_,
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
const std::string & metadata_path_,
ContextPtr context_,
const size_t max_block_size_,
bool allow_automatic_update_,
@ -36,7 +34,6 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, context(context_)
, remote_database_name(remote_database_name_)
, current_database_name(current_database_name_)
, metadata_path(metadata_path_)
, connection_info(connection_info_)
, max_block_size(max_block_size_)
, allow_automatic_update(allow_automatic_update_)
@ -103,7 +100,15 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// List of nested tables (table_name -> nested_storage), which is passed to replication consumer.
std::unordered_map<String, StoragePtr> nested_storages;
- std::string snapshot_name, start_lsn;
+ /// snapshot_name is initialized only if a new replication slot is created.
+ /// start_lsn is initialized in two places:
+ /// 1. if the replication slot does not exist, start_lsn is returned among the slot's creation parameters;
+ /// 2. if the replication slot already exists, start_lsn is read from pg_replication_slots as
+ ///    `confirmed_flush_lsn` - the address (LSN) up to which the logical slot's consumer has confirmed receiving data.
+ ///    Data older than this is no longer available.
+ /// TODO: more tests
+ String snapshot_name, start_lsn;
auto initial_sync = [&]()
{
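Regarding case 1 of the comment above: createReplicationSlot appears later in this diff only as a signature change, so the place where start_lsn is actually filled is not shown. As a hedged illustration (the exact command is an assumption based on PostgreSQL's streaming-replication protocol), creating a logical slot with an exported snapshot returns both values at once:

#include <string>
#include <fmt/format.h>
#include <pqxx/pqxx>

/// Sketch, not the actual implementation: on a replication connection,
/// CREATE_REPLICATION_SLOT returns (slot_name, consistent_point, snapshot_name, output_plugin);
/// consistent_point becomes start_lsn and snapshot_name drives the initial table dump.
void createSlotSketch(pqxx::nontransaction & tx, const std::string & slot,
                      std::string & start_lsn, std::string & snapshot_name)
{
    pqxx::result result{tx.exec(fmt::format(
        "CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", slot))};
    start_lsn = result[0][1].as<std::string>();
    snapshot_name = result[0][2].as<std::string>();
}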
@ -131,12 +136,13 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// There is one replication slot for each replication handler. In case of MaterializePostgreSQL database engine,
/// there is one replication slot per database. Its lifetime must be equal to the lifetime of replication handler.
/// Recreation of a replication slot imposes reloading of all tables.
- if (!isReplicationSlotExist(tx.getRef(), replication_slot))
+ if (!isReplicationSlotExist(tx.getRef(), replication_slot, start_lsn))
{
initial_sync();
}
/// Replication slot depends on publication, so if replication slot exists and new
/// publication was just created - drop that replication slot and start from scratch.
/// TODO: tests
else if (new_publication_created)
{
dropReplicationSlot(tx.getRef());
@ -189,7 +195,6 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
connection,
replication_slot,
publication_name,
metadata_path,
start_lsn,
max_block_size,
allow_automatic_update,
@ -317,24 +322,26 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
}
- bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name)
+ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & tx, String & slot_name, String & start_lsn)
{
- std::string query_str = fmt::format("SELECT active, restart_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
+ std::string query_str = fmt::format("SELECT active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
pqxx::result result{tx.exec(query_str)};
/// Replication slot does not exist
if (result.empty())
return false;
- LOG_TRACE(log, "Replication slot {} already exists (active: {}). Restart lsn position is {}",
-     slot_name, result[0][0].as<bool>(), result[0][0].as<std::string>());
+ start_lsn = result[0][2].as<std::string>();
+ LOG_TRACE(log, "Replication slot {} already exists (active: {}). Restart lsn position: {}, confirmed flush lsn: {}",
+     slot_name, result[0][0].as<bool>(), result[0][1].as<std::string>(), start_lsn);
return true;
}
void PostgreSQLReplicationHandler::createReplicationSlot(
- pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary)
+ pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary)
{
std::string query_str;
@ -385,12 +392,10 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
void PostgreSQLReplicationHandler::shutdownFinal()
{
if (std::filesystem::exists(metadata_path))
std::filesystem::remove(metadata_path);
postgres::Transaction<pqxx::nontransaction> tx(connection->getRef());
dropPublication(tx.getRef());
- if (isReplicationSlotExist(tx.getRef(), replication_slot))
+ String last_committed_lsn;
+ if (isReplicationSlotExist(tx.getRef(), replication_slot, last_committed_lsn))
dropReplicationSlot(tx.getRef());
}
@ -508,6 +513,5 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
dropReplicationSlot(tx.getRef(), /* temporary */true);
}
}
#endif
}


@ -1,13 +1,6 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include "MaterializePostgreSQLConsumer.h"
#include "MaterializePostgreSQLMetadata.h"
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Core/PostgreSQL/Utils.h>
@ -29,7 +22,6 @@ public:
const String & remote_database_name_,
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
const String & metadata_path_,
ContextPtr context_,
const size_t max_block_size_,
bool allow_automatic_update_,
@ -69,9 +61,9 @@ private:
/// Methods to manage Replication Slots.
- bool isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name);
+ bool isReplicationSlotExist(pqxx::nontransaction & tx, String & slot_name, String & start_lsn);
- void createReplicationSlot(pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false);
+ void createReplicationSlot(pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary = false);
void dropReplicationSlot(pqxx::nontransaction & tx, bool temporary = false);
@ -85,16 +77,13 @@ private:
void reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);
- PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const std::string & table_name);
+ PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const String & table_name);
Poco::Logger * log;
ContextPtr context;
const String remote_database_name, current_database_name;
/// Path for replication metadata.
const String metadata_path;
/// Connection string and address for logs.
postgres::ConnectionInfo connection_info;
@ -133,5 +122,3 @@ private:
};
}
#endif


@ -63,15 +63,10 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
setInMemoryMetadata(storage_metadata);
/// Path to store replication metadata (like last written version, etc).
auto metadata_path = DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getMetadataPath()
+ "/.metadata_" + table_id_.database_name + "_" + table_id_.table_name + "_" + toString(table_id_.uuid);
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
remote_database_name,
table_id_.database_name,
connection_info,
metadata_path,
getContext(),
replication_settings->materialize_postgresql_max_block_size.value,
/* allow_automatic_update */ false, /* is_materialize_postgresql_database */false);