This commit is contained in:
kssenii 2021-02-04 21:05:43 +00:00
parent d255b63364
commit 02e19f9422
7 changed files with 176 additions and 70 deletions

View File

@ -67,8 +67,6 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
columns = description.sample_block.cloneEmptyColumns();
wal_reader_task = context->getSchedulePool().createTask("PostgreSQLReplicaWALReader", [this]{ replicationStream(); });
wal_reader_task->deactivate();
@ -92,7 +90,7 @@ void PostgreSQLReplicaConsumer::replicationStream()
{
size_t count_empty_slot_reads = 0;
auto start_time = std::chrono::steady_clock::now();
metadata.readDataVersion();
metadata.readMetadata();
LOG_TRACE(log, "Starting replication stream");
@ -384,18 +382,19 @@ void PostgreSQLReplicaConsumer::syncIntoTable(Block & block)
}
/// Advances the replication slot `replication_slot_name` to `final_lsn.lsn` using pg_replication_slot_advance().
/// The `String` variant returns the `end_lsn` column of the first result row, or `final_lsn.lsn` when the
/// query returns no rows, and commits `ntx` before returning.
/// NOTE(review): two signatures and two definitions of `query_str`/`result` appear in this listing. It looks like
/// the removed and the added lines of a diff shown together without +/- markers - confirm which lines are live.
void PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx)
String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx)
{
LOG_TRACE(log, "CURRENT LSN FROM TO {}", final_lsn.lsn);
/// Variant that calls the function as a scalar and only logs what came back.
std::string query_str = fmt::format("SELECT pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn.lsn);
pqxx::result result{ntx->exec(query_str)};
if (!result.empty())
{
/// Each column is guarded against being absent or NULL before it is converted to a string.
std::string s1 = result[0].size() > 0 && !result[0][0].is_null() ? result[0][0].as<std::string>() : "NULL";
std::string s2 = result[0].size() > 1 && !result[0][1].is_null() ? result[0][1].as<std::string>() : "NULL";
LOG_TRACE(log, "ADVANCE LSN: {} and {}", s1, s2);
}
/// Variant that selects only the `end_lsn` column so that it can be returned to the caller.
std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn.lsn);
pqxx::result result{ntx->exec(query_str)};
ntx->commit();
if (!result.empty())
/// NOTE(review): no is_null() check here, unlike the s1/s2 lines above - confirm end_lsn can never be NULL.
return result[0][0].as<std::string>();
/// Fall back to the LSN that was requested when the query produced no rows.
return final_lsn.lsn;
}
@ -454,13 +453,10 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot()
if (result_rows.rows())
{
assert(!slot_empty);
metadata.commitVersion([&]()
metadata.commitMetadata(final_lsn.lsn, [&]()
{
syncIntoTable(result_rows);
advanceLSN(tx);
/// TODO: Can the transaction still be active if an exception is thrown before committing it? It must be closed if the connection is ok.
tx->commit();
return advanceLSN(tx);
});
}

View File

@ -22,9 +22,6 @@ struct LSNPosition
uint64_t upper_half, lower_half, result;
std::sscanf(lsn.data(), "%lX/%lX", &upper_half, &lower_half);
result = (upper_half << 32) + lower_half;
//LOG_DEBUG(&Poco::Logger::get("LSNParsing"),
// "Created replication slot. upper half: {}, lower_half: {}, start lsn: {}",
// upper_half, lower_half, result);
return result;
}
@ -32,7 +29,6 @@ struct LSNPosition
{
char result[16];
std::snprintf(result, sizeof(result), "%lX/%lX", (lsn_value >> 32), lsn_value & 0xFFFFFFFF);
//assert(lsn_value == result.getValue());
std::string ans = result;
return ans;
}
@ -79,7 +75,7 @@ private:
void insertDefaultValue(size_t column_idx);
void syncIntoTable(Block & block);
void advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);
String advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);
/// Methods to parse replication message data.
void readTupleData(const char * message, size_t & pos, PostgreSQLQuery type, bool old_value = false);

View File

@ -19,29 +19,51 @@ namespace ErrorCodes
/// Remembers the metadata file path, derives the temporary file path by appending ".tmp",
/// and starts the version counter at 1.
/// NOTE(review): both `data_version` and `last_version` are initialised here; the class listing further down
/// shows the same pair, which looks like the old and new name of one member from a diff - confirm.
PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadata_file_path)
: metadata_file(metadata_file_path)
, tmp_metadata_file(metadata_file_path + ".tmp")
, data_version(1)
, last_version(1)
{
}
/// Loads the persisted replication state from `metadata_file`, if that file exists; otherwise does nothing.
/// Expected records, in order: "Last version" (integer), "Last LSN" (string) and an optional "Actual LSN"
/// (string) which, when present and non-empty, overrides the value read for "Last LSN".
/// NOTE(review): two signatures and both the "Data version" and "Last version" reads appear in this listing;
/// it looks like removed and added diff lines shown together without +/- markers - confirm which are live.
void PostgreSQLReplicaMetadata::readDataVersion()
void PostgreSQLReplicaMetadata::readMetadata()
{
if (Poco::File(metadata_file).exists())
{
ReadBufferFromFile in(metadata_file, DBMS_DEFAULT_BUFFER_SIZE);
assertString("\nData version:\t", in);
readIntText(data_version, in);
assertString("\nLast version:\t", in);
readIntText(last_version, in);
assertString("\nLast LSN:\t", in);
/// NOTE(review): presumably readString() stops at the line break, otherwise the optional record below
/// would be swallowed into last_lsn - confirm against the helper's contract.
readString(last_lsn, in);
/// The "Actual LSN" record is optional, hence checkString() rather than assertString().
if (checkString("\nActual LSN:\t", in))
{
std::string actual_lsn;
readString(actual_lsn, in);
if (!actual_lsn.empty())
last_lsn = actual_lsn;
}
LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaMetadata"),
"Last written version is {}. (From metadata file {})", data_version, metadata_file);
"Last written version is {}. (From metadata file {})", last_version, metadata_file);
}
}
void PostgreSQLReplicaMetadata::writeDataVersion()
void PostgreSQLReplicaMetadata::writeMetadata(bool append_metadata)
{
WriteBufferFromFile out(tmp_metadata_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
writeString("\nData version:\t" + toString(data_version), out);
if (!append_metadata)
{
writeString("\nLast version:\t" + toString(last_version), out);
writeString("\nLast LSN:\t" + toString(last_lsn), out);
}
else
{
writeString("\nActual LSN:\t" + toString(last_lsn), out);
}
out.next();
out.sync();
@ -51,14 +73,15 @@ void PostgreSQLReplicaMetadata::writeDataVersion()
/// While data is received, the version is updated. Before the table sync, the last version and LSN are written
/// to the tmp file. Then the data is synced to the table and the tmp file is renamed to the non-tmp one.
/// `lsn` is stored as `last_lsn` before writing; `finalizeStreamFunc` performs the sync and (in the `String`
/// variant) returns the LSN that was actually reached. If the callback or the rename throws, the tmp file is
/// removed and the exception is rethrown.
/// NOTE(review): two signatures and both `writeDataVersion()`/`writeMetadata()` calls appear in this listing;
/// it looks like removed and added diff lines shown together without +/- markers - confirm which are live.
void PostgreSQLReplicaMetadata::commitVersion(const std::function<void()> & finalizeStreamFunc)
void PostgreSQLReplicaMetadata::commitMetadata(std::string & lsn, const std::function<String()> & finalizeStreamFunc)
{
writeDataVersion();
std::string actual_lsn;
last_lsn = lsn;
writeMetadata();
try
{
/// TODO: return last actually written lsn and write it to file
finalizeStreamFunc();
actual_lsn = finalizeStreamFunc();
/// Publishing the tmp file under the real name is what marks the sync as committed.
Poco::File(tmp_metadata_file).renameTo(metadata_file);
}
catch (...)
@ -66,6 +89,14 @@ void PostgreSQLReplicaMetadata::commitVersion(const std::function<void()> & fina
Poco::File(tmp_metadata_file).remove();
throw;
}
/// This is not supposed to happen
if (actual_lsn != last_lsn)
{
/// NOTE(review): at this point the tmp file has already been renamed, and the visible part of writeMetadata()
/// reopens `tmp_metadata_file` with O_TRUNC and writes `last_lsn` (not `actual_lsn`) under "Actual LSN".
/// It therefore looks like the actual LSN never reaches `metadata_file` - confirm and fix if so.
writeMetadata(true);
LOG_WARNING(&Poco::Logger::get("PostgreSQLReplicaMetadata"),
"Last written LSN {} is not equal to actual LSN {}", last_lsn, actual_lsn);
}
}
}

View File

@ -10,22 +10,22 @@ class PostgreSQLReplicaMetadata
public:
PostgreSQLReplicaMetadata(const std::string & metadata_file_path);
void commitVersion(const std::function<void()> & syncTableFunc);
void readDataVersion();
void commitMetadata(std::string & lsn, const std::function<String()> & syncTableFunc);
void readMetadata();
/// Returns the current version counter and then increments it (post-increment), so consecutive calls
/// yield consecutive values.
/// NOTE(review): two return statements appear here (`data_version` and `last_version`); this looks like the
/// old and the new line of a diff shown together - only the first one would ever execute as written.
size_t version()
{
return data_version++;
return last_version++;
}
private:
void writeDataVersion();
void writeMetadata(bool append_metadata = false);
const std::string metadata_file;
const std::string tmp_metadata_file;
size_t data_version;
uint64_t last_version;
std::string last_lsn;
};
}

View File

@ -35,6 +35,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, context(context_)
, database_name(database_name_)
, table_name(table_name_)
, connection_str(conn_str)
, publication_name(publication_name_)
, replication_slot(replication_slot_name_)
, max_block_size(max_block_size_)
@ -70,6 +71,7 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
{
LOG_ERROR(log, "Unable to set up connection for table {}.{}. Reconnection attempt continues. Error message: {}",
database_name, table_name, pqxx_error.what());
startup_task->scheduleAfter(reschedule_ms);
}
catch (Exception & e)
@ -78,7 +80,6 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
throw;
}
LOG_DEBUG(log, "PostgreSQLReplica starting replication proccess");
startReplication();
}
@ -90,7 +91,7 @@ void PostgreSQLReplicationHandler::shutdown()
}
bool PostgreSQLReplicationHandler::isPublicationExist()
bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr<pqxx::work> tx)
{
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
pqxx::result result{tx->exec(query_str)};
@ -105,7 +106,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist()
}
void PostgreSQLReplicationHandler::createPublication()
void PostgreSQLReplicationHandler::createPublication(std::shared_ptr<pqxx::work> tx)
{
/// 'ONLY' means just a table, without descendants.
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, table_name);
@ -119,28 +120,29 @@ void PostgreSQLReplicationHandler::createPublication()
throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
}
/// TODO: check replica identity
/// TODO: check replica identity?
/// Requires changed replica identity for included table to be able to receive old values of updated rows.
/// (ALTER TABLE table_name REPLICA IDENTITY FULL ?)
}
void PostgreSQLReplicationHandler::startReplication()
{
LOG_DEBUG(log, "PostgreSQLReplica starting replication proccess");
/// used commands require a specific transaction isolation mode.
replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'");
tx = std::make_shared<pqxx::work>(*replication_connection->conn());
auto tx = std::make_shared<pqxx::work>(*replication_connection->conn());
if (publication_name.empty())
{
publication_name = fmt::format("{}_{}_ch_publication", database_name, table_name);
/// Publication defines what tables are included into replication stream. Should be deleted only if MaterializePostgreSQL
/// table is dropped.
if (!isPublicationExist())
createPublication();
if (!isPublicationExist(tx))
createPublication(tx);
}
else if (!isPublicationExist())
else if (!isPublicationExist(tx))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -157,8 +159,7 @@ void PostgreSQLReplicationHandler::startReplication()
std::string snapshot_name;
LSNPosition start_lsn;
/// Non temporary replication slot should be deleted with drop table only and created only once, reused after detach.
if (!isReplicationSlotExist(ntx, replication_slot))
auto initial_sync = [&]()
{
/// Temporary replication slot
createTempReplicationSlot(ntx, start_lsn, snapshot_name);
@ -168,6 +169,18 @@ void PostgreSQLReplicationHandler::startReplication()
dropReplicationSlot(ntx, tmp_replication_slot);
/// Non-temporary replication slot
createReplicationSlot(ntx);
};
/// Non temporary replication slot should be deleted with drop table only and created only once, reused after detach.
if (!isReplicationSlotExist(ntx, replication_slot))
{
initial_sync();
}
else if (!Poco::File(metadata_path).exists())
{
/// If non-temporary slot exists and metadata file (where last synced version is written) does not exist, it is not normal.
dropReplicationSlot(ntx, replication_slot);
initial_sync();
}
ntx->commit();
@ -187,6 +200,9 @@ void PostgreSQLReplicationHandler::startReplication()
LOG_DEBUG(&Poco::Logger::get("StoragePostgreSQLMetadata"), "Successfully created replication consumer");
consumer->startSynchronization();
/// Takes time to close
replication_connection->conn()->close();
}
@ -287,7 +303,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(NontransactionPtr ntx)
/// Drops the replication slot called `slot_name` by executing a query on `ntx`, then logs the drop.
/// NOTE(review): two definitions of `query_str` appear here (the `DROP_REPLICATION_SLOT` command and the
/// `pg_drop_replication_slot()` function call); this looks like the removed and the added line of a diff
/// shown together - confirm which one is live.
/// NOTE(review): `slot_name` is interpolated into the SQL text without escaping - confirm it is never
/// taken from untrusted input.
void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, std::string & slot_name)
{
std::string query_str = fmt::format("DROP_REPLICATION_SLOT {}", slot_name);
std::string query_str = fmt::format("SELECT pg_drop_replication_slot('{}')", slot_name);
ntx->exec(query_str);
LOG_TRACE(log, "Replication slot {} is dropped", slot_name);
}
@ -303,14 +319,13 @@ void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr ntx)
}
/// Only used when MaterializePostgreSQL table is dropped.
void PostgreSQLReplicationHandler::shutdownFinal()
{
/// TODO: check: if metadata file does not exist and replication slot does exist, then need to drop it at startup
if (Poco::File(metadata_path).exists())
Poco::File(metadata_path).remove();
auto ntx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
connection = std::make_shared<PostgreSQLConnection>(connection_str);
auto ntx = std::make_shared<pqxx::nontransaction>(*connection->conn());
dropPublication(ntx);
if (isReplicationSlotExist(ntx, replication_slot))
@ -319,5 +334,4 @@ void PostgreSQLReplicationHandler::shutdownFinal()
ntx->commit();
}
}

View File

@ -28,16 +28,15 @@ public:
const std::string & replication_slot_name_,
const size_t max_block_size_);
void startup(StoragePtr storage_);
void startup(StoragePtr storage);
void shutdown();
void shutdownFinal();
private:
using NontransactionPtr = std::shared_ptr<pqxx::nontransaction>;
void waitConnectionAndStart();
bool isPublicationExist();
void createPublication();
bool isPublicationExist(std::shared_ptr<pqxx::work> tx);
void createPublication(std::shared_ptr<pqxx::work> tx);
bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name);
void createTempReplicationSlot(NontransactionPtr ntx, LSNPosition & start_lsn, std::string & snapshot_name);
@ -45,22 +44,19 @@ private:
void dropReplicationSlot(NontransactionPtr tx, std::string & slot_name);
void dropPublication(NontransactionPtr ntx);
void waitConnectionAndStart();
void startReplication();
void loadFromSnapshot(std::string & snapshot_name);
Context createQueryContext();
void getTableOutput(const Context & query_context);
Poco::Logger * log;
std::shared_ptr<Context> context;
const std::string database_name, table_name;
const std::string database_name, table_name, connection_str;
std::string publication_name, replication_slot;
std::string tmp_replication_slot;
const size_t max_block_size;
PostgreSQLConnectionPtr connection, replication_connection;
std::shared_ptr<pqxx::work> tx;
const String metadata_path;
BackgroundSchedulePool::TaskHolder startup_task;
std::shared_ptr<PostgreSQLReplicaConsumer> consumer;

View File

@ -78,12 +78,13 @@ def test_initial_load_from_snapshot(started_cluster):
PRIMARY KEY key;
''')
time.sleep(0.2)
time.sleep(1)
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
cursor.execute('DROP TABLE postgresql_replica;')
postgresql_replica_check_result(result, True)
@pytest.mark.timeout(180)
def test_no_connection_at_startup(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
@ -102,8 +103,8 @@ def test_no_connection_at_startup(started_cluster):
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while int(result) == 0:
time.sleep(0.5);
result = instance.query('SELECT count() FROM test.postgresql_replica;')
time.sleep(1);
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
cursor.execute('DROP TABLE postgresql_replica;')
@ -123,7 +124,11 @@ def test_detach_attach_is_ok(started_cluster):
PRIMARY KEY key;
''')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) == 0):
time.sleep(0.2)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
postgresql_replica_check_result(result, True)
@ -149,8 +154,12 @@ def test_replicating_insert_queries(started_cluster):
PRIMARY KEY key;
''')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) == 0):
time.sleep(0.2)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
assert(int(result) == 10)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 10 + number, 10 + number from numbers(10)")
@ -188,7 +197,11 @@ def test_replicating_delete_queries(started_cluster):
PRIMARY KEY key;
''')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) == 0):
time.sleep(0.2)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
postgresql_replica_check_result(result, True)
@ -220,8 +233,11 @@ def test_replicating_update_queries(started_cluster):
PRIMARY KEY key;
''')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) == 0):
time.sleep(0.2)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
assert(int(result) == 50)
cursor.execute('UPDATE postgresql_replica SET value = value - 10;')
@ -245,6 +261,13 @@ def test_resume_from_written_version(started_cluster):
PRIMARY KEY key;
''')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) == 0):
time.sleep(0.2)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
assert(int(result) == 50)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, 50 + number from numbers(50)")
time.sleep(2)
@ -265,6 +288,56 @@ def test_resume_from_written_version(started_cluster):
postgresql_replica_check_result(result, True)
@pytest.mark.timeout(180)
def test_many_replication_messages(started_cluster):
    """Replicate a large number of inserts, updates and deletes and check the replica converges.

    Steps: load 100000 rows before the replica table exists (initial snapshot), insert
    100000 more (streamed changes), update the first half, then delete rows so that
    exactly 50000 remain. Every step polls the replica until the expected state is
    visible; the pytest timeout bounds the total waiting time.
    """
    conn = get_postgres_conn(True)
    cursor = conn.cursor()
    create_postgres_table(cursor, 'postgresql_replica')
    instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)")

    instance.query('''
        CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1)
        ENGINE = PostgreSQLReplica(
        'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
        PRIMARY KEY key;
        ''')

    # Wait until the initial snapshot is fully loaded. The condition must be `!=`:
    # with `==` the loop is skipped while the load is still running and never
    # terminates once it has finished.
    result = instance.query('SELECT count() FROM test.postgresql_replica;')
    while int(result) != 100000:
        time.sleep(0.2)
        result = instance.query('SELECT count() FROM test.postgresql_replica;')

    # Second batch arrives through the replication stream rather than the snapshot.
    instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000, 100000)")

    result = instance.query('SELECT count() FROM test.postgresql_replica;')
    while int(result) != 200000:
        time.sleep(1)
        result = instance.query('SELECT count() FROM test.postgresql_replica;')

    result = instance.query('SELECT key FROM test.postgresql_replica ORDER BY key;')
    expected = instance.query("SELECT number from numbers(200000)")
    assert result == expected

    # Only the first 100000 keys are touched, so exactly those must satisfy value = key + 1.
    cursor.execute('UPDATE postgresql_replica SET value = key + 1 WHERE key < 100000;')

    expected = instance.query("SELECT number from numbers(100000)")
    result = instance.query('SELECT key FROM test.postgresql_replica WHERE value = key + 1 ORDER BY key;')
    while result != expected:
        time.sleep(1)
        result = instance.query('SELECT key FROM test.postgresql_replica WHERE value = key + 1 ORDER BY key;')

    # Odd keys go first, then every updated row (key != value); what is left are the
    # even keys of the second batch: 50000 rows.
    cursor.execute('DELETE FROM postgresql_replica WHERE key % 2 = 1;')
    cursor.execute('DELETE FROM postgresql_replica WHERE key != value;')

    result = instance.query('SELECT count() FROM (SELECT * FROM test.postgresql_replica);')
    while int(result) != 50000:
        time.sleep(1)
        result = instance.query('SELECT count() FROM (SELECT * FROM test.postgresql_replica);')

    cursor.execute('DROP TABLE postgresql_replica;')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")