From 02e19f942226184cdfa7d7827c9b8bef995253e4 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 4 Feb 2021 21:05:43 +0000 Subject: [PATCH] Better --- .../PostgreSQL/PostgreSQLReplicaConsumer.cpp | 30 +++---- .../PostgreSQL/PostgreSQLReplicaConsumer.h | 6 +- .../PostgreSQL/PostgreSQLReplicaMetadata.cpp | 53 +++++++++--- .../PostgreSQL/PostgreSQLReplicaMetadata.h | 12 +-- .../PostgreSQLReplicationHandler.cpp | 46 ++++++---- .../PostgreSQL/PostgreSQLReplicationHandler.h | 14 ++- .../test_storage_postgresql_replica/test.py | 85 +++++++++++++++++-- 7 files changed, 176 insertions(+), 70 deletions(-) diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp index b4a7344a9cd..e8e73cd2d52 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp @@ -67,8 +67,6 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type); - columns = description.sample_block.cloneEmptyColumns(); - wal_reader_task = context->getSchedulePool().createTask("PostgreSQLReplicaWALReader", [this]{ replicationStream(); }); wal_reader_task->deactivate(); @@ -92,7 +90,7 @@ void PostgreSQLReplicaConsumer::replicationStream() { size_t count_empty_slot_reads = 0; auto start_time = std::chrono::steady_clock::now(); - metadata.readDataVersion(); + metadata.readMetadata(); LOG_TRACE(log, "Starting replication stream"); @@ -384,18 +382,19 @@ void PostgreSQLReplicaConsumer::syncIntoTable(Block & block) } -void PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr ntx) +String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr ntx) { LOG_TRACE(log, "CURRENT LSN FROM TO {}", final_lsn.lsn); - std::string query_str = fmt::format("SELECT pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn.lsn); - pqxx::result result{ntx->exec(query_str)}; - if (!result.empty()) - { - std::string s1 = result[0].size() > 0 && !result[0][0].is_null() ? result[0][0].as() : "NULL"; - std::string s2 = result[0].size() > 1 && !result[0][1].is_null() ? result[0][1].as() : "NULL"; - LOG_TRACE(log, "ADVANCE LSN: {} and {}", s1, s2); - } + std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn.lsn); + pqxx::result result{ntx->exec(query_str)}; + + ntx->commit(); + + if (!result.empty()) + return result[0][0].as(); + + return final_lsn.lsn; } @@ -454,13 +453,10 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot() if (result_rows.rows()) { assert(!slot_empty); - metadata.commitVersion([&]() + metadata.commitMetadata(final_lsn.lsn, [&]() { syncIntoTable(result_rows); - advanceLSN(tx); - - /// TODO: Can transaction still be active if got exception before commiting it? It must be closed if connection is ok. - tx->commit(); + return advanceLSN(tx); }); } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h index cbe19c4436e..efb9dabc121 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h @@ -22,9 +22,6 @@ struct LSNPosition uint64_t upper_half, lower_half, result; std::sscanf(lsn.data(), "%lX/%lX", &upper_half, &lower_half); result = (upper_half << 32) + lower_half; - //LOG_DEBUG(&Poco::Logger::get("LSNParsing"), - // "Created replication slot. upper half: {}, lower_half: {}, start lsn: {}", - // upper_half, lower_half, result); return result; } @@ -32,7 +29,6 @@ struct LSNPosition { char result[16]; std::snprintf(result, sizeof(result), "%lX/%lX", (lsn_value >> 32), lsn_value & 0xFFFFFFFF); - //assert(lsn_value == result.getValue()); std::string ans = result; return ans; } @@ -79,7 +75,7 @@ private: void insertDefaultValue(size_t column_idx); void syncIntoTable(Block & block); - void advanceLSN(std::shared_ptr ntx); + String advanceLSN(std::shared_ptr ntx); /// Methods to parse replication message data. void readTupleData(const char * message, size_t & pos, PostgreSQLQuery type, bool old_value = false); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp index 74804d0d93d..3188f271f0a 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp @@ -19,29 +19,51 @@ namespace ErrorCodes PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadata_file_path) : metadata_file(metadata_file_path) , tmp_metadata_file(metadata_file_path + ".tmp") - , data_version(1) + , last_version(1) { } -void PostgreSQLReplicaMetadata::readDataVersion() +void PostgreSQLReplicaMetadata::readMetadata() { if (Poco::File(metadata_file).exists()) { ReadBufferFromFile in(metadata_file, DBMS_DEFAULT_BUFFER_SIZE); - assertString("\nData version:\t", in); - readIntText(data_version, in); + + assertString("\nLast version:\t", in); + readIntText(last_version, in); + + assertString("\nLast LSN:\t", in); + readString(last_lsn, in); + + if (checkString("\nActual LSN:\t", in)) + { + std::string actual_lsn; + readString(actual_lsn, in); + + if (!actual_lsn.empty()) + last_lsn = actual_lsn; + } LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaMetadata"), - "Last written version is {}. (From metadata file {})", data_version, metadata_file); + "Last written version is {}. (From metadata file {})", last_version, metadata_file); } } -void PostgreSQLReplicaMetadata::writeDataVersion() +void PostgreSQLReplicaMetadata::writeMetadata(bool append_metadata) { WriteBufferFromFile out(tmp_metadata_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT); - writeString("\nData version:\t" + toString(data_version), out); + + if (!append_metadata) + { + writeString("\nLast version:\t" + toString(last_version), out); + writeString("\nLast LSN:\t" + toString(last_lsn), out); + } + else + { + writeString("\nActual LSN:\t" + toString(last_lsn), out); + } out.next(); out.sync(); @@ -51,14 +73,15 @@ void PostgreSQLReplicaMetadata::writeDataVersion() /// While data is recieved, version is updated. Before table sync, write last version to tmp file. /// Then sync data to table and rename tmp to non-tmp. -void PostgreSQLReplicaMetadata::commitVersion(const std::function & finalizeStreamFunc) +void PostgreSQLReplicaMetadata::commitMetadata(std::string & lsn, const std::function & finalizeStreamFunc) { - writeDataVersion(); + std::string actual_lsn; + last_lsn = lsn; + writeMetadata(); try { - /// TODO: return last actially written lsn and write it to file - finalizeStreamFunc(); + actual_lsn = finalizeStreamFunc(); Poco::File(tmp_metadata_file).renameTo(metadata_file); } catch (...) @@ -66,6 +89,14 @@ void PostgreSQLReplicaMetadata::commitVersion(const std::function & fina Poco::File(tmp_metadata_file).remove(); throw; } + + /// This is not supposed to happen + if (actual_lsn != last_lsn) + { + writeMetadata(true); + LOG_WARNING(&Poco::Logger::get("PostgreSQLReplicaMetadata"), + "Last written LSN {} is not equal to actual LSN {}", last_lsn, actual_lsn); + } } } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h b/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h index 13a53746c22..f93b74c8c65 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h @@ -10,22 +10,22 @@ class PostgreSQLReplicaMetadata public: PostgreSQLReplicaMetadata(const std::string & metadata_file_path); - void commitVersion(const std::function & syncTableFunc); - void readDataVersion(); + void commitMetadata(std::string & lsn, const std::function & syncTableFunc); + void readMetadata(); size_t version() { - return data_version++; + return last_version++; } private: - void writeDataVersion(); + void writeMetadata(bool append_metadata = false); const std::string metadata_file; const std::string tmp_metadata_file; - size_t data_version; - + uint64_t last_version; + std::string last_lsn; }; } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index b845f697d1c..1726185ad8a 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -35,6 +35,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( , context(context_) , database_name(database_name_) , table_name(table_name_) + , connection_str(conn_str) , publication_name(publication_name_) , replication_slot(replication_slot_name_) , max_block_size(max_block_size_) @@ -70,6 +71,7 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart() { LOG_ERROR(log, "Unable to set up connection for table {}.{}. Reconnection attempt continues. Error message: {}", database_name, table_name, pqxx_error.what()); + startup_task->scheduleAfter(reschedule_ms); } catch (Exception & e) @@ -78,7 +80,6 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart() throw; } - LOG_DEBUG(log, "PostgreSQLReplica starting replication proccess"); startReplication(); } @@ -90,7 +91,7 @@ void PostgreSQLReplicationHandler::shutdown() } -bool PostgreSQLReplicationHandler::isPublicationExist() +bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr tx) { std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name); pqxx::result result{tx->exec(query_str)}; @@ -105,7 +106,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist() } -void PostgreSQLReplicationHandler::createPublication() +void PostgreSQLReplicationHandler::createPublication(std::shared_ptr tx) { /// 'ONLY' means just a table, without descendants. std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, table_name); @@ -119,28 +120,29 @@ void PostgreSQLReplicationHandler::createPublication() throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE); } - /// TODO: check replica identity + /// TODO: check replica identity? /// Requires changed replica identity for included table to be able to receive old values of updated rows. - /// (ALTER TABLE table_name REPLICA IDENTITY FULL ?) } void PostgreSQLReplicationHandler::startReplication() { + LOG_DEBUG(log, "PostgreSQLReplica starting replication proccess"); + /// used commands require a specific transaction isolation mode. replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'"); - tx = std::make_shared(*replication_connection->conn()); + auto tx = std::make_shared(*replication_connection->conn()); if (publication_name.empty()) { publication_name = fmt::format("{}_{}_ch_publication", database_name, table_name); /// Publication defines what tables are included into replication stream. Should be deleted only if MaterializePostgreSQL /// table is dropped. - if (!isPublicationExist()) - createPublication(); + if (!isPublicationExist(tx)) + createPublication(tx); } - else if (!isPublicationExist()) + else if (!isPublicationExist(tx)) { throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -157,8 +159,7 @@ void PostgreSQLReplicationHandler::startReplication() std::string snapshot_name; LSNPosition start_lsn; - /// Non temporary replication slot should be deleted with drop table only and created only once, reused after detach. - if (!isReplicationSlotExist(ntx, replication_slot)) + auto initial_sync = [&]() { /// Temporary replication slot createTempReplicationSlot(ntx, start_lsn, snapshot_name); @@ -168,6 +169,18 @@ void PostgreSQLReplicationHandler::startReplication() dropReplicationSlot(ntx, tmp_replication_slot); /// Non-temporary replication slot createReplicationSlot(ntx); + }; + + /// Non temporary replication slot should be deleted with drop table only and created only once, reused after detach. + if (!isReplicationSlotExist(ntx, replication_slot)) + { + initial_sync(); + } + else if (!Poco::File(metadata_path).exists()) + { + /// If non-temporary slot exists and metadata file (where last synced version is written) does not exist, it is not normal. + dropReplicationSlot(ntx, replication_slot); + initial_sync(); } ntx->commit(); @@ -187,6 +200,9 @@ void PostgreSQLReplicationHandler::startReplication() LOG_DEBUG(&Poco::Logger::get("StoragePostgreSQLMetadata"), "Successfully created replication consumer"); consumer->startSynchronization(); + + /// Takes time to close + replication_connection->conn()->close(); } @@ -287,7 +303,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(NontransactionPtr ntx) void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, std::string & slot_name) { - std::string query_str = fmt::format("DROP_REPLICATION_SLOT {}", slot_name); + std::string query_str = fmt::format("SELECT pg_drop_replication_slot('{}')", slot_name); ntx->exec(query_str); LOG_TRACE(log, "Replication slot {} is dropped", slot_name); } @@ -303,14 +319,13 @@ void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr ntx) } -/// Only used when MaterializePostgreSQL table is dropped. void PostgreSQLReplicationHandler::shutdownFinal() { - /// TODO: check: if metadata file does not exist and replication slot does exist, then need to drop it at startup if (Poco::File(metadata_path).exists()) Poco::File(metadata_path).remove(); - auto ntx = std::make_shared(*replication_connection->conn()); + connection = std::make_shared(connection_str); + auto ntx = std::make_shared(*connection->conn()); dropPublication(ntx); if (isReplicationSlotExist(ntx, replication_slot)) @@ -319,5 +334,4 @@ void PostgreSQLReplicationHandler::shutdownFinal() ntx->commit(); } - } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 594f57e0dc7..9d2fcf9f042 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -28,16 +28,15 @@ public: const std::string & replication_slot_name_, const size_t max_block_size_); - void startup(StoragePtr storage_); + void startup(StoragePtr storage); void shutdown(); void shutdownFinal(); private: using NontransactionPtr = std::shared_ptr; - void waitConnectionAndStart(); - bool isPublicationExist(); - void createPublication(); + bool isPublicationExist(std::shared_ptr tx); + void createPublication(std::shared_ptr tx); bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name); void createTempReplicationSlot(NontransactionPtr ntx, LSNPosition & start_lsn, std::string & snapshot_name); @@ -45,22 +44,19 @@ private: void dropReplicationSlot(NontransactionPtr tx, std::string & slot_name); void dropPublication(NontransactionPtr ntx); + void waitConnectionAndStart(); void startReplication(); void loadFromSnapshot(std::string & snapshot_name); - Context createQueryContext(); - void getTableOutput(const Context & query_context); Poco::Logger * log; std::shared_ptr context; - const std::string database_name, table_name; + const std::string database_name, table_name, connection_str; std::string publication_name, replication_slot; std::string tmp_replication_slot; const size_t max_block_size; PostgreSQLConnectionPtr connection, replication_connection; - std::shared_ptr tx; - const String metadata_path; BackgroundSchedulePool::TaskHolder startup_task; std::shared_ptr consumer; diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index 44c637cc165..5f91fd2f7b4 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -78,12 +78,13 @@ def test_initial_load_from_snapshot(started_cluster): PRIMARY KEY key; ''') - time.sleep(0.2) + time.sleep(1) result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') cursor.execute('DROP TABLE postgresql_replica;') postgresql_replica_check_result(result, True) +@pytest.mark.timeout(180) def test_no_connection_at_startup(started_cluster): conn = get_postgres_conn(True) cursor = conn.cursor() @@ -102,8 +103,8 @@ def test_no_connection_at_startup(started_cluster): result = instance.query('SELECT count() FROM test.postgresql_replica;') while int(result) == 0: + time.sleep(0.5); result = instance.query('SELECT count() FROM test.postgresql_replica;') - time.sleep(1); result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') cursor.execute('DROP TABLE postgresql_replica;') @@ -123,7 +124,11 @@ def test_detach_attach_is_ok(started_cluster): PRIMARY KEY key; ''') - time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) == 0): + time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') postgresql_replica_check_result(result, True) @@ -149,7 +154,11 @@ def test_replicating_insert_queries(started_cluster): PRIMARY KEY key; ''') - time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) == 0): + time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + result = instance.query('SELECT count() FROM test.postgresql_replica;') assert(int(result) == 10) @@ -188,7 +197,11 @@ def test_replicating_delete_queries(started_cluster): PRIMARY KEY key; ''') - time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) == 0): + time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') postgresql_replica_check_result(result, True) @@ -220,8 +233,11 @@ def test_replicating_update_queries(started_cluster): PRIMARY KEY key; ''') - time.sleep(0.2) result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) == 0): + time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + assert(int(result) == 50) cursor.execute('UPDATE postgresql_replica SET value = value - 10;') @@ -245,6 +261,13 @@ def test_resume_from_written_version(started_cluster): PRIMARY KEY key; ''') + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) == 0): + time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + + assert(int(result) == 50) + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, 50 + number from numbers(50)") time.sleep(2) @@ -265,6 +288,56 @@ def test_resume_from_written_version(started_cluster): postgresql_replica_check_result(result, True) +@pytest.mark.timeout(180) +def test_many_replication_messages(started_cluster): + conn = get_postgres_conn(True) + cursor = conn.cursor() + create_postgres_table(cursor, 'postgresql_replica'); + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)") + + instance.query(''' + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + ENGINE = PostgreSQLReplica( + 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') + PRIMARY KEY key; + ''') + + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) == 100000): + time.sleep(0.2) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000, 100000)") + + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while (int(result) != 200000): + result = instance.query('SELECT count() FROM test.postgresql_replica;') + time.sleep(1) + + result = instance.query('SELECT key FROM test.postgresql_replica ORDER BY key;') + expected = instance.query("SELECT number from numbers(200000)") + assert(result == expected) + + cursor.execute('UPDATE postgresql_replica SET value = key + 1 WHERE key < 100000;') + + result = instance.query('SELECT key FROM test.postgresql_replica WHERE value = key + 1 ORDER BY key;') + expected = instance.query("SELECT number from numbers(100000)") + + while (result != expected): + result = instance.query('SELECT key FROM test.postgresql_replica WHERE value = key + 1 ORDER BY key;') + time.sleep(1) + + cursor.execute('DELETE FROM postgresql_replica WHERE key % 2 = 1;') + cursor.execute('DELETE FROM postgresql_replica WHERE key != value;') + + result = instance.query('SELECT count() FROM (SELECT * FROM test.postgresql_replica);') + while (int(result) != 50000): + result = instance.query('SELECT count() FROM (SELECT * FROM test.postgresql_replica);') + time.sleep(1) + + cursor.execute('DROP TABLE postgresql_replica;') + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...")