Merge pull request #28933 from kssenii/materialized-postgresql-fix

MaterializedPostgreSQL: fully support non-default postgres schema
This commit is contained in:
Kseniia Sumarokova 2021-12-01 09:37:12 +03:00 committed by GitHub
commit a97938e2d1
20 changed files with 942 additions and 276 deletions

View File

@ -39,12 +39,16 @@ DETACH TABLE postgres_database.table_to_remove;
## Settings {#settings}
- [materialized_postgresql_max_block_size](../../operations/settings/settings.md#materialized-postgresql-max-block-size)
- [materialized_postgresql_tables_list](../../operations/settings/settings.md#materialized-postgresql-tables-list)
- [materialized_postgresql_schema](../../operations/settings/settings.md#materialized-postgresql-schema)
- [materialized_postgresql_schema_list](../../operations/settings/settings.md#materialized-postgresql-schema-list)
- [materialized_postgresql_allow_automatic_update](../../operations/settings/settings.md#materialized-postgresql-allow-automatic-update)
- [materialized_postgresql_max_block_size](../../operations/settings/settings.md#materialized-postgresql-max-block-size)
- [materialized_postgresql_replication_slot](../../operations/settings/settings.md#materialized-postgresql-replication-slot)
- [materialized_postgresql_snapshot](../../operations/settings/settings.md#materialized-postgresql-snapshot)
@ -52,8 +56,7 @@ DETACH TABLE postgres_database.table_to_remove;
``` sql
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_max_block_size = 65536,
materialized_postgresql_tables_list = 'table1,table2,table3';
SETTINGS materialized_postgresql_tables_list = 'table1,table2,table3';
SELECT * FROM database1.table1;
```
@ -64,6 +67,55 @@ The settings can be changed, if necessary, using a DDL query. But it is impossib
ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
```
## PostgreSQL schema {#schema}
PostgreSQL [schema](https://www.postgresql.org/docs/9.1/ddl-schemas.html) can be used in three ways.
1. One schema for one `MaterializedPostgreSQL` database engine. Requires using the setting `materialized_postgresql_schema`.
Tables are accessed via table name only:
``` sql
CREATE DATABASE postgres_database
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema = 'postgres_schema';
SELECT * FROM postgres_database.table1;
```
2. Any number of schemas with a specified set of tables for one `MaterializedPostgreSQL` database engine. Requires using the setting `materialized_postgresql_tables_list`. Each table is written along with its schema.
Tables are accessed via both schema name and table name:
``` sql
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_tables_list = 'schema1.table1,schema2.table2,schema1.table3',
         materialized_postgresql_tables_list_with_schema = 1;
SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema2.table2`;
```
In this case, all tables in `materialized_postgresql_tables_list` must be written with their schema names.
Requires `materialized_postgresql_tables_list_with_schema = 1`.
Warning: in this case dots in table names are not allowed.
3. Any number of schemas with the full set of tables for one `MaterializedPostgreSQL` database engine. Requires using the setting `materialized_postgresql_schema_list`.
``` sql
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema_list = 'schema1,schema2,schema3';
SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema1.table2`;
SELECT * FROM database1.`schema2.table2`;
```
Warning: in this case dots in table names are not allowed.
## Requirements {#requirements}
1. The [wal_level](https://www.postgresql.org/docs/current/runtime-config-wal.html) setting must be set to `logical`, and the `max_replication_slots` parameter must be at least `2` in the PostgreSQL config file; see the sketch below.
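For example, both parameters can be set with standard PostgreSQL `ALTER SYSTEM` commands (a sketch; a server restart is required for these settings to take effect):

``` sql
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 2;
```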

View File

@ -3691,6 +3691,14 @@ Sets a comma-separated list of PostgreSQL database tables, which will be replica
Default value: empty list — means the whole PostgreSQL database will be replicated.
## materialized_postgresql_schema {#materialized-postgresql-schema}
Sets a single non-default PostgreSQL schema for the whole `MaterializedPostgreSQL` database engine.
Default value: empty string. (Default schema is used)
## materialized_postgresql_schema_list {#materialized-postgresql-schema-list}
Sets a comma-separated list of PostgreSQL schemas, which will be replicated in full.
Default value: empty list. (Default schema is used)
## materialized_postgresql_allow_automatic_update {#materialized-postgresql-allow-automatic-update}
Allows reloading tables in the background when schema changes are detected. DDL queries on the PostgreSQL side are not replicated via the ClickHouse [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md) engine, because the PostgreSQL logical replication protocol does not allow it, but the fact of DDL changes is detected transactionally. By default, replication of such tables stops once DDL is detected. If this setting is enabled, these tables are instead reloaded in the background via a database snapshot without data loss, and replication continues for them.
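For example, the setting can be enabled with a DDL query, in the same way as `materialized_postgresql_max_block_size` above (illustrative):

``` sql
ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_allow_automatic_update = 1;
```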

View File

@ -78,7 +78,7 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
if (tables_to_replicate.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty list of tables to replicate");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got empty list of tables to replicate");
for (const auto & table_name : tables_to_replicate)
{

View File

@ -30,13 +30,34 @@ namespace ErrorCodes
template<typename T>
std::set<String> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema)
{
std::set<String> tables;
std::string query = fmt::format("SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND {}",
postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema));
Names schemas;
boost::split(schemas, postgres_schema, [](char c){ return c == ','; });
for (String & key : schemas)
boost::trim(key);
for (auto table_name : tx.template stream<std::string>(query))
tables.insert(std::get<0>(table_name));
std::set<std::string> tables;
if (schemas.size() <= 1)
{
std::string query = fmt::format("SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND {}",
postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema));
for (auto table_name : tx.template stream<std::string>(query))
tables.insert(std::get<0>(table_name));
return tables;
}
/// We add the schema to the table name only in case of multiple schemas for the whole database engine,
/// because there is no need to add it if there is only one schema.
/// If we add the schema to the table name, the table can be accessed only as: database_name.`schema_name.table_name`
for (const auto & schema : schemas)
{
std::string query = fmt::format("SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND {}",
postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(schema));
for (auto table_name : tx.template stream<std::string>(query))
tables.insert(schema + '.' + std::get<0>(table_name));
}
return tables;
}
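For illustration, with `postgres_schema = 'schema1,schema2'` the loop above issues one catalog query per schema along these lines (a sketch of the formatted query text):

``` sql
SELECT tablename FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND schemaname = 'schema1'
```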
@ -308,7 +329,6 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::nontransaction & tx, const String & postgres_table, const String & postgres_schema,
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
template
std::set<String> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
template

View File

@ -19,6 +19,7 @@ struct PostgreSQLTableStructure
using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;
/// We need an ordered result for the materialized version.
std::set<String> fetchPostgreSQLTablesList(pqxx::connection & connection, const String & postgres_schema);
PostgreSQLTableStructure fetchPostgreSQLTableStructure(

View File

@ -27,6 +27,7 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
const std::string & publication_name_,
const std::string & start_lsn,
const size_t max_block_size_,
bool schema_as_a_part_of_table_name_,
bool allow_automatic_update_,
Storages storages_,
const String & name_for_logger)
@ -38,6 +39,7 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
, current_lsn(start_lsn)
, lsn_value(getLSNValue(start_lsn))
, max_block_size(max_block_size_)
, schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_)
, allow_automatic_update(allow_automatic_update_)
, storages(storages_)
{
@ -274,7 +276,9 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
/// FIXME: If the table name is empty here, it means we failed to load it, but it was included in the publication. Need to remove it?
if (table_name.empty())
LOG_WARNING(log, "No table mapping for relation id: {}. Probably table failed to be loaded", relation_id);
if (!isSyncAllowed(relation_id, table_name))
return;
@ -292,7 +296,9 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
/// FIXME: If the table name is empty here, it means we failed to load it, but it was included in the publication. Need to remove it?
if (table_name.empty())
LOG_WARNING(log, "No table mapping for relation id: {}. Probably table failed to be loaded", relation_id);
if (!isSyncAllowed(relation_id, table_name))
return;
@ -341,7 +347,9 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
/// FIXME: If the table name is empty here, it means we failed to load it, but it was included in the publication. Need to remove it?
if (table_name.empty())
LOG_WARNING(log, "No table mapping for relation id: {}. Probably table failed to be loaded", relation_id);
if (!isSyncAllowed(relation_id, table_name))
return;
@ -375,19 +383,27 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
readString(replication_message, pos, size, relation_namespace);
readString(replication_message, pos, size, relation_name);
String table_name;
if (!relation_namespace.empty() && schema_as_a_part_of_table_name)
table_name = relation_namespace + '.' + relation_name;
else
table_name = relation_name;
if (!isSyncAllowed(relation_id, relation_name))
return;
if (storages.find(relation_name) == storages.end())
if (storages.find(table_name) == storages.end())
{
markTableAsSkipped(relation_id, relation_name);
markTableAsSkipped(relation_id, table_name);
/// TODO: This can happen if we created a publication with this table but then got an exception that this
/// table has primary key or something else.
LOG_ERROR(log,
"Storage for table {} does not exist, but is included in replication stream. (Storages number: {})",
relation_name, storages.size());
table_name, storages.size());
return;
}
assert(buffers.count(relation_name));
assert(buffers.contains(table_name));
/// 'd' - default (primary key if any)
@ -401,7 +417,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
LOG_WARNING(log,
"Table has replica identity {} - not supported. A table must have a primary key or a replica identity index");
markTableAsSkipped(relation_id, relation_name);
markTableAsSkipped(relation_id, table_name);
return;
}
@ -413,7 +429,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
bool new_relation_definition = false;
if (schema_data.find(relation_id) == schema_data.end())
{
relation_id_to_name[relation_id] = relation_name;
relation_id_to_name[relation_id] = table_name;
schema_data.emplace(relation_id, SchemaData(num_columns));
new_relation_definition = true;
}
@ -422,7 +438,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
if (current_schema_data.number_of_columns != num_columns)
{
markTableAsSkipped(relation_id, relation_name);
markTableAsSkipped(relation_id, table_name);
return;
}
@ -444,13 +460,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
if (current_schema_data.column_identifiers[i].first != data_type_id
|| current_schema_data.column_identifiers[i].second != type_modifier)
{
markTableAsSkipped(relation_id, relation_name);
markTableAsSkipped(relation_id, table_name);
return;
}
}
}
tables_to_sync.insert(relation_name);
tables_to_sync.insert(table_name);
break;
}
@ -774,7 +790,6 @@ bool MaterializedPostgreSQLConsumer::consume(std::vector<std::pair<Int32, String
/// false: no data was read, reschedule.
/// true: some data was read, schedule as soon as possible.
auto read_next = readFromReplicationSlot();
LOG_TRACE(log, "LSN: {}", final_lsn);
return read_next;
}

View File

@ -26,6 +26,7 @@ public:
const String & publication_name_,
const String & start_lsn,
const size_t max_block_size_,
bool schema_as_a_part_of_table_name_,
bool allow_automatic_update_,
Storages storages_,
const String & name_for_logger);
@ -113,6 +114,9 @@ private:
UInt64 lsn_value;
size_t max_block_size;
bool schema_as_a_part_of_table_name;
bool allow_automatic_update;
String table_to_insert;

View File

@ -14,9 +14,14 @@ namespace DB
#define LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS(M) \
M(UInt64, materialized_postgresql_max_block_size, 65536, "Number of rows collected before flushing data into table.", 0) \
M(String, materialized_postgresql_tables_list, "", "List of tables for MaterializedPostgreSQL database engine", 0) \
M(String, materialized_postgresql_schema_list, "", "List of schemas for MaterializedPostgreSQL database engine", 0) \
M(Bool, materialized_postgresql_allow_automatic_update, false, "Allow to reload table in the background, when schema changes are detected", 0) \
M(String, materialized_postgresql_replication_slot, "", "A user-created replication slot", 0) \
M(String, materialized_postgresql_snapshot, "", "User-provided snapshot in case the user manages replication slots manually", 0) \
M(String, materialized_postgresql_schema, "", "PostgreSQL schema", 0) \
M(Bool, materialized_postgresql_tables_list_with_schema, false, \
"Consider by default that if there is a dot in tables list 'name.name', " \
"then the first name is postgres schema and second is postgres table. This setting is needed to allow table names with dots", 0) \
DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & remote_database_name_,
const String & postgres_database_,
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
@ -38,16 +38,25 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, context(context_)
, is_attach(is_attach_)
, remote_database_name(remote_database_name_)
, postgres_database(postgres_database_)
, postgres_schema(replication_settings.materialized_postgresql_schema)
, current_database_name(current_database_name_)
, connection_info(connection_info_)
, max_block_size(replication_settings.materialized_postgresql_max_block_size)
, allow_automatic_update(replication_settings.materialized_postgresql_allow_automatic_update)
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(replication_settings.materialized_postgresql_tables_list)
, schema_list(replication_settings.materialized_postgresql_schema_list)
, schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema)
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, milliseconds_to_wait(RESCHEDULE_MS)
{
if (!schema_list.empty() && !tables_list.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and tables list at the same time");
if (!schema_list.empty() && !postgres_schema.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and common schema at the same time");
replication_slot = replication_settings.materialized_postgresql_replication_slot;
if (replication_slot.empty())
{
@ -56,7 +65,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
}
publication_name = fmt::format("{}_ch_publication", replication_identifier);
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
}
@ -69,26 +78,65 @@ void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, St
void PostgreSQLReplicationHandler::startup()
{
/// We load tables in a separate thread, because this database is not created yet.
/// (will get "database is currently dropped or renamed")
startup_task->activateAndSchedule();
}
void PostgreSQLReplicationHandler::waitConnectionAndStart()
std::pair<String, String> PostgreSQLReplicationHandler::getSchemaAndTableName(const String & table_name) const
{
/// !schema_list.empty() -- we replicate all tables from the specified schemas.
/// In this case, when the tables list is fetched, each table name is prefixed with its schema and a dot, but without quotes.
/// If the setting `tables_list` is used, table names can be put there along with their schema,
/// separated by a dot and with no quotes. We add double quotes in this case.
if (!postgres_schema.empty())
return std::make_pair(postgres_schema, table_name);
if (auto pos = table_name.find('.'); schema_as_a_part_of_table_name && pos != std::string::npos)
return std::make_pair(table_name.substr(0, pos), table_name.substr(pos + 1));
return std::make_pair("", table_name);
}
String PostgreSQLReplicationHandler::doubleQuoteWithSchema(const String & table_name) const
{
auto [schema, table] = getSchemaAndTableName(table_name);
if (schema.empty())
return doubleQuoteString(table);
return doubleQuoteString(schema) + '.' + doubleQuoteString(table);
}
void PostgreSQLReplicationHandler::checkConnectionAndStart()
{
try
{
postgres::Connection connection(connection_info);
connection.connect(); /// Will throw pqxx::broken_connection if no connection at the moment
startSynchronization(false);
startSynchronization(is_attach);
}
catch (const pqxx::broken_connection & pqxx_error)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (!is_attach)
throw;
LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", pqxx_error.what());
startup_task->scheduleAfter(RESCHEDULE_MS);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (!is_attach)
throw;
}
}
@ -124,7 +172,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto initial_sync = [&]()
{
LOG_TRACE(log, "Starting tables sync load");
LOG_DEBUG(log, "Starting tables sync load");
if (user_managed_slot)
{
@ -147,7 +195,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
}
catch (Exception & e)
{
e.addMessage("while loading table {}.{}", remote_database_name, table_name);
e.addMessage("while loading table `{}`.`{}`", postgres_database, table_name);
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Throw in case of single MaterializedPostgreSQL storage, because initial setup is done immediately
@ -185,21 +233,18 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
try
{
/// FIXME: Looks like it is possible we might get here if there is no nested storage or at least nested storage id field might be empty.
/// Caught it somehow when doing something else incorrectly, but do not see any reason how it could happen.
/// Try load nested table, set materialized table metadata.
nested_storages[table_name] = materialized_storage->prepare();
nested_storages[table_name] = materialized_storage->getNested();
}
catch (Exception & e)
{
e.addMessage("while loading table {}.{}", remote_database_name, table_name);
e.addMessage("while loading table {}.{}", postgres_database, table_name);
tryLogCurrentException(__PRETTY_FUNCTION__);
if (throw_on_error)
throw;
}
}
LOG_TRACE(log, "Loaded {} tables", nested_storages.size());
LOG_DEBUG(log, "Loaded {} tables", nested_storages.size());
}
tx.commit();
@ -215,9 +260,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
publication_name,
start_lsn,
max_block_size,
schema_as_a_part_of_table_name,
allow_automatic_update,
nested_storages,
(is_materialized_postgresql_database ? remote_database_name : remote_database_name + '.' + tables_list));
(is_materialized_postgresql_database ? postgres_database : postgres_database + '.' + tables_list));
consumer_task->activateAndSchedule();
@ -247,7 +293,9 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
/// Load from snapshot, which will show table state before creation of replication slot.
/// Already connected to needed database, no need to add it to query.
query_str = fmt::format("SELECT * FROM {}", doubleQuoteString(table_name));
auto quoted_name = doubleQuoteWithSchema(table_name);
query_str = fmt::format("SELECT * FROM {}", quoted_name);
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name));
auto nested_storage = materialized_storage->getNested();
@ -270,9 +318,9 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
nested_storage = materialized_storage->prepare();
materialized_storage->set(nested_storage);
auto nested_table_id = nested_storage->getStorageID();
LOG_TRACE(log, "Loaded table {}.{} (uuid: {})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
return nested_storage;
}
@ -284,6 +332,7 @@ void PostgreSQLReplicationHandler::consumerFunc()
bool schedule_now = consumer->consume(skipped_tables);
LOG_DEBUG(log, "checking for skipped tables: {}", skipped_tables.size());
if (!skipped_tables.empty())
{
try
@ -298,7 +347,7 @@ void PostgreSQLReplicationHandler::consumerFunc()
if (stop_synchronization)
{
LOG_TRACE(log, "Replication thread is stopped");
LOG_DEBUG(log, "Replication thread is stopped");
return;
}
@ -315,7 +364,7 @@ void PostgreSQLReplicationHandler::consumerFunc()
if (milliseconds_to_wait < BACKOFF_TRESHOLD_MS)
milliseconds_to_wait *= 2;
LOG_TRACE(log, "Scheduling replication thread: after {} ms", milliseconds_to_wait);
LOG_DEBUG(log, "Scheduling replication thread: after {} ms", milliseconds_to_wait);
}
}
@ -347,12 +396,17 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
{
if (tables_list.empty())
{
if (materialized_storages.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No tables to replicate");
WriteBufferFromOwnString buf;
for (const auto & storage_data : materialized_storages)
{
if (!tables_list.empty())
tables_list += ", ";
tables_list += doubleQuoteString(storage_data.first);
buf << doubleQuoteWithSchema(storage_data.first);
buf << ",";
}
tables_list = buf.str();
tables_list.resize(tables_list.size() - 1);
}
if (tables_list.empty())
@ -363,7 +417,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
try
{
tx.exec(query_str);
LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list);
LOG_DEBUG(log, "Created publication {} with tables list: {}", publication_name, tables_list);
}
catch (Exception & e)
{
@ -373,7 +427,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
}
else
{
LOG_TRACE(log, "Using existing publication ({}) version", publication_name);
LOG_DEBUG(log, "Using existing publication ({}) version", publication_name);
}
}
@ -395,7 +449,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
start_lsn = result[0][2].as<std::string>();
LOG_TRACE(log, "Replication slot {} already exists (active: {}). Restart lsn position: {}, confirmed flush lsn: {}",
LOG_DEBUG(log, "Replication slot {} already exists (active: {}). Restart lsn position: {}, confirmed flush lsn: {}",
slot_name, result[0][0].as<bool>(), result[0][1].as<std::string>(), start_lsn);
return true;
@ -443,7 +497,7 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx
std::string query_str = fmt::format("SELECT pg_drop_replication_slot('{}')", slot_name);
tx.exec(query_str);
LOG_TRACE(log, "Dropped replication slot: {}", slot_name);
LOG_DEBUG(log, "Dropped replication slot: {}", slot_name);
}
@ -451,7 +505,7 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
{
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
tx.exec(query_str);
LOG_TRACE(log, "Dropped publication: {}", publication_name);
LOG_DEBUG(log, "Dropped publication: {}", publication_name);
}
@ -537,6 +591,8 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
boost::trim(table_name);
}
/// Try to fetch the tables list from the publication if there is no tables list.
/// If there is a tables list -- check that the lists are consistent, and if not -- remove the publication; it will be recreated.
if (publication_exists_before_startup)
{
if (!is_attach)
@ -557,7 +613,7 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
{
pqxx::nontransaction tx(connection.getRef());
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
result_tables = fetchPostgreSQLTablesList(tx, schema_list.empty() ? postgres_schema : schema_list);
}
}
/// Check tables list from publication is the same as expected tables list.
@ -625,22 +681,48 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
/// startSynchronization method.
{
pqxx::nontransaction tx(connection.getRef());
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
result_tables = fetchPostgreSQLTablesList(tx, schema_list.empty() ? postgres_schema : schema_list);
}
}
}
/// `schema1.table1, schema2.table2, ...` -> `"schema1"."table1", "schema2"."table2", ...`
/// or
/// `table1, table2, ...` + setting `schema` -> `"schema"."table1", "schema"."table2", ...`
if (!tables_list.empty())
{
Strings tables_names;
splitInto<','>(tables_names, tables_list);
if (tables_names.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty list of tables");
WriteBufferFromOwnString buf;
for (auto & table_name : tables_names)
{
boost::trim(table_name);
buf << doubleQuoteWithSchema(table_name);
buf << ",";
}
tables_list = buf.str();
tables_list.resize(tables_list.size() - 1);
}
/// Also we make sure that queries in postgres always use the quoted version "table_schema"."table_name".
/// But tables in ClickHouse in case of a multi-schema database are never double-quoted.
/// It is ok, because they are accessed with backticks: postgres_database.`table_schema.table_name`.
/// We quote the tables_list AFTER expected_tables are collected, because expected_tables are future clickhouse tables.
return result_tables;
}
std::set<String> PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::work & tx)
{
std::string query = fmt::format("SELECT tablename FROM pg_publication_tables WHERE pubname = '{}'", publication_name);
std::string query = fmt::format("SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '{}'", publication_name);
std::set<String> tables;
for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name));
for (const auto & [schema, table] : tx.stream<std::string, std::string>(query))
tables.insert(schema_as_a_part_of_table_name ? schema + '.' + table : table);
return tables;
}
@ -655,7 +737,8 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
PostgreSQLTableStructure structure;
try
{
structure = fetchPostgreSQLTableStructure(tx, table_name, postgres_schema, true, true, true);
auto [schema, table] = getSchemaAndTableName(table_name);
structure = fetchPostgreSQLTableStructure(tx, table, schema, true, true, true);
}
catch (...)
{
@ -689,13 +772,9 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
if (!nested)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Internal table was not created");
{
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
}
auto nested_table_id = nested_storage->getStorageID();
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
materialized_storage->set(nested_storage);
}
{
@ -771,6 +850,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
{
auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), context);
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
/// If for some reason this temporary table already exists - also drop it.
auto temp_materialized_storage = materialized_storage->createTemporary();
@ -782,8 +862,8 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
auto table_id = materialized_storage->getNestedStorageID();
auto temp_table_id = temp_nested_storage->getStorageID();
LOG_TRACE(log, "Starting background update of table {} with table {}",
table_id.getNameForLogs(), temp_table_id.getNameForLogs());
LOG_DEBUG(log, "Starting background update of table {} ({} with {})",
table_name, table_id.getNameForLogs(), temp_table_id.getNameForLogs());
auto ast_rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Element elem
@ -798,34 +878,30 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
try
{
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
InterpreterRenameQuery(ast_rename, nested_context).execute();
{
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name),
nested_context);
auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
auto nested_table_id = nested_storage->getStorageID();
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name, temp_table_id.uuid), nested_context);
materialized_storage->set(nested_storage);
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
auto nested_sample_block = nested_storage->getInMemoryMetadataPtr()->getSampleBlock();
auto materialized_sample_block = materialized_storage->getInMemoryMetadataPtr()->getSampleBlock();
assertBlocksHaveEqualStructure(nested_sample_block, materialized_sample_block, "while reloading table in the background");
auto nested_storage_metadata = nested_storage->getInMemoryMetadataPtr();
auto nested_sample_block = nested_storage_metadata->getSampleBlock();
LOG_TRACE(log, "Updated table {}. New structure: {}",
nested_table_id.getNameForLogs(), nested_sample_block.dumpStructure());
LOG_INFO(log, "Updated table {}. New structure: {}",
nested_storage->getStorageID().getNameForLogs(), nested_sample_block.dumpStructure());
auto materialized_storage_metadata = nested_storage->getInMemoryMetadataPtr();
auto materialized_sample_block = materialized_storage_metadata->getSampleBlock();
/// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
assertBlocksHaveEqualStructure(nested_sample_block, materialized_sample_block, "while reloading table in the background");
auto table_to_drop = DatabaseCatalog::instance().getTable(StorageID(temp_table_id.database_name, temp_table_id.table_name, table_id.uuid), nested_context);
auto drop_table_id = table_to_drop->getStorageID();
/// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
}
if (drop_table_id == nested_storage->getStorageID())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot drop table because is has the same uuid as new table: {}", drop_table_id.getNameForLogs());
LOG_DEBUG(log, "Dropping table {}", temp_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true);
LOG_DEBUG(log, "Dropping table {}", drop_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, drop_table_id, true);
}
catch (...)
{

View File

@ -18,7 +18,7 @@ class PostgreSQLReplicationHandler
public:
PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & remote_database_name_,
const String & postgres_database_,
const String & current_database_name_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
@ -79,7 +79,7 @@ private:
/// Methods to manage replication.
void waitConnectionAndStart();
void checkConnectionAndStart();
void consumerFunc();
@ -89,13 +89,19 @@ private:
PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const String & table_name) const;
String doubleQuoteWithSchema(const String & table_name) const;
std::pair<String, String> getSchemaAndTableName(const String & table_name) const;
Poco::Logger * log;
ContextPtr context;
/// If it is not an attach (i.e. a create query), then if the publication already exists -- always drop it.
bool is_attach;
const String remote_database_name, current_database_name;
String postgres_database;
String postgres_schema;
String current_database_name;
/// Connection string and address for logs.
postgres::ConnectionInfo connection_info;
@ -113,6 +119,12 @@ private:
/// A comma-separated list of tables, which are going to be replicated for the database engine. By default, the whole database is replicated.
String tables_list;
String schema_list;
/// Schema can be a part of the table name, i.e. as a clickhouse table it is accessed like db.`schema.table`.
/// This makes it possible to replicate tables from multiple schemas in the same MaterializedPostgreSQL database engine.
mutable bool schema_as_a_part_of_table_name = false;
bool user_managed_slot = true;
String user_provided_snapshot;
@ -129,8 +141,6 @@ private:
MaterializedStorages materialized_storages;
UInt64 milliseconds_to_wait;
String postgres_schema;
};
}

View File

@ -115,7 +115,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
ContextPtr context_,
const String & postgres_database_name,
const String & postgres_table_name)
: IStorage(nested_storage_->getStorageID())
: IStorage(StorageID(nested_storage_->getStorageID().database_name, nested_storage_->getStorageID().table_name))
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(postgres_database_name, postgres_table_name) + ")"))
, is_materialized_postgresql_database(true)
@ -216,12 +216,11 @@ std::shared_ptr<Context> StorageMaterializedPostgreSQL::makeNestedTableContext(C
}
StoragePtr StorageMaterializedPostgreSQL::prepare()
void StorageMaterializedPostgreSQL::set(StoragePtr nested_storage)
{
auto nested_table = getNested();
setInMemoryMetadata(nested_table->getInMemoryMetadata());
nested_table_id = nested_storage->getStorageID();
setInMemoryMetadata(nested_storage->getInMemoryMetadata());
has_nested.store(true);
return nested_table;
}

View File

@ -19,6 +19,11 @@
namespace DB
{
/** TODO list:
* - Actually I think we can support ddl even though logical replication does not fully support it.
* But some basic ddl like adding/dropping columns, changing column types or column names -- is manageable.
*/
/** Case of single MaterializedPostgreSQL table engine.
*
* A user creates a table with engine MaterializedPostgreSQL. Order by expression must be specified (needed for
@ -109,14 +114,10 @@ public:
StorageID getNestedStorageID() const;
void setNestedStorageID(const StorageID & id) { nested_table_id.emplace(id); }
void set(StoragePtr nested_storage);
static std::shared_ptr<Context> makeNestedTableContext(ContextPtr from_context);
/// Get nested table (or throw if it does not exist), set in-memory metadata (taken from nested table)
/// for current table, set has_nested = true.
StoragePtr prepare();
bool supportsFinal() const override { return true; }
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);

View File

@ -30,6 +30,10 @@ postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS "{}" (
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
@ -65,13 +69,26 @@ def create_postgres_db(cursor, name='postgres_database'):
def drop_postgres_db(cursor, name='postgres_database'):
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
def create_clickhouse_postgres_db(ip, port, name='postgres_database'):
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, name))
def drop_postgres_schema(cursor, schema_name):
cursor.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(schema_name))
def create_postgres_schema(cursor, schema_name):
drop_postgres_schema(cursor, schema_name)
cursor.execute('CREATE SCHEMA {}'.format(schema_name))
def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
drop_clickhouse_postgres_db(name)
if len(schema_name) == 0:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name))
else:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name))
def drop_clickhouse_postgres_db(name='postgres_database'):
instance.query('DROP DATABASE {}'.format(name))
instance.query('DROP DATABASE IF EXISTS {}'.format(name))
def create_materialized_db(ip, port,
materialized_database='test_database',
@ -95,12 +112,19 @@ def drop_materialized_db(materialized_database='test_database'):
def drop_postgres_table(cursor, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name))
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name))
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
drop_postgres_table(cursor, table_name)
cursor.execute(template.format(table_name))
if replica_identity_full:
cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name))
def create_postgres_table_with_schema(cursor, schema_name, table_name):
drop_postgres_table_with_schema(cursor, schema_name, table_name)
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
queries = [
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
@ -124,32 +148,44 @@ queries = [
]
def assert_nested_table_is_created(table_name, materialized_database='test_database'):
def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''):
if len(schema_name) == 0:
table = table_name
else:
table = schema_name + "." + table_name
print(f'Checking table {table} exists in {materialized_database}')
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
while table_name not in database_tables:
while table not in database_tables:
time.sleep(0.2)
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
assert(table_name in database_tables)
assert(table in database_tables)
@pytest.mark.timeout(320)
def assert_number_of_columns(expected, table_name, database_name='test_database'):
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
while (int(result) != expected):
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
print('Number of columns ok')
@pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database'):
assert_nested_table_is_created(table_name, materialized_database)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(table_name, materialized_database, schema_name)
print("Checking table is synchronized:", table_name)
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
while result != expected:
time.sleep(0.5)
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
assert(result == expected)
@ -436,9 +472,7 @@ def test_clickhouse_restart(started_cluster):
def test_replica_identity_index(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica', template=postgres_table_template_3);
@ -446,8 +480,7 @@ def test_replica_identity_index(started_cluster):
cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx")
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)")
check_tables_are_synchronized('postgresql_replica', order_by='key1');
@ -695,8 +728,8 @@ def test_multiple_databases(started_cluster):
cursor1 = conn1.cursor()
cursor2 = conn2.cursor()
create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_1')
create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_2')
create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_1', 'postgres_database_1')
create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_2', 'postgres_database_2')
cursors = [cursor1, cursor2]
for cursor_id in range(len(cursors)):
@ -995,160 +1028,6 @@ def test_user_managed_slots(started_cluster):
cursor.execute('DROP TABLE IF EXISTS test_table')
def test_add_new_table_to_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
table_name = 'postgresql_replica_5'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(") # Check without ip
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'")
assert('Changing setting `materialized_postgresql_tables_list` is not allowed' in result)
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
assert('Database engine MaterializedPostgreSQL does not support setting' in result)
instance.query("ATTACH TABLE test_database.{}".format(table_name));
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n")
check_tables_are_synchronized(table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
check_tables_are_synchronized(table_name);
result = instance.query_and_get_error("ATTACH TABLE test_database.{}".format(table_name));
assert('Table test_database.postgresql_replica_5 already exists' in result)
result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table");
assert('PostgreSQL table unknown_table does not exist' in result)
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-180:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n")
table_name = 'postgresql_replica_6'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.restart_clickhouse()
table_name = 'postgresql_replica_7'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-222:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n")
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n")
for i in range(NUM_TABLES + 3):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES + 3):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_remove_table_from_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
table_name = 'postgresql_replica_4'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
result = instance.query_and_get_error('SELECT * FROM test_database.{}'.format(table_name))
assert("doesn't exist" in result)
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n")
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n")
instance.query('ATTACH TABLE test_database.{}'.format(table_name));
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
table_name = 'postgresql_replica_1'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
for i in range(NUM_TABLES):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_predefined_connection_configuration(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor.execute(f'DROP TABLE IF EXISTS test_table')
cursor.execute(f'CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1)")
check_tables_are_synchronized("test_table");
drop_materialized_db()
cursor.execute('DROP TABLE IF EXISTS test_table')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,29 @@
<clickhouse>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<named_collections>
<postgres1>
<user>postgres</user>
<password>mysecretpassword</password>
<host>postgres1</host>
<port>5432</port>
<database>postgres_database</database>
<table>test_table</table>
</postgres1>
<postgres2>
<user>postgres</user>
<password>mysecretpassword</password>
<host>postgres1</host>
<port>1111</port>
<database>postgres_database</database>
<table>test_table</table>
</postgres2>
</named_collections>
</clickhouse>

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<clickhouse>
<profiles>
<default>
<allow_experimental_database_materialized_postgresql>1</allow_experimental_database_materialized_postgresql>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,559 @@
import pytest
import time
import psycopg2
import os.path as p
import random
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.test_tools import TSV
from random import randrange
import threading
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs = ['configs/log_conf.xml'],
user_configs = ['configs/users.xml'],
with_postgres=True, stay_alive=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_2 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
"""
postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS "{}" (
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
else:
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
if replication:
conn_string += " replication='database'"
conn = psycopg2.connect(conn_string)
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
result = cursor.fetchall()
print(result[0][0]) # slot name
print(result[0][1]) # start lsn
print(result[0][2]) # snapshot
return result[0][2]
def drop_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name))
def create_postgres_db(cursor, name='postgres_database'):
cursor.execute("CREATE DATABASE {}".format(name))
def drop_postgres_db(cursor, name='postgres_database'):
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
def drop_postgres_schema(cursor, schema_name):
cursor.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(schema_name))
def create_postgres_schema(cursor, schema_name):
drop_postgres_schema(cursor, schema_name)
cursor.execute('CREATE SCHEMA {}'.format(schema_name))
def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
drop_clickhouse_postgres_db(name)
if len(schema_name) == 0:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name))
else:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name))
def drop_clickhouse_postgres_db(name='postgres_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(name))
def create_materialized_db(ip, port,
materialized_database='test_database',
postgres_database='postgres_database',
settings=[]):
instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
    if len(settings) > 0:
        create_query += " SETTINGS " + ', '.join(settings)
instance.query(create_query)
assert materialized_database in instance.query('SHOW DATABASES')
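# The settings list is joined verbatim into the SETTINGS clause; e.g. the schema
# tests below call it like:
#   create_materialized_db(ip, port,
#       settings=["materialized_postgresql_schema = 'test_schema'",
#                 "materialized_postgresql_allow_automatic_update = 1"])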
def drop_materialized_db(materialized_database='test_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database))
assert materialized_database not in instance.query('SHOW DATABASES')
def drop_postgres_table(cursor, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name))
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name))
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
drop_postgres_table(cursor, table_name)
cursor.execute(template.format(table_name))
if replica_identity_full:
cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name))
def create_postgres_table_with_schema(cursor, schema_name, table_name):
drop_postgres_table_with_schema(cursor, schema_name, table_name)
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
queries = [
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
"UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
"UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
"UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
"UPDATE postgresql_replica_{} SET key=key+10000000",
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
]
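# Each query targets a table postgresql_replica_<i>; the index is meant to be
# substituted with str.format before execution (usage sketch, no test in this
# excerpt consumes the list directly):
#   cursor.execute(queries[0].format(0))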
def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''):
if len(schema_name) == 0:
table = table_name
else:
table = schema_name + "." + table_name
print(f'Checking table {table} exists in {materialized_database}')
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
while table not in database_tables:
time.sleep(0.2)
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
assert(table in database_tables)
def assert_number_of_columns(expected, table_name, database_name='test_database'):
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
    while int(result) != expected:
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
print('Number of columns ok')
@pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(table_name, materialized_database, schema_name)
print("Checking table is synchronized:", table_name)
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
while result != expected:
time.sleep(0.5)
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
assert(result == expected)
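# Note the two nested-table naming modes handled above: with
# materialized_postgresql_schema the table keeps its bare name, while with a
# schema-qualified tables list it is materialized as `schema.table` and has to
# be backtick-quoted: SELECT * FROM test_database.`test_schema.postgresql_replica_0`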
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port)
cursor = conn.cursor()
create_postgres_db(cursor, 'postgres_database')
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port)
instance.query("DROP DATABASE IF EXISTS test_database")
yield cluster
finally:
cluster.shutdown()
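# Checks that new tables can be added to a running replication via ATTACH TABLE,
# that materialized_postgresql_tables_list is extended to match, and that the
# extended list survives a server restart.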
def test_add_new_table_to_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
table_name = 'postgresql_replica_5'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(") # Check without ip
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'")
assert('Changing setting `materialized_postgresql_tables_list` is not allowed' in result)
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
assert('Database engine MaterializedPostgreSQL does not support setting' in result)
instance.query("ATTACH TABLE test_database.{}".format(table_name));
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n")
check_tables_are_synchronized(table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
check_tables_are_synchronized(table_name);
result = instance.query_and_get_error("ATTACH TABLE test_database.{}".format(table_name));
assert('Table test_database.postgresql_replica_5 already exists' in result)
result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table");
assert('PostgreSQL table unknown_table does not exist' in result)
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-180:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n")
table_name = 'postgresql_replica_6'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.restart_clickhouse()
table_name = 'postgresql_replica_7'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-222:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n")
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n")
for i in range(NUM_TABLES + 3):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES + 3):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
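# Checks the reverse path: DETACH TABLE removes a table from replication and
# shrinks materialized_postgresql_tables_list, while ATTACH TABLE brings it back.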
def test_remove_table_from_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
table_name = 'postgresql_replica_4'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
result = instance.query_and_get_error('SELECT * FROM test_database.{}'.format(table_name))
assert("doesn't exist" in result)
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n")
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n")
instance.query('ATTACH TABLE test_database.{}'.format(table_name));
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
table_name = 'postgresql_replica_1'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
for i in range(NUM_TABLES):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
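# Checks that the engine can be created from the named collection defined in the
# server config (the postgres1 section above) instead of inline credentials.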
def test_predefined_connection_configuration(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
    cursor.execute('DROP TABLE IF EXISTS test_table')
    cursor.execute('CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1)")
check_tables_are_synchronized("test_table");
drop_materialized_db()
cursor.execute('DROP TABLE IF EXISTS test_table')
insert_counter = 0
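# Single non-default schema: replication is scoped with
# materialized_postgresql_schema, and nested tables keep their bare names
# (no schema prefix).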
def test_database_with_single_non_default_schema(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
NUM_TABLES=5
schema_name = 'test_schema'
clickhouse_postgres_db = 'postgres_database_with_schema'
global insert_counter
insert_counter = 0
def insert_into_tables():
global insert_counter
clickhouse_postgres_db = 'postgres_database_with_schema'
for i in range(NUM_TABLES):
table_name = f'postgresql_replica_{i}'
instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
insert_counter += 1
def assert_show_tables(expected):
result = instance.query('SHOW TABLES FROM test_database')
assert(result == expected)
print('assert show tables Ok')
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table_with_schema(cursor, schema_name, table_name);
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])
insert_into_tables()
check_all_tables_are_synchronized()
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
instance.restart_clickhouse()
check_all_tables_are_synchronized()
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
insert_into_tables()
check_all_tables_are_synchronized()
print('ALTER')
altered_table = random.randint(0, NUM_TABLES-1)
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
drop_materialized_db()
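# Multiple schemas via an explicit schema-qualified tables list
# (materialized_postgresql_tables_list_with_schema = 1); nested tables are
# named `schema.table`.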
def test_database_with_multiple_non_default_schemas_1(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
NUM_TABLES = 5
schema_name = 'test_schema'
clickhouse_postgres_db = 'postgres_database_with_schema'
publication_tables = ''
global insert_counter
insert_counter = 0
def insert_into_tables():
global insert_counter
clickhouse_postgres_db = 'postgres_database_with_schema'
for i in range(NUM_TABLES):
table_name = f'postgresql_replica_{i}'
instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
insert_counter += 1
def assert_show_tables(expected):
result = instance.query('SHOW TABLES FROM test_database')
assert(result == expected)
print('assert show tables Ok')
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table_with_schema(cursor, schema_name, table_name);
if publication_tables != '':
publication_tables += ', '
publication_tables += schema_name + '.' + table_name
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{publication_tables}'", "materialized_postgresql_tables_list_with_schema=1", "materialized_postgresql_allow_automatic_update = 1"])
check_all_tables_are_synchronized()
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
instance.restart_clickhouse()
check_all_tables_are_synchronized()
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
insert_into_tables()
check_all_tables_are_synchronized()
print('ALTER')
altered_table = random.randint(0, NUM_TABLES-1)
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
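# Multiple schemas via materialized_postgresql_schema_list: every table of each
# listed schema is replicated, and nested tables are named `schema.table`.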
def test_database_with_multiple_non_default_schemas_2(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
NUM_TABLES = 2
schemas_num = 2
schema_list = 'schema0, schema1'
global insert_counter
insert_counter = 0
def check_all_tables_are_synchronized():
for i in range(schemas_num):
schema_name = f'schema{i}'
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
print(f'checking table {schema_name}.{table_name}')
check_tables_are_synchronized(f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('synchronized Ok')
def insert_into_tables():
global insert_counter
for i in range(schemas_num):
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
instance.query(f'INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)')
insert_counter += 1
def assert_show_tables(expected):
result = instance.query('SHOW TABLES FROM test_database')
assert(result == expected)
print('assert show tables Ok')
for i in range(schemas_num):
schema_name = f'schema{i}'
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
create_postgres_table_with_schema(cursor, schema_name, table_name);
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema_list = '{schema_list}'", "materialized_postgresql_allow_automatic_update = 1"])
check_all_tables_are_synchronized()
insert_into_tables()
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
instance.restart_clickhouse()
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
check_all_tables_are_synchronized()
insert_into_tables()
check_all_tables_are_synchronized()
print('ALTER')
altered_schema = random.randint(0, schemas_num-1)
altered_table = random.randint(0, NUM_TABLES-1)
cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")
instance.query(f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)")
assert_number_of_columns(3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()