mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #69092 from 1on/master
Ability to limit columns for tables in MaterializedPostgreSQL
This commit is contained in:
commit
0369aaea87
@ -155,6 +155,12 @@ Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.htm
|
|||||||
|
|
||||||
Sets a comma-separated list of PostgreSQL database tables, which will be replicated via [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md) database engine.
|
Sets a comma-separated list of PostgreSQL database tables, which will be replicated via [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md) database engine.
|
||||||
|
|
||||||
|
Each table can have subset of replicated columns in brackets. If subset of columns is omitted, then all columns for table will be replicated.
|
||||||
|
|
||||||
|
``` sql
|
||||||
|
materialized_postgresql_tables_list = 'table1(co1, col2),table2,table3(co3, col5, col7)
|
||||||
|
```
|
||||||
|
|
||||||
Default value: empty list — means whole PostgreSQL database will be replicated.
|
Default value: empty list — means whole PostgreSQL database will be replicated.
|
||||||
|
|
||||||
### `materialized_postgresql_schema` {#materialized-postgresql-schema}
|
### `materialized_postgresql_schema` {#materialized-postgresql-schema}
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include <DataTypes/DataTypeDateTime64.h>
|
#include <DataTypes/DataTypeDateTime64.h>
|
||||||
#include <boost/algorithm/string/split.hpp>
|
#include <boost/algorithm/string/split.hpp>
|
||||||
#include <boost/algorithm/string/trim.hpp>
|
#include <boost/algorithm/string/trim.hpp>
|
||||||
|
#include <boost/algorithm/string/join.hpp>
|
||||||
#include <Common/quoteString.h>
|
#include <Common/quoteString.h>
|
||||||
#include <Core/PostgreSQL/Utils.h>
|
#include <Core/PostgreSQL/Utils.h>
|
||||||
#include <base/FnTraits.h>
|
#include <base/FnTraits.h>
|
||||||
@ -292,7 +293,7 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
|
|||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||||
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
|
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns)
|
||||||
{
|
{
|
||||||
PostgreSQLTableStructure table;
|
PostgreSQLTableStructure table;
|
||||||
|
|
||||||
@ -302,6 +303,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
|||||||
? " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')"
|
? " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')"
|
||||||
: fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));
|
: fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));
|
||||||
|
|
||||||
|
std::string columns_part;
|
||||||
|
if (!columns.empty())
|
||||||
|
columns_part = fmt::format(" AND attname IN ('{}')", boost::algorithm::join(columns, "','"));
|
||||||
|
|
||||||
std::string query = fmt::format(
|
std::string query = fmt::format(
|
||||||
"SELECT attname AS name, " /// column name
|
"SELECT attname AS name, " /// column name
|
||||||
"format_type(atttypid, atttypmod) AS type, " /// data type
|
"format_type(atttypid, atttypmod) AS type, " /// data type
|
||||||
@ -312,9 +317,9 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
|||||||
"attnum as att_num, "
|
"attnum as att_num, "
|
||||||
"attgenerated as generated " /// if column has GENERATED
|
"attgenerated as generated " /// if column has GENERATED
|
||||||
"FROM pg_attribute "
|
"FROM pg_attribute "
|
||||||
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
|
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) {}"
|
||||||
"AND NOT attisdropped AND attnum > 0 "
|
"AND NOT attisdropped AND attnum > 0 "
|
||||||
"ORDER BY attnum ASC", where);
|
"ORDER BY attnum ASC", where, columns_part);
|
||||||
|
|
||||||
auto postgres_table_with_schema = postgres_schema.empty() ? postgres_table : doubleQuoteString(postgres_schema) + '.' + doubleQuoteString(postgres_table);
|
auto postgres_table_with_schema = postgres_schema.empty() ? postgres_table : doubleQuoteString(postgres_schema) + '.' + doubleQuoteString(postgres_table);
|
||||||
table.physical_columns = readNamesAndTypesList(tx, postgres_table_with_schema, query, use_nulls, false);
|
table.physical_columns = readNamesAndTypesList(tx, postgres_table_with_schema, query, use_nulls, false);
|
||||||
@ -415,7 +420,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
|||||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connection, const String & postgres_table, const String & postgres_schema, bool use_nulls)
|
PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connection, const String & postgres_table, const String & postgres_schema, bool use_nulls)
|
||||||
{
|
{
|
||||||
pqxx::ReadTransaction tx(connection);
|
pqxx::ReadTransaction tx(connection);
|
||||||
auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false);
|
auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false, {});
|
||||||
tx.commit();
|
tx.commit();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -433,17 +438,17 @@ std::set<String> fetchPostgreSQLTablesList(pqxx::connection & connection, const
|
|||||||
template
|
template
|
||||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||||
pqxx::ReadTransaction & tx, const String & postgres_table, const String & postgres_schema,
|
pqxx::ReadTransaction & tx, const String & postgres_table, const String & postgres_schema,
|
||||||
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
|
bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns);
|
||||||
|
|
||||||
template
|
template
|
||||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||||
pqxx::ReplicationTransaction & tx, const String & postgres_table, const String & postgres_schema,
|
pqxx::ReplicationTransaction & tx, const String & postgres_table, const String & postgres_schema,
|
||||||
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
|
bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns);
|
||||||
|
|
||||||
template
|
template
|
||||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||||
pqxx::nontransaction & tx, const String & postgres_table, const String & postrges_schema,
|
pqxx::nontransaction & tx, const String & postgres_table, const String & postrges_schema,
|
||||||
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
|
bool use_nulls, bool with_primary_key, bool with_replica_identity_index, const Strings & columns);
|
||||||
|
|
||||||
std::set<String> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
|
std::set<String> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
|||||||
template<typename T>
|
template<typename T>
|
||||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||||
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls = true,
|
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls = true,
|
||||||
bool with_primary_key = false, bool with_replica_identity_index = false);
|
bool with_primary_key = false, bool with_replica_identity_index = false, const Strings & columns = {});
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
std::set<String> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema);
|
std::set<String> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema);
|
||||||
|
@ -348,11 +348,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
|
|||||||
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
|
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
|
auto table_structure = fetchTableStructure(tx, table_name);
|
||||||
auto table_structure = fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true);
|
if (!table_structure->physical_columns)
|
||||||
if (!table_structure.physical_columns)
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns");
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns");
|
||||||
auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure.physical_columns->attributes);
|
auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure->physical_columns->attributes);
|
||||||
nested_storages.emplace(table_name, std::move(storage_info));
|
nested_storages.emplace(table_name, std::move(storage_info));
|
||||||
}
|
}
|
||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
@ -399,9 +398,7 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
|
|||||||
postgres::Connection connection(connection_info);
|
postgres::Connection connection(connection_info);
|
||||||
pqxx::nontransaction tx(connection.getRef());
|
pqxx::nontransaction tx(connection.getRef());
|
||||||
|
|
||||||
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
|
auto table_structure = fetchTableStructure(tx, table_name);
|
||||||
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true));
|
|
||||||
|
|
||||||
auto table_override = tryGetTableOverride(current_database_name, table_name);
|
auto table_override = tryGetTableOverride(current_database_name, table_name);
|
||||||
return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
|
return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
|
||||||
}
|
}
|
||||||
@ -415,16 +412,35 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection
|
|||||||
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
|
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
|
||||||
tx->exec(query_str);
|
tx->exec(query_str);
|
||||||
|
|
||||||
auto table_structure = fetchTableStructure(*tx, table_name);
|
PostgreSQLTableStructurePtr table_structure;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
table_structure = fetchTableStructure(*tx, table_name);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
table_structure = std::make_unique<PostgreSQLTableStructure>();
|
||||||
|
}
|
||||||
if (!table_structure->physical_columns)
|
if (!table_structure->physical_columns)
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes");
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes");
|
||||||
|
|
||||||
auto table_attributes = table_structure->physical_columns->attributes;
|
auto table_attributes = table_structure->physical_columns->attributes;
|
||||||
|
auto columns = getTableAllowedColumns(table_name);
|
||||||
|
|
||||||
/// Load from snapshot, which will show table state before creation of replication slot.
|
/// Load from snapshot, which will show table state before creation of replication slot.
|
||||||
/// Already connected to needed database, no need to add it to query.
|
/// Already connected to needed database, no need to add it to query.
|
||||||
auto quoted_name = doubleQuoteWithSchema(table_name);
|
auto quoted_name = doubleQuoteWithSchema(table_name);
|
||||||
query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name);
|
if (columns.empty())
|
||||||
|
query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// We should not use columns list from getTableAllowedColumns because it may have broken columns order
|
||||||
|
Strings allowed_columns;
|
||||||
|
for (const auto & column : table_structure->physical_columns->columns)
|
||||||
|
allowed_columns.push_back(column.name);
|
||||||
|
query_str = fmt::format("SELECT {} FROM ONLY {}", boost::algorithm::join(allowed_columns, ","), quoted_name);
|
||||||
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
|
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
|
||||||
|
|
||||||
@ -700,6 +716,37 @@ void PostgreSQLReplicationHandler::setSetting(const SettingChange & setting)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Allowed columns for table from materialized_postgresql_tables_list setting
|
||||||
|
Strings PostgreSQLReplicationHandler::getTableAllowedColumns(const std::string & table_name) const
|
||||||
|
{
|
||||||
|
Strings result;
|
||||||
|
if (tables_list.empty())
|
||||||
|
return result;
|
||||||
|
|
||||||
|
size_t table_pos = tables_list.find(table_name);
|
||||||
|
if (table_pos == std::string::npos)
|
||||||
|
{
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (table_pos + table_name.length() + 1 > tables_list.length())
|
||||||
|
{
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
String column_list = tables_list.substr(table_pos + table_name.length() + 1);
|
||||||
|
column_list.erase(std::remove(column_list.begin(), column_list.end(), '"'), column_list.end());
|
||||||
|
boost::trim(column_list);
|
||||||
|
if (column_list.empty() || column_list[0] != '(')
|
||||||
|
return result;
|
||||||
|
|
||||||
|
size_t end_bracket_pos = column_list.find(')');
|
||||||
|
column_list = column_list.substr(1, end_bracket_pos - 1);
|
||||||
|
splitInto<','>(result, column_list);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void PostgreSQLReplicationHandler::shutdownFinal()
|
void PostgreSQLReplicationHandler::shutdownFinal()
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
@ -749,11 +796,27 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
|
|||||||
Strings expected_tables;
|
Strings expected_tables;
|
||||||
if (!tables_list.empty())
|
if (!tables_list.empty())
|
||||||
{
|
{
|
||||||
splitInto<','>(expected_tables, tables_list);
|
/// Removing columns `table(col1, col2)` from tables_list
|
||||||
if (expected_tables.empty())
|
String cleared_tables_list = tables_list;
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse tables list: {}", tables_list);
|
while (true)
|
||||||
for (auto & table_name : expected_tables)
|
{
|
||||||
boost::trim(table_name);
|
size_t start_bracket_pos = cleared_tables_list.find('(');
|
||||||
|
size_t end_bracket_pos = cleared_tables_list.find(')');
|
||||||
|
if (start_bracket_pos == std::string::npos || end_bracket_pos == std::string::npos)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cleared_tables_list = cleared_tables_list.substr(0, start_bracket_pos) + cleared_tables_list.substr(end_bracket_pos + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
splitInto<','>(expected_tables, cleared_tables_list);
|
||||||
|
if (expected_tables.empty())
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse tables list: {}", tables_list);
|
||||||
|
|
||||||
|
for (auto & table_name : expected_tables)
|
||||||
|
{
|
||||||
|
boost::trim(table_name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try to fetch tables list from publication if there is not tables list.
|
/// Try to fetch tables list from publication if there is not tables list.
|
||||||
@ -864,18 +927,50 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
|
|||||||
/// `schema1.table1, schema2.table2, ...` -> `"schema1"."table1", "schema2"."table2", ...`
|
/// `schema1.table1, schema2.table2, ...` -> `"schema1"."table1", "schema2"."table2", ...`
|
||||||
/// or
|
/// or
|
||||||
/// `table1, table2, ...` + setting `schema` -> `"schema"."table1", "schema"."table2", ...`
|
/// `table1, table2, ...` + setting `schema` -> `"schema"."table1", "schema"."table2", ...`
|
||||||
|
/// or
|
||||||
|
/// `table1, table2(id,name), ...` + setting `schema` -> `"schema"."table1", "schema"."table2"("id","name"), ...`
|
||||||
if (!tables_list.empty())
|
if (!tables_list.empty())
|
||||||
{
|
{
|
||||||
Strings tables_names;
|
Strings parts;
|
||||||
splitInto<','>(tables_names, tables_list);
|
splitInto<','>(parts, tables_list);
|
||||||
if (tables_names.empty())
|
if (parts.empty())
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty list of tables");
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty list of tables");
|
||||||
|
|
||||||
|
bool is_column = false;
|
||||||
WriteBufferFromOwnString buf;
|
WriteBufferFromOwnString buf;
|
||||||
for (auto & table_name : tables_names)
|
for (auto & part : parts)
|
||||||
{
|
{
|
||||||
boost::trim(table_name);
|
boost::trim(part);
|
||||||
buf << doubleQuoteWithSchema(table_name);
|
|
||||||
|
size_t bracket_pos = part.find('(');
|
||||||
|
if (bracket_pos != std::string::npos)
|
||||||
|
{
|
||||||
|
is_column = true;
|
||||||
|
std::string table_name = part.substr(0, bracket_pos);
|
||||||
|
boost::trim(table_name);
|
||||||
|
buf << doubleQuoteWithSchema(table_name);
|
||||||
|
|
||||||
|
part = part.substr(bracket_pos + 1);
|
||||||
|
boost::trim(part);
|
||||||
|
buf << '(';
|
||||||
|
buf << doubleQuoteString(part);
|
||||||
|
}
|
||||||
|
else if (part.back() == ')')
|
||||||
|
{
|
||||||
|
is_column = false;
|
||||||
|
part = part.substr(0, part.size() - 1);
|
||||||
|
boost::trim(part);
|
||||||
|
buf << doubleQuoteString(part);
|
||||||
|
buf << ')';
|
||||||
|
}
|
||||||
|
else if (is_column)
|
||||||
|
{
|
||||||
|
buf << doubleQuoteString(part);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
buf << doubleQuoteWithSchema(part);
|
||||||
|
}
|
||||||
buf << ",";
|
buf << ",";
|
||||||
}
|
}
|
||||||
tables_list = buf.str();
|
tables_list = buf.str();
|
||||||
@ -902,23 +997,28 @@ std::set<String> PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
|
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
|
||||||
pqxx::ReplicationTransaction & tx, const std::string & table_name) const
|
T & tx, const std::string & table_name) const
|
||||||
{
|
{
|
||||||
PostgreSQLTableStructure structure;
|
PostgreSQLTableStructure structure;
|
||||||
try
|
auto [schema, table] = getSchemaAndTableName(table_name);
|
||||||
{
|
structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true, getTableAllowedColumns(table_name));
|
||||||
auto [schema, table] = getSchemaAndTableName(table_name);
|
|
||||||
structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
||||||
}
|
|
||||||
|
|
||||||
return std::make_unique<PostgreSQLTableStructure>(std::move(structure));
|
return std::make_unique<PostgreSQLTableStructure>(std::move(structure));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template
|
||||||
|
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
|
||||||
|
pqxx::ReadTransaction & tx, const std::string & table_name) const;
|
||||||
|
|
||||||
|
template
|
||||||
|
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
|
||||||
|
pqxx::ReplicationTransaction & tx, const std::string & table_name) const;
|
||||||
|
|
||||||
|
template
|
||||||
|
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
|
||||||
|
pqxx::nontransaction & tx, const std::string & table_name) const;
|
||||||
|
|
||||||
void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPostgreSQL * materialized_storage, const String & postgres_table_name)
|
void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPostgreSQL * materialized_storage, const String & postgres_table_name)
|
||||||
{
|
{
|
||||||
|
@ -57,6 +57,8 @@ public:
|
|||||||
|
|
||||||
void setSetting(const SettingChange & setting);
|
void setSetting(const SettingChange & setting);
|
||||||
|
|
||||||
|
Strings getTableAllowedColumns(const std::string & table_name) const;
|
||||||
|
|
||||||
void cleanupFunc();
|
void cleanupFunc();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -94,7 +96,8 @@ private:
|
|||||||
|
|
||||||
StorageInfo loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
|
StorageInfo loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
|
||||||
|
|
||||||
PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const String & table_name) const;
|
template<typename T>
|
||||||
|
PostgreSQLTableStructurePtr fetchTableStructure(T & tx, const String & table_name) const;
|
||||||
|
|
||||||
String doubleQuoteWithSchema(const String & table_name) const;
|
String doubleQuoteWithSchema(const String & table_name) const;
|
||||||
|
|
||||||
|
@ -363,6 +363,7 @@ def check_tables_are_synchronized(
|
|||||||
postgres_database="postgres_database",
|
postgres_database="postgres_database",
|
||||||
materialized_database="test_database",
|
materialized_database="test_database",
|
||||||
schema_name="",
|
schema_name="",
|
||||||
|
columns=["*"],
|
||||||
):
|
):
|
||||||
assert_nested_table_is_created(
|
assert_nested_table_is_created(
|
||||||
instance, table_name, materialized_database, schema_name
|
instance, table_name, materialized_database, schema_name
|
||||||
@ -378,7 +379,7 @@ def check_tables_are_synchronized(
|
|||||||
result_query = f"select * from {table_path} order by {order_by};"
|
result_query = f"select * from {table_path} order by {order_by};"
|
||||||
|
|
||||||
expected = instance.query(
|
expected = instance.query(
|
||||||
f"select * from `{postgres_database}`.`{table_name}` order by {order_by};"
|
f"select {','.join(columns)} from `{postgres_database}`.`{table_name}` order by {order_by};"
|
||||||
)
|
)
|
||||||
result = instance.query(result_query)
|
result = instance.query(result_query)
|
||||||
|
|
||||||
|
@ -1148,6 +1148,110 @@ def test_dependent_loading(started_cluster):
|
|||||||
instance.query(f"DROP TABLE {table} SYNC")
|
instance.query(f"DROP TABLE {table} SYNC")
|
||||||
|
|
||||||
|
|
||||||
|
def test_partial_table(started_cluster):
|
||||||
|
table = "test_partial_table"
|
||||||
|
|
||||||
|
pg_manager.create_postgres_table(
|
||||||
|
table,
|
||||||
|
"",
|
||||||
|
f"""CREATE TABLE {table} (
|
||||||
|
key integer PRIMARY KEY,
|
||||||
|
x integer DEFAULT 0,
|
||||||
|
y integer,
|
||||||
|
z text DEFAULT 'z');
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'a');")
|
||||||
|
pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'b');")
|
||||||
|
|
||||||
|
pg_manager.create_materialized_db(
|
||||||
|
ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
settings=[
|
||||||
|
f"materialized_postgresql_tables_list = '{table}(z, key)'",
|
||||||
|
"materialized_postgresql_backoff_min_ms = 100",
|
||||||
|
"materialized_postgresql_backoff_max_ms = 100",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
table,
|
||||||
|
postgres_database=pg_manager.get_default_database(),
|
||||||
|
columns=["key", "z"],
|
||||||
|
)
|
||||||
|
|
||||||
|
pg_manager.execute(f"insert into {table} (key, x, z) values (3,3,'c');")
|
||||||
|
pg_manager.execute(f"insert into {table} (key, x, z) values (4,4,'d');")
|
||||||
|
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
table,
|
||||||
|
postgres_database=pg_manager.get_default_database(),
|
||||||
|
columns=["key", "z"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_partial_and_full_table(started_cluster):
|
||||||
|
table = "test_partial_and_full_table"
|
||||||
|
|
||||||
|
pg_manager.create_postgres_table(
|
||||||
|
table,
|
||||||
|
"",
|
||||||
|
f"""CREATE TABLE {table}1 (
|
||||||
|
key integer PRIMARY KEY,
|
||||||
|
x integer DEFAULT 0,
|
||||||
|
y integer,
|
||||||
|
z text DEFAULT 'z');
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
pg_manager.execute(f"insert into {table}1 (key, x, y, z) values (1,1,1,'1');")
|
||||||
|
pg_manager.execute(f"insert into {table}1 (key, x, y, z) values (2,2,2,'2');")
|
||||||
|
pg_manager.create_postgres_table(
|
||||||
|
table,
|
||||||
|
"",
|
||||||
|
f"""CREATE TABLE {table}2 (
|
||||||
|
key integer PRIMARY KEY,
|
||||||
|
x integer DEFAULT 0,
|
||||||
|
y integer,
|
||||||
|
z text DEFAULT 'z');
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
pg_manager.execute(f"insert into {table}2 (key, x, y, z) values (3,3,3,'3');")
|
||||||
|
pg_manager.execute(f"insert into {table}2 (key, x, y, z) values (4,4,4,'4');")
|
||||||
|
|
||||||
|
pg_manager.create_materialized_db(
|
||||||
|
ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
settings=[
|
||||||
|
f"materialized_postgresql_tables_list = '{table}1(key, x, z), {table}2'",
|
||||||
|
"materialized_postgresql_backoff_min_ms = 100",
|
||||||
|
"materialized_postgresql_backoff_max_ms = 100",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
f"{table}1",
|
||||||
|
postgres_database=pg_manager.get_default_database(),
|
||||||
|
columns=["key", "x", "z"],
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance, f"{table}2", postgres_database=pg_manager.get_default_database()
|
||||||
|
)
|
||||||
|
|
||||||
|
pg_manager.execute(f"insert into {table}1 (key, x, z) values (3,3,'3');")
|
||||||
|
pg_manager.execute(f"insert into {table}2 (key, x, z) values (5,5,'5');")
|
||||||
|
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
f"{table}1",
|
||||||
|
postgres_database=pg_manager.get_default_database(),
|
||||||
|
columns=["key", "x", "z"],
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance, f"{table}2", postgres_database=pg_manager.get_default_database()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
cluster.start()
|
cluster.start()
|
||||||
input("Cluster created, press any key to destroy...")
|
input("Cluster created, press any key to destroy...")
|
||||||
|
Loading…
Reference in New Issue
Block a user