Ability to limit columns for tables in MaterializedPostgreSQL

This commit is contained in:
1on 2024-08-30 16:47:41 +03:00
parent 8fbfc61c55
commit 51d770fa7a
7 changed files with 259 additions and 40 deletions

View File

@ -155,6 +155,12 @@ Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.htm
Sets a comma-separated list of PostgreSQL database tables, which will be replicated via [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md) database engine.
Each table can have a subset of its columns listed in brackets; only those columns are replicated. If the subset of columns is omitted, all columns of the table are replicated.
``` sql
materialized_postgresql_tables_list = 'table1(col1, col2),table2,table3(col3, col5, col7)'
```
Default value: empty list — means whole PostgreSQL database will be replicated.
### `materialized_postgresql_schema` {#materialized-postgresql-schema}

View File

@ -13,6 +13,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/algorithm/string/join.hpp>
#include <Common/quoteString.h>
#include <Core/PostgreSQL/Utils.h>
#include <base/FnTraits.h>
@ -292,7 +293,7 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
template<typename T>
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns)
{
PostgreSQLTableStructure table;
@ -302,6 +303,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
? " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')"
: fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));
std::string columns_part;
if (!columns.empty())
columns_part = fmt::format(" AND attname IN ('{}')", boost::algorithm::join(columns, "','"));
std::string query = fmt::format(
"SELECT attname AS name, " /// column name
"format_type(atttypid, atttypmod) AS type, " /// data type
@ -312,9 +317,9 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
"attnum as att_num, "
"attgenerated as generated " /// if column has GENERATED
"FROM pg_attribute "
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) {}"
"AND NOT attisdropped AND attnum > 0 "
"ORDER BY attnum ASC", where);
"ORDER BY attnum ASC", where, columns_part);
auto postgres_table_with_schema = postgres_schema.empty() ? postgres_table : doubleQuoteString(postgres_schema) + '.' + doubleQuoteString(postgres_table);
table.physical_columns = readNamesAndTypesList(tx, postgres_table_with_schema, query, use_nulls, false);
@ -415,7 +420,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connection, const String & postgres_table, const String & postgres_schema, bool use_nulls)
{
pqxx::ReadTransaction tx(connection);
auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false);
auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false, {});
tx.commit();
return result;
}
@ -433,17 +438,17 @@ std::set<String> fetchPostgreSQLTablesList(pqxx::connection & connection, const
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReadTransaction & tx, const String & postgres_table, const String & postgres_schema,
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReplicationTransaction & tx, const String & postgres_table, const String & postgres_schema,
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::nontransaction & tx, const String & postgres_table, const String & postrges_schema,
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns);
std::set<String> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);

View File

@ -48,7 +48,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
template<typename T>
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls = true,
bool with_primary_key = false, bool with_replica_identity_index = false);
bool with_primary_key = false, bool with_replica_identity_index = false, const Strings & columns = {});
template<typename T>
std::set<String> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema);

View File

@ -348,11 +348,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
try
{
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
auto table_structure = fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true);
if (!table_structure.physical_columns)
auto table_structure = fetchTableStructure(tx, table_name);
if (!table_structure->physical_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns");
auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure.physical_columns->attributes);
auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure->physical_columns->attributes);
nested_storages.emplace(table_name, std::move(storage_info));
}
catch (Exception & e)
@ -399,9 +398,7 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
postgres::Connection connection(connection_info);
pqxx::nontransaction tx(connection.getRef());
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true));
auto table_structure = fetchTableStructure(tx, table_name);
auto table_override = tryGetTableOverride(current_database_name, table_name);
return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
}
@ -415,16 +412,35 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
tx->exec(query_str);
auto table_structure = fetchTableStructure(*tx, table_name);
PostgreSQLTableStructurePtr table_structure;
try
{
table_structure = fetchTableStructure(*tx, table_name);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
table_structure = std::make_unique<PostgreSQLTableStructure>();
}
if (!table_structure->physical_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes");
auto table_attributes = table_structure->physical_columns->attributes;
auto columns = getTableAllowedColumns(table_name);
/// Load from snapshot, which will show table state before creation of replication slot.
/// Already connected to needed database, no need to add it to query.
auto quoted_name = doubleQuoteWithSchema(table_name);
query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name);
if (columns.empty())
query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name);
else
{
/// We should not use columns list from getTableAllowedColumns because it may have broken columns order
Strings allowed_columns;
for (const auto & column : table_structure->physical_columns->columns)
allowed_columns.push_back(column.name);
query_str = fmt::format("SELECT {} FROM ONLY {}", boost::algorithm::join(allowed_columns, ","), quoted_name);
}
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
@ -700,6 +716,37 @@ void PostgreSQLReplicationHandler::setSetting(const SettingChange & setting)
}
/// Allowed columns for table from materialized_postgresql_tables_list setting.
///
/// Looks `table_name` up in `tables_list` and returns the column names written in brackets
/// right after it. Returns an empty list when the setting is empty, the table is not listed,
/// the table is listed without a column list, or the column list has no closing bracket.
///
/// Exactly one character is skipped after the table name before the column list is expected.
/// This matches the closing double quote of the `"schema"."table"("col1","col2"),` form that
/// fetchRequiredTables stores into `tables_list`.
/// NOTE(review): `table_name` is searched verbatim, so a `schema.table` style name will not
/// match the quoted `"schema"."table"` form - confirm what callers pass here.
Strings PostgreSQLReplicationHandler::getTableAllowedColumns(const std::string & table_name) const
{
    Strings result;
    if (tables_list.empty() || table_name.empty())
        return result;

    /// Characters that may legitimately surround a complete table name in the list.
    auto is_name_boundary = [](char c)
    {
        return c == '"' || c == '(' || c == ')' || c == ',' || c == '.' || c == ' ' || c == '\t' || c == '\n';
    };

    /// True if `pos` lies between an opening bracket and its closing bracket,
    /// i.e. the text at `pos` is a column name and not a table name.
    auto is_inside_column_list = [&](size_t pos)
    {
        size_t depth = 0;
        for (size_t i = 0; i < pos; ++i)
        {
            if (tables_list[i] == '(')
                ++depth;
            else if (tables_list[i] == ')' && depth > 0)
                --depth;
        }
        return depth > 0;
    };

    size_t search_from = 0;
    while (true)
    {
        const size_t table_pos = tables_list.find(table_name, search_from);
        if (table_pos == std::string::npos)
            return result;
        search_from = table_pos + 1;

        const size_t name_end = table_pos + table_name.length();

        /// The match touches the end of the list: nothing can follow it, and no later match can exist.
        if (name_end + 1 > tables_list.length())
            return result;

        /// Reject partial matches such as `table1` inside `my_table1` or `table10`.
        const bool bounded_left = table_pos == 0 || is_name_boundary(tables_list[table_pos - 1]);
        const bool bounded_right = is_name_boundary(tables_list[name_end]);
        if (!bounded_left || !bounded_right)
            continue;

        /// Reject a column of another table that happens to share the table's name.
        if (is_inside_column_list(table_pos))
            continue;

        String column_list = tables_list.substr(name_end + 1);
        column_list.erase(std::remove(column_list.begin(), column_list.end(), '"'), column_list.end());
        boost::trim(column_list);

        /// The table is listed without a column list: all columns are allowed.
        if (column_list.empty() || column_list[0] == ',')
            return result;

        /// Something else follows the name (e.g. it was a schema qualifier): keep searching.
        if (column_list[0] != '(')
            continue;

        /// Malformed list without a closing bracket: do not treat the rest of the setting as columns.
        const size_t end_bracket_pos = column_list.find(')');
        if (end_bracket_pos == std::string::npos)
            return result;

        column_list = column_list.substr(1, end_bracket_pos - 1);
        splitInto<','>(result, column_list);
        for (auto & column : result)
            boost::trim(column);

        return result;
    }
}
void PostgreSQLReplicationHandler::shutdownFinal()
{
try
@ -749,11 +796,27 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
Strings expected_tables;
if (!tables_list.empty())
{
splitInto<','>(expected_tables, tables_list);
if (expected_tables.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse tables list: {}", tables_list);
for (auto & table_name : expected_tables)
boost::trim(table_name);
/// Removing columns `table(col1, col2)` from tables_list
String cleared_tables_list = tables_list;
while (true)
{
size_t start_bracket_pos = cleared_tables_list.find('(');
size_t end_bracket_pos = cleared_tables_list.find(')');
if (start_bracket_pos == std::string::npos || end_bracket_pos == std::string::npos)
{
break;
}
cleared_tables_list = cleared_tables_list.substr(0, start_bracket_pos) + cleared_tables_list.substr(end_bracket_pos + 1);
}
splitInto<','>(expected_tables, cleared_tables_list);
if (expected_tables.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse tables list: {}", tables_list);
for (auto & table_name : expected_tables)
{
boost::trim(table_name);
}
}
/// Try to fetch tables list from publication if there is not tables list.
@ -864,18 +927,50 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
/// `schema1.table1, schema2.table2, ...` -> `"schema1"."table1", "schema2"."table2", ...`
/// or
/// `table1, table2, ...` + setting `schema` -> `"schema"."table1", "schema"."table2", ...`
/// or
/// `table1, table2(id,name), ...` + setting `schema` -> `"schema"."table1", "schema"."table2"("id","name"), ...`
if (!tables_list.empty())
{
Strings tables_names;
splitInto<','>(tables_names, tables_list);
if (tables_names.empty())
Strings parts;
splitInto<','>(parts, tables_list);
if (parts.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty list of tables");
bool is_column = false;
WriteBufferFromOwnString buf;
for (auto & table_name : tables_names)
for (auto & part : parts)
{
boost::trim(table_name);
buf << doubleQuoteWithSchema(table_name);
boost::trim(part);
size_t bracket_pos = part.find('(');
if (bracket_pos != std::string::npos)
{
is_column = true;
std::string table_name = part.substr(0, bracket_pos);
boost::trim(table_name);
buf << doubleQuoteWithSchema(table_name);
part = part.substr(bracket_pos + 1);
boost::trim(part);
buf << '(';
buf << doubleQuoteString(part);
}
else if (part.back() == ')')
{
is_column = false;
part = part.substr(0, part.size() - 1);
boost::trim(part);
buf << doubleQuoteString(part);
buf << ')';
}
else if (is_column)
{
buf << doubleQuoteString(part);
}
else
{
buf << doubleQuoteWithSchema(part);
}
buf << ",";
}
tables_list = buf.str();
@ -902,23 +997,28 @@ std::set<String> PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::
}
template<typename T>
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
pqxx::ReplicationTransaction & tx, const std::string & table_name) const
T & tx, const std::string & table_name) const
{
PostgreSQLTableStructure structure;
try
{
auto [schema, table] = getSchemaAndTableName(table_name);
structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
auto [schema, table] = getSchemaAndTableName(table_name);
structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true, getTableAllowedColumns(table_name));
return std::make_unique<PostgreSQLTableStructure>(std::move(structure));
}
template
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
pqxx::ReadTransaction & tx, const std::string & table_name) const;
template
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
pqxx::ReplicationTransaction & tx, const std::string & table_name) const;
template
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
pqxx::nontransaction & tx, const std::string & table_name) const;
void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPostgreSQL * materialized_storage, const String & postgres_table_name)
{

View File

@ -57,6 +57,8 @@ public:
void setSetting(const SettingChange & setting);
Strings getTableAllowedColumns(const std::string & table_name) const;
void cleanupFunc();
private:
@ -94,7 +96,8 @@ private:
StorageInfo loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const String & table_name) const;
template<typename T>
PostgreSQLTableStructurePtr fetchTableStructure(T & tx, const String & table_name) const;
String doubleQuoteWithSchema(const String & table_name) const;

View File

@ -359,6 +359,7 @@ def check_tables_are_synchronized(
postgres_database="postgres_database",
materialized_database="test_database",
schema_name="",
columns=["*"],
):
assert_nested_table_is_created(
instance, table_name, materialized_database, schema_name
@ -374,7 +375,7 @@ def check_tables_are_synchronized(
result_query = f"select * from {table_path} order by {order_by};"
expected = instance.query(
f"select * from `{postgres_database}`.`{table_name}` order by {order_by};"
f"select {','.join(columns)} from `{postgres_database}`.`{table_name}` order by {order_by};"
)
result = instance.query(result_query)

View File

@ -1141,6 +1141,110 @@ def test_dependent_loading(started_cluster):
instance.query(f"DROP TABLE {table} SYNC")
def test_partial_table(started_cluster):
    """Replicate only columns `z` and `key` of a four-column PostgreSQL table.

    The table is created with columns key, x, y, z; the materialized database is
    created with a tables list restricted to `(z, key)`. Synchronization is checked
    against the `key` and `z` columns both after the initial load and after
    further inserts.
    """
    table = "test_partial_table"
    replicated_columns = ["key", "z"]

    create_query = f"""CREATE TABLE {table} (
key integer PRIMARY KEY,
x integer DEFAULT 0,
y integer,
z text DEFAULT 'z');
"""
    pg_manager.create_postgres_table(table, "", create_query)

    # Rows that exist before the materialized database is created.
    for statement in (
        f"insert into {table} (key, x, z) values (1,1,'a');",
        f"insert into {table} (key, x, z) values (2,2,'b');",
    ):
        pg_manager.execute(statement)

    db_settings = [
        f"materialized_postgresql_tables_list = '{table}(z, key)'",
        "materialized_postgresql_backoff_min_ms = 100",
        "materialized_postgresql_backoff_max_ms = 100",
    ]
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=db_settings,
    )

    def assert_synchronized():
        # Compare only the replicated subset of columns.
        check_tables_are_synchronized(
            instance,
            table,
            postgres_database=pg_manager.get_default_database(),
            columns=replicated_columns,
        )

    assert_synchronized()

    # Rows inserted after the materialized database was created.
    for statement in (
        f"insert into {table} (key, x, z) values (3,3,'c');",
        f"insert into {table} (key, x, z) values (4,4,'d');",
    ):
        pg_manager.execute(statement)

    assert_synchronized()
def test_partial_and_full_table(started_cluster):
    """Replicate one table with a column subset and another table in full.

    Two identical four-column tables (`...1` and `...2`) are created. The tables
    list restricts the first to `(key, x, z)` and lists the second without
    columns. Both are checked after the initial load and after further inserts.
    """
    table = "test_partial_and_full_table"
    partial_table = f"{table}1"
    full_table = f"{table}2"
    partial_columns = ["key", "x", "z"]

    # Table suffix -> rows present before the materialized database is created.
    initial_rows = {
        "1": ["(1,1,1,'1')", "(2,2,2,'2')"],
        "2": ["(3,3,3,'3')", "(4,4,4,'4')"],
    }
    for suffix, rows in initial_rows.items():
        pg_manager.create_postgres_table(
            table,
            "",
            f"""CREATE TABLE {table}{suffix} (
key integer PRIMARY KEY,
x integer DEFAULT 0,
y integer,
z text DEFAULT 'z');
""",
        )
        for row in rows:
            pg_manager.execute(
                f"insert into {table}{suffix} (key, x, y, z) values {row};"
            )

    db_settings = [
        f"materialized_postgresql_tables_list = '{table}1(key, x, z), {table}2'",
        "materialized_postgresql_backoff_min_ms = 100",
        "materialized_postgresql_backoff_max_ms = 100",
    ]
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=db_settings,
    )

    def assert_both_synchronized():
        # The first table is compared on its replicated subset only.
        check_tables_are_synchronized(
            instance,
            partial_table,
            postgres_database=pg_manager.get_default_database(),
            columns=partial_columns,
        )
        # The second table is compared on all columns.
        check_tables_are_synchronized(
            instance,
            full_table,
            postgres_database=pg_manager.get_default_database(),
        )

    assert_both_synchronized()

    # Rows inserted after the materialized database was created.
    pg_manager.execute(f"insert into {table}1 (key, x, z) values (3,3,'3');")
    pg_manager.execute(f"insert into {table}2 (key, x, z) values (5,5,'5');")

    assert_both_synchronized()
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")