diff --git a/docs/en/engines/database-engines/materialized-postgresql.md b/docs/en/engines/database-engines/materialized-postgresql.md index 3aa6dd01ea3..97185f35e1e 100644 --- a/docs/en/engines/database-engines/materialized-postgresql.md +++ b/docs/en/engines/database-engines/materialized-postgresql.md @@ -155,6 +155,12 @@ Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.htm Sets a comma-separated list of PostgreSQL database tables, which will be replicated via [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md) database engine. + Each table can be followed by a subset of its columns to replicate, listed in parentheses. If the subset of columns is omitted, all columns of the table will be replicated. + + ``` sql + materialized_postgresql_tables_list = 'table1(col1, col2),table2,table3(col3, col5, col7)' + ``` + Default value: empty list — means whole PostgreSQL database will be replicated. ### `materialized_postgresql_schema` {#materialized-postgresql-schema} diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index b9fd9c325f8..45fd52f27ab 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -292,7 +293,7 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList( template PostgreSQLTableStructure fetchPostgreSQLTableStructure( - T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index) + T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns) { PostgreSQLTableStructure table; @@ -302,6 +303,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( ?
" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')" : fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema)); + std::string columns_part; + if (!columns.empty()) + columns_part = fmt::format(" AND attname IN ('{}')", boost::algorithm::join(columns, "','")); + std::string query = fmt::format( "SELECT attname AS name, " /// column name "format_type(atttypid, atttypmod) AS type, " /// data type @@ -312,9 +317,9 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( "attnum as att_num, " "attgenerated as generated " /// if column has GENERATED "FROM pg_attribute " - "WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) " + "WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) {}" "AND NOT attisdropped AND attnum > 0 " - "ORDER BY attnum ASC", where); + "ORDER BY attnum ASC", where, columns_part); auto postgres_table_with_schema = postgres_schema.empty() ? postgres_table : doubleQuoteString(postgres_schema) + '.' 
+ doubleQuoteString(postgres_table); table.physical_columns = readNamesAndTypesList(tx, postgres_table_with_schema, query, use_nulls, false); @@ -415,7 +420,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connection, const String & postgres_table, const String & postgres_schema, bool use_nulls) { pqxx::ReadTransaction tx(connection); - auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false); + auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false, {}); tx.commit(); return result; } @@ -433,17 +438,17 @@ std::set fetchPostgreSQLTablesList(pqxx::connection & connection, const template PostgreSQLTableStructure fetchPostgreSQLTableStructure( pqxx::ReadTransaction & tx, const String & postgres_table, const String & postgres_schema, - bool use_nulls, bool with_primary_key, bool with_replica_identity_index); + bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns); template PostgreSQLTableStructure fetchPostgreSQLTableStructure( pqxx::ReplicationTransaction & tx, const String & postgres_table, const String & postgres_schema, - bool use_nulls, bool with_primary_key, bool with_replica_identity_index); + bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns); template PostgreSQLTableStructure fetchPostgreSQLTableStructure( pqxx::nontransaction & tx, const String & postgres_table, const String & postrges_schema, - bool use_nulls, bool with_primary_key, bool with_replica_identity_index); + bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns); std::set fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema); diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h index 
25ece6909fd..6f7bae44c35 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h @@ -48,7 +48,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( template PostgreSQLTableStructure fetchPostgreSQLTableStructure( T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls = true, - bool with_primary_key = false, bool with_replica_identity_index = false); + bool with_primary_key = false, bool with_replica_identity_index = false, const Strings & columns = {}); template std::set fetchPostgreSQLTablesList(T & tx, const String & postgres_schema); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 01f78673ed8..2fe1fb5905a 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -348,11 +348,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) auto * materialized_storage = storage->as (); try { - auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name); - auto table_structure = fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true); - if (!table_structure.physical_columns) + auto table_structure = fetchTableStructure(tx, table_name); + if (!table_structure->physical_columns) throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns"); - auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure.physical_columns->attributes); + auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure->physical_columns->attributes); nested_storages.emplace(table_name, std::move(storage_info)); } catch (Exception & e) @@ -399,9 +398,7 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ postgres::Connection connection(connection_info); 
pqxx::nontransaction tx(connection.getRef()); - auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name); - auto table_structure = std::make_unique(fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true)); - + auto table_structure = fetchTableStructure(tx, table_name); auto table_override = tryGetTableOverride(current_database_name, table_name); return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as() : nullptr); } @@ -415,16 +412,35 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name); tx->exec(query_str); - auto table_structure = fetchTableStructure(*tx, table_name); + PostgreSQLTableStructurePtr table_structure; + try + { + table_structure = fetchTableStructure(*tx, table_name); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + table_structure = std::make_unique(); + } if (!table_structure->physical_columns) throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes"); auto table_attributes = table_structure->physical_columns->attributes; + auto columns = getTableAllowedColumns(table_name); /// Load from snapshot, which will show table state before creation of replication slot. /// Already connected to needed database, no need to add it to query. 
auto quoted_name = doubleQuoteWithSchema(table_name); - query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name); + if (columns.empty()) + query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name); + else + { + /// We should not use columns list from getTableAllowedColumns because it may have broken columns order + Strings allowed_columns; + for (const auto & column : table_structure->physical_columns->columns) + allowed_columns.push_back(column.name); + query_str = fmt::format("SELECT {} FROM ONLY {}", boost::algorithm::join(allowed_columns, ","), quoted_name); + } LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name); @@ -700,6 +716,37 @@ void PostgreSQLReplicationHandler::setSetting(const SettingChange & setting) } +/// Allowed columns for table from materialized_postgresql_tables_list setting +Strings PostgreSQLReplicationHandler::getTableAllowedColumns(const std::string & table_name) const +{ + Strings result; + if (tables_list.empty()) + return result; + + size_t table_pos = tables_list.find(table_name); + if (table_pos == std::string::npos) + { + return result; + } + + if (table_pos + table_name.length() + 1 > tables_list.length()) + { + return result; + } + String column_list = tables_list.substr(table_pos + table_name.length() + 1); + column_list.erase(std::remove(column_list.begin(), column_list.end(), '"'), column_list.end()); + boost::trim(column_list); + if (column_list.empty() || column_list[0] != '(') + return result; + + size_t end_bracket_pos = column_list.find(')'); + column_list = column_list.substr(1, end_bracket_pos - 1); + splitInto<','>(result, column_list); + + return result; +} + + void PostgreSQLReplicationHandler::shutdownFinal() { try @@ -749,11 +796,27 @@ std::set PostgreSQLReplicationHandler::fetchRequiredTables() Strings expected_tables; if (!tables_list.empty()) { - splitInto<','>(expected_tables, tables_list); - if (expected_tables.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot 
parse tables list: {}", tables_list); - for (auto & table_name : expected_tables) - boost::trim(table_name); + /// Removing columns `table(col1, col2)` from tables_list + String cleared_tables_list = tables_list; + while (true) + { + size_t start_bracket_pos = cleared_tables_list.find('('); + size_t end_bracket_pos = cleared_tables_list.find(')'); + if (start_bracket_pos == std::string::npos || end_bracket_pos == std::string::npos) + { + break; + } + cleared_tables_list = cleared_tables_list.substr(0, start_bracket_pos) + cleared_tables_list.substr(end_bracket_pos + 1); + } + + splitInto<','>(expected_tables, cleared_tables_list); + if (expected_tables.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse tables list: {}", tables_list); + + for (auto & table_name : expected_tables) + { + boost::trim(table_name); + } } /// Try to fetch tables list from publication if there is not tables list. @@ -864,18 +927,50 @@ std::set PostgreSQLReplicationHandler::fetchRequiredTables() /// `schema1.table1, schema2.table2, ...` -> `"schema1"."table1", "schema2"."table2", ...` /// or /// `table1, table2, ...` + setting `schema` -> `"schema"."table1", "schema"."table2", ...` + /// or + /// `table1, table2(id,name), ...` + setting `schema` -> `"schema"."table1", "schema"."table2"("id","name"), ...` if (!tables_list.empty()) { - Strings tables_names; - splitInto<','>(tables_names, tables_list); - if (tables_names.empty()) + Strings parts; + splitInto<','>(parts, tables_list); + if (parts.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty list of tables"); + bool is_column = false; WriteBufferFromOwnString buf; - for (auto & table_name : tables_names) + for (auto & part : parts) { - boost::trim(table_name); - buf << doubleQuoteWithSchema(table_name); + boost::trim(part); + + size_t bracket_pos = part.find('('); + if (bracket_pos != std::string::npos) + { + is_column = true; + std::string table_name = part.substr(0, bracket_pos); + boost::trim(table_name); + buf 
<< doubleQuoteWithSchema(table_name); + + part = part.substr(bracket_pos + 1); + boost::trim(part); + buf << '('; + buf << doubleQuoteString(part); + } + else if (part.back() == ')') + { + is_column = false; + part = part.substr(0, part.size() - 1); + boost::trim(part); + buf << doubleQuoteString(part); + buf << ')'; + } + else if (is_column) + { + buf << doubleQuoteString(part); + } + else + { + buf << doubleQuoteWithSchema(part); + } buf << ","; } tables_list = buf.str(); @@ -902,23 +997,28 @@ std::set PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx:: } +template PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure( - pqxx::ReplicationTransaction & tx, const std::string & table_name) const + T & tx, const std::string & table_name) const { PostgreSQLTableStructure structure; - try - { - auto [schema, table] = getSchemaAndTableName(table_name); - structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + auto [schema, table] = getSchemaAndTableName(table_name); + structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true, getTableAllowedColumns(table_name)); return std::make_unique(std::move(structure)); } +template +PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure( + pqxx::ReadTransaction & tx, const std::string & table_name) const; + +template +PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure( + pqxx::ReplicationTransaction & tx, const std::string & table_name) const; + +template +PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure( + pqxx::nontransaction & tx, const std::string & table_name) const; void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPostgreSQL * materialized_storage, const String & postgres_table_name) { diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h 
b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 5c519053d84..8257f92ae1f 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -57,6 +57,8 @@ public: void setSetting(const SettingChange & setting); + Strings getTableAllowedColumns(const std::string & table_name) const; + void cleanupFunc(); private: @@ -94,7 +96,8 @@ private: StorageInfo loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage); - PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const String & table_name) const; + template + PostgreSQLTableStructurePtr fetchTableStructure(T & tx, const String & table_name) const; String doubleQuoteWithSchema(const String & table_name) const; diff --git a/tests/integration/helpers/postgres_utility.py b/tests/integration/helpers/postgres_utility.py index 468c3b3bb63..76dddd7d0cf 100644 --- a/tests/integration/helpers/postgres_utility.py +++ b/tests/integration/helpers/postgres_utility.py @@ -359,6 +359,7 @@ def check_tables_are_synchronized( postgres_database="postgres_database", materialized_database="test_database", schema_name="", + columns=["*"], ): assert_nested_table_is_created( instance, table_name, materialized_database, schema_name @@ -374,7 +375,7 @@ def check_tables_are_synchronized( result_query = f"select * from {table_path} order by {order_by};" expected = instance.query( - f"select * from `{postgres_database}`.`{table_name}` order by {order_by};" + f"select {','.join(columns)} from `{postgres_database}`.`{table_name}` order by {order_by};" ) result = instance.query(result_query) diff --git a/tests/integration/test_postgresql_replica_database_engine_2/test.py b/tests/integration/test_postgresql_replica_database_engine_2/test.py index 7fdd17625a9..e64c9eb9d1e 100644 --- 
def test_partial_table(started_cluster):
    """Replicate only the `z` and `key` columns of a four-column table.

    Synchronization is checked after the initial load and again after more rows are inserted.
    """
    table = "test_partial_table"

    def insert_rows(*rows):
        for row in rows:
            pg_manager.execute(f"insert into {table} (key, x, z) values {row};")

    def assert_synchronized():
        check_tables_are_synchronized(
            instance,
            table,
            postgres_database=pg_manager.get_default_database(),
            columns=["key", "z"],
        )

    ddl = f"""CREATE TABLE {table} ( key integer PRIMARY KEY, x integer DEFAULT 0, y integer, z text DEFAULT 'z'); """
    pg_manager.create_postgres_table(table, "", ddl)
    insert_rows("(1,1,'a')", "(2,2,'b')")

    db_settings = [
        f"materialized_postgresql_tables_list = '{table}(z, key)'",
        "materialized_postgresql_backoff_min_ms = 100",
        "materialized_postgresql_backoff_max_ms = 100",
    ]
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=db_settings,
    )
    assert_synchronized()

    insert_rows("(3,3,'c')", "(4,4,'d')")
    assert_synchronized()


def test_partial_and_full_table(started_cluster):
    """Replicate one table with a column subset next to one table with all columns.

    `<table>1` is listed with `(key, x, z)`, `<table>2` without a column list.
    """
    table = "test_partial_and_full_table"

    def assert_synchronized(name, **extra):
        check_tables_are_synchronized(
            instance,
            name,
            postgres_database=pg_manager.get_default_database(),
            **extra,
        )

    def assert_both_synchronized():
        assert_synchronized(f"{table}1", columns=["key", "x", "z"])
        assert_synchronized(f"{table}2")

    initial_rows = {
        "1": ("(1,1,1,'1')", "(2,2,2,'2')"),
        "2": ("(3,3,3,'3')", "(4,4,4,'4')"),
    }
    for suffix, rows in initial_rows.items():
        pg_manager.create_postgres_table(
            table,
            "",
            f"""CREATE TABLE {table}{suffix} ( key integer PRIMARY KEY, x integer DEFAULT 0, y integer, z text DEFAULT 'z'); """,
        )
        for row in rows:
            pg_manager.execute(
                f"insert into {table}{suffix} (key, x, y, z) values {row};"
            )

    db_settings = [
        f"materialized_postgresql_tables_list = '{table}1(key, x, z), {table}2'",
        "materialized_postgresql_backoff_min_ms = 100",
        "materialized_postgresql_backoff_max_ms = 100",
    ]
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=db_settings,
    )
    assert_both_synchronized()

    pg_manager.execute(f"insert into {table}1 (key, x, z) values (3,3,'3');")
    pg_manager.execute(f"insert into {table}2 (key, x, z) values (5,5,'5');")
    assert_both_synchronized()