From 093e53e65a9e0760d3ab9643e2e1bf8af3368fec Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 8 Apr 2021 20:39:56 +0000 Subject: [PATCH] Fix and test server restart --- .../PostgreSQLReplicationHandler.cpp | 2 ++ .../StorageMaterializePostgreSQL.cpp | 7 ++++ .../PostgreSQL/StorageMaterializePostgreSQL.h | 2 ++ .../test.py | 32 ++++++++++++++++++- .../test_storage_postgresql_replica/test.py | 32 ++++++++++++++++++- 5 files changed, 73 insertions(+), 2 deletions(-) diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index a1851cc3248..1cca362ca35 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -119,11 +119,13 @@ void PostgreSQLReplicationHandler::startSynchronization() } else { + LOG_TRACE(log, "Restoring tables..."); for (const auto & [table_name, storage] : storages) { try { nested_storages[table_name] = storage->getNested(); + storage->setStorageMetadata(); storage->setNestedLoaded(); } catch (...) diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp index e90ada126c0..8b48622da5f 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp @@ -152,6 +152,13 @@ ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & da } +void StorageMaterializePostgreSQL::setStorageMetadata() +{ + auto storage_metadata = getNested()->getInMemoryMetadataPtr(); + setInMemoryMetadata(*storage_metadata); +} + + /// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition. /// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same /// transaction with snapshot, which is used for initial tables dump. diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h index 9d933e84050..feba216b4c4 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h @@ -69,6 +69,8 @@ public: void dropNested(); + void setStorageMetadata(); + protected: StorageMaterializePostgreSQL( const StorageID & table_id_, diff --git a/tests/integration/test_postgresql_replica_database_engine/test.py b/tests/integration/test_postgresql_replica_database_engine/test.py index e1c7459de91..535cb0f6a7d 100644 --- a/tests/integration/test_postgresql_replica_database_engine/test.py +++ b/tests/integration/test_postgresql_replica_database_engine/test.py @@ -10,7 +10,10 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from helpers.test_tools import TSV cluster = ClickHouseCluster(__file__) -instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], user_configs = ['configs/users.xml'], with_postgres=True) +instance = cluster.add_instance('instance', + main_configs=['configs/log_conf.xml'], + user_configs = ['configs/users.xml'], + with_postgres=True, stay_alive=True) postgres_table_template = """ CREATE TABLE IF NOT EXISTS {} ( @@ -55,6 +58,7 @@ def assert_nested_table_is_created(table_name): @pytest.mark.timeout(30) def check_tables_are_synchronized(table_name, order_by='key'): assert_nested_table_is_created(table_name) + print("nested ok") expected = instance.query('select * from postgres_database.{} order by {};'.format(table_name, order_by)) result = instance.query('select * from test_database.{} order by {};'.format(table_name, order_by)) @@ -364,6 +368,32 @@ def test_changing_replica_identity_value(started_cluster): check_tables_are_synchronized('postgresql_replica'); +@pytest.mark.timeout(320) +def test_clickhouse_restart(started_cluster): + instance.query("DROP DATABASE IF EXISTS test_database") + conn = get_postgres_conn(True) + cursor = conn.cursor() + NUM_TABLES = 5 + + for i in range(NUM_TABLES): + create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); + instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i)) + + instance.query("CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") + + for i in range(NUM_TABLES): + table_name = 'postgresql_replica_{}'.format(i) + check_tables_are_synchronized(table_name); + + for i in range(NUM_TABLES): + instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(50000)".format(i, i)) + + instance.restart_clickhouse() + + for i in range(NUM_TABLES): + check_tables_are_synchronized('postgresql_replica_{}'.format(i)); + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...") diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index 4a7a6592873..bca4f159cf6 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -9,7 +9,7 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from helpers.test_tools import TSV cluster = ClickHouseCluster(__file__) -instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], with_postgres=True) +instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'], with_postgres=True, stay_alive=True) postgres_table_template = """ CREATE TABLE IF NOT EXISTS {} ( @@ -400,6 +400,36 @@ def test_connection_loss(started_cluster): assert(int(result) == 100050) +@pytest.mark.timeout(320) +def test_clickhouse_restart(started_cluster): + conn = get_postgres_conn(True) + cursor = conn.cursor() + create_postgres_table(cursor, 'postgresql_replica'); + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + + instance.query(''' + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, _sign Int8 MATERIALIZED 1, _version UInt64 MATERIALIZED 1) + ENGINE = MaterializePostgreSQL( + 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') + PRIMARY KEY key; ''') + + i = 50 + while i < 100000: + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT {} + number, number from numbers(10000)".format(i)) + i += 10000 + + instance.restart_clickhouse() + + result = instance.query('SELECT count() FROM test.postgresql_replica;') + while int(result) < 100050: + time.sleep(1) + result = instance.query('SELECT count() FROM test.postgresql_replica;') + + cursor.execute('DROP TABLE postgresql_replica;') + print(result) + assert(int(result) == 100050) + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...")