diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp index 080069c3f44..cbedc98fc3d 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp @@ -52,6 +52,7 @@ DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL( void DatabaseMaterializePostgreSQL::startSynchronization() { replication_handler = std::make_unique( + /* replication_identifier */database_name, remote_database_name, database_name, connection_info, diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 412a38755e3..e42e70c50ef 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -22,6 +22,7 @@ namespace ErrorCodes } PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( + const String & replication_identifier, const String & remote_database_name_, const String & current_database_name_, const postgres::ConnectionInfo & connection_info_, @@ -41,8 +42,8 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( , tables_list(tables_list_) , connection(std::make_shared(connection_info_)) { - replication_slot = fmt::format("{}_ch_replication_slot", current_database_name); - publication_name = fmt::format("{}_ch_publication", current_database_name); + replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier); + publication_name = fmt::format("{}_ch_publication", replication_identifier); startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); }); consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); }); @@ -402,11 +403,20 @@ void PostgreSQLReplicationHandler::shutdownFinal() pqxx::nontransaction tx(connection->getRef()); dropPublication(tx); String last_committed_lsn; - if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false)) - dropReplicationSlot(tx, /* temporary */false); - if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true)) - dropReplicationSlot(tx, /* temporary */true); - tx.commit(); + try + { + if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false)) + dropReplicationSlot(tx, /* temporary */false); + if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true)) + dropReplicationSlot(tx, /* temporary */true); + tx.commit(); + } + catch (Exception & e) + { + e.addMessage("while dropping replication slot: {}", replication_slot); + LOG_ERROR(log, "Failed to drop replication slot: {}. It must be dropped manually.", replication_slot); + throw; + } } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 61132d0c0fc..1f8d25ab32d 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -19,6 +19,7 @@ class PostgreSQLReplicationHandler { public: PostgreSQLReplicationHandler( + const String & replication_identifier, const String & remote_database_name_, const String & current_database_name_, const postgres::ConnectionInfo & connection_info_, diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp index 4a7c9655149..806b51bafba 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp @@ -63,7 +63,9 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( setInMemoryMetadata(storage_metadata); + String replication_identifier = remote_database_name + "_" + remote_table_name_; replication_handler = std::make_unique( + replication_identifier, remote_database_name, table_id_.database_name, connection_info, @@ -351,11 +353,7 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt "No columns returned for table {}.{}", table_id.database_name, table_id.table_name); } - StorageInMemoryMetadata storage_metadata; - ordinary_columns_and_types = *table_structure->columns; - storage_metadata.setColumns(ColumnsDescription(ordinary_columns_and_types)); - setInMemoryMetadata(storage_metadata); if (!table_structure->primary_key_columns && !table_structure->replica_identity_columns) { @@ -406,6 +404,17 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt create_table_query->set(create_table_query->storage, storage); + /// Add columns _sign and _version, so that they can be accessed from nested ReplacingMergeTree table if needed. + /// TODO: add test for case of database engine, test same case after table reload. + ordinary_columns_and_types.push_back({"_sign", std::make_shared()}); + ordinary_columns_and_types.push_back({"_version", std::make_shared()}); + + StorageInMemoryMetadata metadata; + metadata.setColumns(ColumnsDescription(ordinary_columns_and_types)); + metadata.setConstraints(metadata_snapshot->getConstraints()); + + setInMemoryMetadata(metadata); + return create_table_query; } diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index 20d21008629..53eedbc8b7d 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -77,8 +77,9 @@ def test_initial_load_from_snapshot(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -100,8 +101,9 @@ def test_no_connection_at_startup(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -132,8 +134,9 @@ def test_detach_attach_is_ok(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -167,8 +170,9 @@ def test_replicating_insert_queries(started_cluster): instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -208,8 +212,9 @@ def test_replicating_delete_queries(started_cluster): instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -246,8 +251,9 @@ def test_replicating_update_queries(started_cluster): instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -276,8 +282,9 @@ def test_resume_from_written_version(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -318,12 +325,9 @@ def test_many_replication_messages(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica ( - key UInt64, value UInt64, - _sign Int8 MATERIALIZED 1, - _version UInt64 MATERIALIZED 1, - PRIMARY KEY(key)) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, PRIMARY KEY(key)) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') SETTINGS materialize_postgresql_max_block_size = 50000; @@ -376,8 +380,9 @@ def test_connection_loss(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; @@ -412,8 +417,9 @@ def test_clickhouse_restart(started_cluster): create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; ''') @@ -439,32 +445,59 @@ def test_rename_table(started_cluster): conn = get_postgres_conn(True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); - instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) ENGINE = MaterializePostgreSQL( 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') PRIMARY KEY key; ''') + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25)") + result = instance.query('SELECT count() FROM test.postgresql_replica;') - while int(result) != 50: + while int(result) != 25: time.sleep(0.5) result = instance.query('SELECT count() FROM test.postgresql_replica;') instance.query('RENAME TABLE test.postgresql_replica TO test.postgresql_replica_renamed') + assert(int(instance.query('SELECT count() FROM test.postgresql_replica_renamed;')) == 25) + + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25, 25)") result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;') while int(result) != 50: time.sleep(0.5) result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;') - instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50, 50)") + result = instance.query('SELECT * FROM test.postgresql_replica_renamed ORDER BY key;') + postgresql_replica_check_result(result, True) + cursor.execute('DROP TABLE postgresql_replica;') + instance.query('DROP TABLE IF EXISTS test.postgresql_replica_renamed') - result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;') - while int(result) != 100: + +def test_virtual_columns(started_cluster): + conn = get_postgres_conn(True) + cursor = conn.cursor() + create_postgres_table(cursor, 'postgresql_replica'); + + instance.query('DROP TABLE IF EXISTS test.postgresql_replica') + instance.query(''' + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) + ENGINE = MaterializePostgreSQL( + 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') + PRIMARY KEY key; ''') + + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)") + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while int(result) != 10: time.sleep(0.5) - result = instance.query('SELECT count() FROM test.postgresql_replica_renamed;') + result = instance.query('SELECT count() FROM test.postgresql_replica;') + + # just check that it works, no check with `expected` becuase _version is taken as LSN, which will be different each time. + result = instance.query('SELECT key, value, _sign, _version FROM test.postgresql_replica;') + print(result) + cursor.execute('DROP TABLE postgresql_replica;') if __name__ == '__main__':