From 01075677cfa53dccf5af33e4b3249a6917ab611e Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 11 Apr 2021 19:58:33 +0000 Subject: [PATCH] Better --- .../materialize-postgresql.md | 5 +- src/Core/PostgreSQL/PostgreSQLConnection.h | 2 +- src/DataStreams/PostgreSQLBlockInputStream.h | 13 +++-- .../DatabaseMaterializePostgreSQL.cpp | 17 ++++-- .../DatabaseMaterializePostgreSQL.h | 2 + .../fetchPostgreSQLTableStructure.cpp | 54 +++++++++++-------- .../fetchPostgreSQLTableStructure.h | 9 ++-- .../PostgreSQLReplicationHandler.cpp | 41 ++++++++------ .../PostgreSQL/PostgreSQLReplicationHandler.h | 14 ++--- src/Storages/StoragePostgreSQL.h | 1 - .../test.py | 50 ----------------- 11 files changed, 92 insertions(+), 116 deletions(-) diff --git a/docs/en/engines/database-engines/materialize-postgresql.md b/docs/en/engines/database-engines/materialize-postgresql.md index b3516001929..79dccabc287 100644 --- a/docs/en/engines/database-engines/materialize-postgresql.md +++ b/docs/en/engines/database-engines/materialize-postgresql.md @@ -7,9 +7,9 @@ toc_title: MaterializePostgreSQL ## Creating a Database {#creating-a-database} -## Requirements +## Requirements {#requirements} -Each replicated table must have one of the following **replica identity**: +- Each replicated table must have one of the following **replica identity**: 1. **default** (primary key) @@ -38,3 +38,4 @@ WHERE oid = 'postgres_table'::regclass; ``` +- Setting `wal_level`to `logical` and `max_replication_slots` to at least `2` in the postgresql config file. diff --git a/src/Core/PostgreSQL/PostgreSQLConnection.h b/src/Core/PostgreSQL/PostgreSQLConnection.h index dfed426b462..f884e93669d 100644 --- a/src/Core/PostgreSQL/PostgreSQLConnection.h +++ b/src/Core/PostgreSQL/PostgreSQLConnection.h @@ -25,7 +25,7 @@ class Connection; using ConnectionPtr = std::shared_ptr; -/// Connection string and address without login/password (for error logs) +/// Connection string and address without credentials (for logs) using ConnectionInfo = std::pair; ConnectionInfo formatConnectionString( diff --git a/src/DataStreams/PostgreSQLBlockInputStream.h b/src/DataStreams/PostgreSQLBlockInputStream.h index 5c637015f18..f320e2caeb5 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.h +++ b/src/DataStreams/PostgreSQLBlockInputStream.h @@ -9,7 +9,6 @@ #include #include #include -#include #include #include @@ -28,6 +27,12 @@ public: const Block & sample_block, const UInt64 max_block_size_); + String getName() const override { return "PostgreSQL"; } + Block getHeader() const override { return description.sample_block.cloneEmpty(); } + + void readPrefix() override; + +protected: PostgreSQLBlockInputStream( std::shared_ptr tx_, const std::string & query_str_, @@ -35,12 +40,6 @@ public: const UInt64 max_block_size_, bool auto_commit_); - String getName() const override { return "PostgreSQL"; } - Block getHeader() const override { return description.sample_block.cloneEmpty(); } - - void readPrefix() override; - -protected: String query_str; std::shared_ptr tx; std::unique_ptr stream; diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp index e5d61709387..c8e93616ead 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp @@ -60,7 +60,8 @@ void DatabaseMaterializePostgreSQL::startSynchronization() metadata_path + METADATA_SUFFIX, getContext(), settings->postgresql_replica_max_block_size.value, - settings->postgresql_replica_allow_minimal_ddl, true, + settings->postgresql_replica_allow_minimal_ddl, + /* is_materialize_postgresql_database = */ true, settings->postgresql_replica_tables_list.value); std::unordered_set tables_to_replicate = replication_handler->fetchRequiredTables(connection->getRef()); @@ -123,9 +124,9 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, Conte /// Note: In select query we call MaterializePostgreSQL table and it calls tryGetTable from its nested. std::lock_guard lock(tables_mutex); auto table = materialized_tables.find(name); - /// Here it is possible that nested table is temporarily out of reach, but return storage anyway, - /// it will not allow to read if nested is unavailable at the moment - if (table != materialized_tables.end()) + + /// Nested table is not created immediately. Consider that table exists only if nested table exists. + if (table != materialized_tables.end() && table->second->as()->isNestedLoaded()) return table->second; return StoragePtr{}; @@ -177,7 +178,7 @@ DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator( Tables nested_tables; for (const auto & [table_name, storage] : materialized_tables) { - auto nested_storage = storage->template as()->tryGetNested(); + auto nested_storage = storage->as()->tryGetNested(); if (nested_storage) nested_tables[table_name] = nested_storage; @@ -186,6 +187,12 @@ DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator( return std::make_unique(nested_tables, database_name); } + +void DatabaseMaterializePostgreSQL::renameTable(ContextPtr /* context_ */, const String & /* name */, IDatabase & /* to_database */, const String & /* to_name */, bool /* exchange */, bool /* dictionary */) +{ + throw Exception("MaterializePostgreSQL database does not support rename table.", ErrorCodes::NOT_IMPLEMENTED); +} + } #endif diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h index 17288be8fb2..bdfe54ace13 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h @@ -51,6 +51,8 @@ public: void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override; + void renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) override; + void drop(ContextPtr local_context) override; void shutdown() override; diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index 7aaa7cc6f2a..1f8b08d3807 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -26,14 +26,14 @@ namespace ErrorCodes } -std::unordered_set fetchPostgreSQLTablesList(pqxx::connection & connection) +template +std::unordered_set fetchPostgreSQLTablesList(T & tx) { std::unordered_set tables; std::string query = "SELECT tablename FROM pg_catalog.pg_tables " "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'"; - pqxx::read_transaction tx(connection); - for (auto table_name : tx.stream(query)) + for (auto table_name : tx.template stream(query)) tables.insert(std::get<0>(table_name)); return tables; @@ -112,13 +112,13 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl template std::shared_ptr readNamesAndTypesList( - std::shared_ptr tx, const String & postgres_table_name, const String & query, bool use_nulls, bool only_names_and_types) + T & tx, const String & postgres_table_name, const String & query, bool use_nulls, bool only_names_and_types) { auto columns = NamesAndTypesList(); try { - pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query)); + pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query)); if (only_names_and_types) { @@ -158,7 +158,7 @@ std::shared_ptr readNamesAndTypesList( template PostgreSQLTableStructure fetchPostgreSQLTableStructure( - std::shared_ptr tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key, bool with_replica_identity_index) + T & tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key, bool with_replica_identity_index) { PostgreSQLTableStructure table; @@ -213,29 +213,37 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( } -template -PostgreSQLTableStructure fetchPostgreSQLTableStructure( - std::shared_ptr tx, const String & postgres_table_name, bool use_nulls, - bool with_primary_key, bool with_replica_identity_index); - - -template -PostgreSQLTableStructure fetchPostgreSQLTableStructure( - std::shared_ptr tx, const String & postgres_table_name, bool use_nulls, - bool with_primary_key, bool with_replica_identity_index); - - PostgreSQLTableStructure fetchPostgreSQLTableStructure( pqxx::connection & connection, const String & postgres_table_name, bool use_nulls) { - auto tx = std::make_shared(connection); - auto table = fetchPostgreSQLTableStructure(tx, postgres_table_name, use_nulls, false, false); - tx->commit(); - - return table; + postgres::Transaction tx(connection); + return fetchPostgreSQLTableStructure(tx.getRef(), postgres_table_name, use_nulls, false, false); } +std::unordered_set fetchPostgreSQLTablesList(pqxx::connection & connection) +{ + postgres::Transaction tx(connection); + return fetchPostgreSQLTablesList(tx.getRef()); +} + + +template +PostgreSQLTableStructure fetchPostgreSQLTableStructure( + pqxx::ReadTransaction & tx, const String & postgres_table_name, bool use_nulls, + bool with_primary_key, bool with_replica_identity_index); + +template +PostgreSQLTableStructure fetchPostgreSQLTableStructure( + pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls, + bool with_primary_key, bool with_replica_identity_index); + +template +std::unordered_set fetchPostgreSQLTablesList(pqxx::work & tx); + +template +std::unordered_set fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx); + } #endif diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h index bbcb9cd192f..2853e0a8ea4 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h @@ -12,8 +12,6 @@ namespace DB { -std::unordered_set fetchPostgreSQLTablesList(pqxx::connection & connection); - struct PostgreSQLTableStructure { std::shared_ptr columns; @@ -23,14 +21,19 @@ struct PostgreSQLTableStructure using PostgreSQLTableStructurePtr = std::unique_ptr; +std::unordered_set fetchPostgreSQLTablesList(pqxx::connection & connection); + PostgreSQLTableStructure fetchPostgreSQLTableStructure( pqxx::connection & connection, const String & postgres_table_name, bool use_nulls); template PostgreSQLTableStructure fetchPostgreSQLTableStructure( - std::shared_ptr tx, const String & postgres_table_name, bool use_nulls, + T & tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key = false, bool with_replica_identity_index = false); +template +std::unordered_set fetchPostgreSQLTablesList(T & tx); + } #endif diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 4ea1dad2b14..c1056f925bb 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -87,7 +87,10 @@ void PostgreSQLReplicationHandler::shutdown() void PostgreSQLReplicationHandler::startSynchronization() { - createPublicationIfNeeded(connection->getRef()); + { + postgres::Transaction tx(connection->getRef()); + createPublicationIfNeeded(tx.getRef()); + } auto replication_connection = postgres::createReplicationConnection(connection_info); postgres::Transaction tx(replication_connection->getRef()); @@ -159,7 +162,7 @@ NameSet PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_na std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name); tx->exec(query_str); - storage_data.second->createNestedIfNeeded(fetchTableStructure(tx, table_name)); + storage_data.second->createNestedIfNeeded(fetchTableStructure(*tx, table_name)); auto nested_storage = storage_data.second->getNested(); /// Load from snapshot, which will show table state before creation of replication slot. @@ -233,14 +236,12 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx) } -void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection & connection_) +void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check) { if (new_publication_created) return; - postgres::Transaction tx(connection_); - - if (!isPublicationExist(tx.getRef())) + if (create_without_check || !isPublicationExist(tx)) { if (tables_list.empty()) { @@ -349,27 +350,33 @@ void PostgreSQLReplicationHandler::shutdownFinal() } +/// Used by MaterializePostgreSQL database engine. NameSet PostgreSQLReplicationHandler::fetchRequiredTables(pqxx::connection & connection_) { - if (tables_list.empty()) + postgres::Transaction tx(connection_); + bool publication_exists = isPublicationExist(tx.getRef()); + + if (tables_list.empty() && !publication_exists) { - return fetchPostgreSQLTablesList(connection_); - } - else - { - createPublicationIfNeeded(connection_); - return fetchTablesFromPublication(connection_); + /// Fetch all tables list from database. Publication does not exist yet, which means + /// that no replication took place. Publication will be created in + /// startSynchronization method. + return fetchPostgreSQLTablesList(tx.getRef()); } + + if (!publication_exists) + createPublicationIfNeeded(tx.getRef(), /* create_without_check = */ true); + + return fetchTablesFromPublication(tx.getRef()); } -NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::connection & connection_) +NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::work & tx) { std::string query = fmt::format("SELECT tablename FROM pg_publication_tables WHERE pubname = '{}'", publication_name); std::unordered_set tables; - postgres::Transaction tx(connection_); - for (auto table_name : tx.getRef().stream(query)) + for (auto table_name : tx.stream(query)) tables.insert(std::get<0>(table_name)); return tables; @@ -377,7 +384,7 @@ NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::connectio PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure( - std::shared_ptr tx, const std::string & table_name) + pqxx::ReplicationTransaction & tx, const std::string & table_name) { if (!is_postgresql_replica_database_engine) return nullptr; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 5a527179406..43f0067aed7 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -52,18 +52,20 @@ public: private: using Storages = std::unordered_map; - void createPublicationIfNeeded(pqxx::connection & connection_); - bool isPublicationExist(pqxx::work & tx); + void createPublicationIfNeeded(pqxx::work & tx, bool create_without_check = false); + + NameSet fetchTablesFromPublication(pqxx::work & tx); + + void dropPublication(pqxx::nontransaction & ntx); + bool isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name); void createReplicationSlot(pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false); void dropReplicationSlot(pqxx::nontransaction & tx, bool temporary = false); - void dropPublication(pqxx::nontransaction & ntx); - void waitConnectionAndStart(); void startSynchronization(); @@ -72,11 +74,9 @@ private: NameSet loadFromSnapshot(std::string & snapshot_name, Storages & sync_storages); - NameSet fetchTablesFromPublication(pqxx::connection & connection_); - std::unordered_map reloadFromSnapshot(const std::vector> & relation_data); - PostgreSQLTableStructurePtr fetchTableStructure(std::shared_ptr tx, const std::string & table_name); + PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const std::string & table_name); Poco::Logger * log; ContextPtr context; diff --git a/src/Storages/StoragePostgreSQL.h b/src/Storages/StoragePostgreSQL.h index 9bf5a001f1b..fc57aded197 100644 --- a/src/Storages/StoragePostgreSQL.h +++ b/src/Storages/StoragePostgreSQL.h @@ -10,7 +10,6 @@ #include #include #include -#include namespace DB diff --git a/tests/integration/test_postgresql_replica_database_engine/test.py b/tests/integration/test_postgresql_replica_database_engine/test.py index cf93a3e1b1c..503e12c890f 100644 --- a/tests/integration/test_postgresql_replica_database_engine/test.py +++ b/tests/integration/test_postgresql_replica_database_engine/test.py @@ -303,56 +303,6 @@ def test_load_and_sync_subset_of_database_tables(started_cluster): assert 'test_database' not in instance.query('SHOW DATABASES') -@pytest.mark.timeout(320) -def test_table_schema_changes(started_cluster): - instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) - cursor = conn.cursor() - NUM_TABLES = 5 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2); - instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i)) - - instance.query( - """CREATE DATABASE test_database - ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword') - SETTINGS postgresql_replica_allow_minimal_ddl = 1; - """) - - for i in range(NUM_TABLES): - instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i)) - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key"); - - altered_table = random.randint(0, 4) - cursor.execute("ALTER TABLE postgresql_replica_{} DROP COLUMN value2".format(altered_table)) - - for i in range(NUM_TABLES): - cursor.execute("INSERT INTO postgresql_replica_{} VALUES (50, {}, {})".format(i, i, i)) - cursor.execute("UPDATE postgresql_replica_{} SET value3 = 12 WHERE key%2=0".format(i)) - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - for i in range(NUM_TABLES): - if i != altered_table: - instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i)) - else: - instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i)) - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - for i in range(NUM_TABLES): - cursor.execute('drop table postgresql_replica_{};'.format(i)) - - instance.query("DROP DATABASE test_database") - - @pytest.mark.timeout(120) def test_changing_replica_identity_value(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database")