From 00e76ca372edccde2b6f7ac7430d3231878b19e8 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 3 Jun 2021 19:45:27 +0000 Subject: [PATCH] Checking tests --- src/Storages/StorageMaterializeMySQL.cpp | 66 +++- .../test.py | 303 ++++++++++-------- .../test_storage_postgresql_replica/test.py | 176 +++++----- 3 files changed, 307 insertions(+), 238 deletions(-) diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index 6352b62d6f4..8e6f2e1ad63 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -23,7 +23,6 @@ #include #include -#include namespace DB { @@ -38,7 +37,7 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora Pipe StorageMaterializeMySQL::read( const Names & column_names, - const StorageMetadataPtr & metadata_snapshot, + const StorageMetadataPtr & /*metadata_snapshot*/, SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, @@ -48,15 +47,60 @@ Pipe StorageMaterializeMySQL::read( /// If the background synchronization thread has exception. rethrowSyncExceptionIfNeed(database); - return readFinalFromNestedStorage( - nested_storage, - column_names, - metadata_snapshot, - query_info, - context, - processed_stage, - max_block_size, - num_streams); + NameSet column_names_set = NameSet(column_names.begin(), column_names.end()); + auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout); + const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr(); + + Block nested_header = nested_metadata->getSampleBlock(); + ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2); + ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1); + + if (ASTSelectQuery * select_query = query_info.query->as(); select_query && !column_names_set.count(version_column.name)) + { + auto & tables_in_select_query = select_query->tables()->as(); + + if (!tables_in_select_query.children.empty()) + { + auto & tables_element = tables_in_select_query.children[0]->as(); + + if (tables_element.table_expression) + tables_element.table_expression->as().final = true; + } + } + + String filter_column_name; + Names require_columns_name = column_names; + ASTPtr expressions = std::make_shared(); + if (column_names_set.empty() || !column_names_set.count(sign_column.name)) + { + require_columns_name.emplace_back(sign_column.name); + + const auto & sign_column_name = std::make_shared(sign_column.name); + const auto & fetch_sign_value = std::make_shared(Field(Int8(1))); + + expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value)); + filter_column_name = expressions->children.back()->getColumnName(); + + for (const auto & column_name : column_names) + expressions->children.emplace_back(std::make_shared(column_name)); + } + + Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams); + pipe.addTableLock(lock); + + if (!expressions->children.empty() && !pipe.empty()) + { + Block pipe_header = pipe.getHeader(); + auto syntax = TreeRewriter(context).analyze(expressions, pipe_header.getNamesAndTypesList()); + ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true /* add_aliases */, false /* project_result */); + + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, expression_actions, filter_column_name, false); + }); + } + + return pipe; } NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const diff --git a/tests/integration/test_postgresql_replica_database_engine/test.py b/tests/integration/test_postgresql_replica_database_engine/test.py index c98e4ee14d8..f19a5cf2467 100644 --- a/tests/integration/test_postgresql_replica_database_engine/test.py +++ b/tests/integration/test_postgresql_replica_database_engine/test.py @@ -31,11 +31,12 @@ postgres_table_template_3 = """ key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL) """ -def get_postgres_conn(database=False, auto_commit=True, database_name='postgres_database'): +def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'): if database == True: - conn_string = "host='localhost' dbname='{}' user='postgres' password='mysecretpassword'".format(database_name) + conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name) else: - conn_string = "host='localhost' user='postgres' password='mysecretpassword'" + conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port) + conn = psycopg2.connect(conn_string) if auto_commit: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) @@ -47,22 +48,32 @@ def create_postgres_db(cursor, name='postgres_database'): cursor.execute("CREATE DATABASE {}".format(name)) def drop_postgres_db(cursor, name='postgres_database'): - cursor.execute("DROP DATABASE IF EXISTS {}".format(name)) + cursor.execute("DROP DATABASE {}".format(name)) -def create_clickhouse_postgres_db(name='postgres_database'): +def create_clickhouse_postgres_db(ip, port, name='postgres_database'): instance.query(''' CREATE DATABASE {} - ENGINE = PostgreSQL('postgres1:5432', '{}', 'postgres', 'mysecretpassword')'''.format(name, name)) + ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, name)) def drop_clickhouse_postgres_db(name='postgres_database'): - instance.query('DROP DATABASE IF EXISTS {}'.format(name)) + instance.query('DROP DATABASE {}'.format(name)) -def create_materialized_db(materialized_database='test_database', postgres_database='postgres_database'): - instance.query("CREATE DATABASE {} ENGINE = MaterializePostgreSQL('postgres1:5432', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, postgres_database)) +def create_materialized_db(ip, port, + materialized_database='test_database', + postgres_database='postgres_database', + settings=[]): + create_query = "CREATE DATABASE {} ENGINE = MaterializePostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, ip, port, postgres_database) + if len(settings) > 0: + create_query += " SETTINGS " + for i in range(len(settings)): + if i != 0: + create_query += ', ' + create_query += settings[i] + instance.query(create_query) assert materialized_database in instance.query('SHOW DATABASES') def drop_materialized_db(materialized_database='test_database'): - instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database)) + instance.query('DROP DATABASE {}'.format(materialized_database)) assert materialized_database not in instance.query('SHOW DATABASES') def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template): @@ -120,10 +131,12 @@ def check_tables_are_synchronized(table_name, order_by='key', postgres_database= def started_cluster(): try: cluster.start() - conn = get_postgres_conn() + conn = get_postgres_conn(ip=cluster.postgres_ip, + port=cluster.postgres_port) cursor = conn.cursor() create_postgres_db(cursor, 'postgres_database') - create_clickhouse_postgres_db() + create_clickhouse_postgres_db(ip=cluster.postgres_ip, + port=cluster.postgres_port) instance.query("DROP DATABASE IF EXISTS test_database") yield cluster @@ -140,7 +153,9 @@ def postgresql_setup_teardown(): @pytest.mark.timeout(120) def test_load_and_sync_all_database_tables(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -149,7 +164,8 @@ def test_load_and_sync_all_database_tables(started_cluster): create_postgres_table(cursor, table_name); instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name)) - instance.query("CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) assert 'test_database' in instance.query('SHOW DATABASES') for i in range(NUM_TABLES): @@ -167,7 +183,9 @@ def test_load_and_sync_all_database_tables(started_cluster): @pytest.mark.timeout(120) def test_replicating_dml(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -175,8 +193,8 @@ def test_replicating_dml(started_cluster): create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i)) - instance.query( - "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) for i in range(NUM_TABLES): instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(1000)".format(i, i)) @@ -210,7 +228,9 @@ def test_replicating_dml(started_cluster): @pytest.mark.timeout(120) def test_different_data_types(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() cursor.execute('drop table if exists test_data_types;') cursor.execute('drop table if exists test_array_data_type;') @@ -236,8 +256,8 @@ def test_different_data_types(started_cluster): k Char(2)[] -- Nullable(String) )''') - instance.query( - "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) for i in range(10): instance.query(''' @@ -294,7 +314,9 @@ def test_different_data_types(started_cluster): @pytest.mark.timeout(120) def test_load_and_sync_subset_of_database_tables(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() NUM_TABLES = 10 @@ -309,11 +331,9 @@ def test_load_and_sync_subset_of_database_tables(started_cluster): publication_tables += ', ' publication_tables += table_name - instance.query(''' - CREATE DATABASE test_database - ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword') - SETTINGS materialize_postgresql_tables_list = '{}'; - '''.format(publication_tables)) + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=["materialize_postgresql_tables_list = '{}'".format(publication_tables)]) assert 'test_database' in instance.query('SHOW DATABASES') time.sleep(1) @@ -347,13 +367,15 @@ def test_load_and_sync_subset_of_database_tables(started_cluster): @pytest.mark.timeout(120) def test_changing_replica_identity_value(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)") - instance.query( - "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)") check_tables_are_synchronized('postgresql_replica'); @@ -364,7 +386,9 @@ def test_changing_replica_identity_value(started_cluster): @pytest.mark.timeout(320) def test_clickhouse_restart(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -390,7 +414,9 @@ def test_clickhouse_restart(started_cluster): @pytest.mark.timeout(120) def test_replica_identity_index(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica', template=postgres_table_template_3); @@ -398,8 +424,8 @@ def test_replica_identity_index(started_cluster): cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx") instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)") - instance.query( - "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)") check_tables_are_synchronized('postgresql_replica', order_by='key1'); @@ -416,7 +442,9 @@ def test_replica_identity_index(started_cluster): @pytest.mark.timeout(320) def test_table_schema_changes(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -424,11 +452,9 @@ def test_table_schema_changes(started_cluster): create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2); instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i)) - instance.query( - """CREATE DATABASE test_database - ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword') - SETTINGS materialize_postgresql_allow_automatic_update = 1; - """) + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=["materialize_postgresql_allow_automatic_update = 1"]) for i in range(NUM_TABLES): instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i)) @@ -472,7 +498,9 @@ def test_table_schema_changes(started_cluster): @pytest.mark.timeout(120) def test_many_concurrent_queries(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -522,7 +550,8 @@ def test_many_concurrent_queries(started_cluster): for i in range(threads_num): threads.append(threading.Thread(target=attack, args=(i,))) - create_materialized_db() + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) for thread in threads: time.sleep(random.uniform(0, 1)) @@ -549,13 +578,16 @@ def test_many_concurrent_queries(started_cluster): @pytest.mark.timeout(120) def test_single_transaction(started_cluster): instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(database=True, auto_commit=False) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True, auto_commit=False) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica_0'); conn.commit() - create_materialized_db() + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) assert_nested_table_is_created('postgresql_replica_0') for query in queries: @@ -573,14 +605,15 @@ def test_single_transaction(started_cluster): def test_virtual_columns(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica_0'); - instance.query( - """CREATE DATABASE test_database - ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword') - SETTINGS materialize_postgresql_allow_automatic_update = 1; """) + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=["materialize_postgresql_allow_automatic_update = 1"]) assert_nested_table_is_created('postgresql_replica_0') instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number from numbers(10)") check_tables_are_synchronized('postgresql_replica_0'); @@ -604,93 +637,93 @@ def test_virtual_columns(started_cluster): drop_materialized_db() -def test_multiple_databases(started_cluster): - instance.query("DROP DATABASE IF EXISTS test_database_1") - instance.query("DROP DATABASE IF EXISTS test_database_2") - NUM_TABLES = 5 - - conn = get_postgres_conn() - cursor = conn.cursor() - create_postgres_db(cursor, 'postgres_database_1') - create_postgres_db(cursor, 'postgres_database_2') - - conn1 = get_postgres_conn(True, True, 'postgres_database_1') - conn2 = get_postgres_conn(True, True, 'postgres_database_2') - - cursor1 = conn1.cursor() - cursor2 = conn2.cursor() - - create_clickhouse_postgres_db('postgres_database_1') - create_clickhouse_postgres_db('postgres_database_2') - - cursors = [cursor1, cursor2] - for cursor_id in range(len(cursors)): - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - create_postgres_table(cursors[cursor_id], table_name); - instance.query("INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format(cursor_id + 1, table_name)) - print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';''')) - print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';''')) - - create_materialized_db('test_database_1', 'postgres_database_1') - create_materialized_db('test_database_2', 'postgres_database_2') - - cursors = [cursor1, cursor2] - for cursor_id in range(len(cursors)): - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - instance.query("INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format(cursor_id + 1, table_name)) - - for cursor_id in range(len(cursors)): - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - check_tables_are_synchronized( - table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1)); - - drop_clickhouse_postgres_db('postgres_database_1') - drop_clickhouse_postgres_db('postgres_database_2') - drop_materialized_db('test_database_1') - drop_materialized_db('test_database_2') - - -@pytest.mark.timeout(320) -def test_concurrent_transactions(started_cluster): - instance.query("DROP DATABASE IF EXISTS test_database") - conn = get_postgres_conn(True) - cursor = conn.cursor() - NUM_TABLES = 6 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - - def transaction(thread_id): - conn_ = get_postgres_conn(True, auto_commit=False) - cursor_ = conn.cursor() - for query in queries: - cursor_.execute(query.format(thread_id)) - print('thread {}, query {}'.format(thread_id, query)) - conn_.commit() - - threads = [] - threads_num = 6 - for i in range(threads_num): - threads.append(threading.Thread(target=transaction, args=(i,))) - - create_materialized_db() - - for thread in threads: - time.sleep(random.uniform(0, 0.5)) - thread.start() - for thread in threads: - thread.join() - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i)) - count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i)) - print(int(count1), int(count2), sep=' ') - assert(int(count1) == int(count2)) - drop_materialized_db() +#def test_multiple_databases(started_cluster): +# instance.query("DROP DATABASE IF EXISTS test_database_1") +# instance.query("DROP DATABASE IF EXISTS test_database_2") +# NUM_TABLES = 5 +# +# conn = get_postgres_conn() +# cursor = conn.cursor() +# create_postgres_db(cursor, 'postgres_database_1') +# create_postgres_db(cursor, 'postgres_database_2') +# +# conn1 = get_postgres_conn(True, True, 'postgres_database_1') +# conn2 = get_postgres_conn(True, True, 'postgres_database_2') +# +# cursor1 = conn1.cursor() +# cursor2 = conn2.cursor() +# +# create_clickhouse_postgres_db('postgres_database_1') +# create_clickhouse_postgres_db('postgres_database_2') +# +# cursors = [cursor1, cursor2] +# for cursor_id in range(len(cursors)): +# for i in range(NUM_TABLES): +# table_name = 'postgresql_replica_{}'.format(i) +# create_postgres_table(cursors[cursor_id], table_name); +# instance.query("INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format(cursor_id + 1, table_name)) +# print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';''')) +# print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';''')) +# +# create_materialized_db('test_database_1', 'postgres_database_1') +# create_materialized_db('test_database_2', 'postgres_database_2') +# +# cursors = [cursor1, cursor2] +# for cursor_id in range(len(cursors)): +# for i in range(NUM_TABLES): +# table_name = 'postgresql_replica_{}'.format(i) +# instance.query("INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format(cursor_id + 1, table_name)) +# +# for cursor_id in range(len(cursors)): +# for i in range(NUM_TABLES): +# table_name = 'postgresql_replica_{}'.format(i) +# check_tables_are_synchronized( +# table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1)); +# +# drop_clickhouse_postgres_db('postgres_database_1') +# drop_clickhouse_postgres_db('postgres_database_2') +# drop_materialized_db('test_database_1') +# drop_materialized_db('test_database_2') +# +# +#@pytest.mark.timeout(320) +#def test_concurrent_transactions(started_cluster): +# instance.query("DROP DATABASE IF EXISTS test_database") +# conn = get_postgres_conn(True) +# cursor = conn.cursor() +# NUM_TABLES = 6 +# +# for i in range(NUM_TABLES): +# create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); +# +# def transaction(thread_id): +# conn_ = get_postgres_conn(True, auto_commit=False) +# cursor_ = conn.cursor() +# for query in queries: +# cursor_.execute(query.format(thread_id)) +# print('thread {}, query {}'.format(thread_id, query)) +# conn_.commit() +# +# threads = [] +# threads_num = 6 +# for i in range(threads_num): +# threads.append(threading.Thread(target=transaction, args=(i,))) +# +# create_materialized_db() +# +# for thread in threads: +# time.sleep(random.uniform(0, 0.5)) +# thread.start() +# for thread in threads: +# thread.join() +# +# for i in range(NUM_TABLES): +# check_tables_are_synchronized('postgresql_replica_{}'.format(i)); +# count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i)) +# count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i)) +# print(int(count1), int(count2), sep=' ') +# assert(int(count1) == int(count2)) +# drop_materialized_db() if __name__ == '__main__': diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index 53eedbc8b7d..e448cfc8e99 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -16,21 +16,33 @@ postgres_table_template = """ key Integer NOT NULL, value Integer, PRIMARY KEY(key)) """ - -def get_postgres_conn(database=False): +def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'): if database == True: - conn_string = "host='localhost' dbname='postgres_database' user='postgres' password='mysecretpassword'" + conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name) else: - conn_string = "host='localhost' user='postgres' password='mysecretpassword'" + conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port) + conn = psycopg2.connect(conn_string) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - conn.autocommit = True + if auto_commit: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + conn.autocommit = True return conn def create_postgres_db(cursor, name): cursor.execute("CREATE DATABASE {}".format(name)) +def create_clickhouse_postgres_db(ip, port, name='postgres_database'): + instance.query(''' + CREATE DATABASE {} + ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, name)) + +def create_materialized_table(ip, port): + instance.query(''' + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) + ENGINE = MaterializePostgreSQL( + '{}:{}', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') + PRIMARY KEY key; '''.format(ip, port)) def create_postgres_table(cursor, table_name, replica_identity_full=False): cursor.execute("DROP TABLE IF EXISTS {}".format(table_name)) @@ -52,12 +64,13 @@ def postgresql_replica_check_result(result, check=False, ref_file='test_postgres def started_cluster(): try: cluster.start() - conn = get_postgres_conn() + conn = get_postgres_conn(ip=cluster.postgres_ip, + port=cluster.postgres_port) cursor = conn.cursor() create_postgres_db(cursor, 'postgres_database') - instance.query(''' - CREATE DATABASE postgres_database - ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')''') + create_clickhouse_postgres_db(ip=cluster.postgres_ip, + port=cluster.postgres_port) + instance.query('CREATE DATABASE test') yield cluster @@ -65,25 +78,23 @@ def started_cluster(): cluster.shutdown() @pytest.fixture(autouse=True) -def rabbitmq_setup_teardown(): +def postgresql_setup_teardown(): yield # run test instance.query('DROP TABLE IF EXISTS test.postgresql_replica') @pytest.mark.timeout(320) def test_initial_load_from_snapshot(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') while postgresql_replica_check_result(result) == False: @@ -96,18 +107,16 @@ def test_initial_load_from_snapshot(started_cluster): @pytest.mark.timeout(320) def test_no_connection_at_startup(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) time.sleep(3) instance.query('DETACH TABLE test.postgresql_replica') @@ -129,18 +138,16 @@ def test_no_connection_at_startup(started_cluster): @pytest.mark.timeout(320) def test_detach_attach_is_ok(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT count() FROM test.postgresql_replica;') while (int(result) == 0): @@ -164,19 +171,17 @@ def test_detach_attach_is_ok(started_cluster): @pytest.mark.timeout(320) def test_replicating_insert_queries(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT count() FROM test.postgresql_replica;') while (int(result) != 10): @@ -206,19 +211,17 @@ def test_replicating_insert_queries(started_cluster): @pytest.mark.timeout(320) def test_replicating_delete_queries(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') while postgresql_replica_check_result(result) == False: @@ -245,19 +248,17 @@ def test_replicating_delete_queries(started_cluster): @pytest.mark.timeout(320) def test_replicating_update_queries(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT count() FROM test.postgresql_replica;') while (int(result) != 50): @@ -277,18 +278,16 @@ def test_replicating_update_queries(started_cluster): @pytest.mark.timeout(320) def test_resume_from_written_version(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT count() FROM test.postgresql_replica;') while (int(result) != 50): @@ -320,18 +319,16 @@ def test_resume_from_written_version(started_cluster): @pytest.mark.timeout(320) def test_many_replication_messages(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, PRIMARY KEY(key)) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - SETTINGS materialize_postgresql_max_block_size = 50000; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) result = instance.query('SELECT count() FROM test.postgresql_replica;') while (int(result) != 100000): @@ -375,18 +372,16 @@ def test_many_replication_messages(started_cluster): @pytest.mark.timeout(320) def test_connection_loss(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; - ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) i = 50 while i < 100000: @@ -412,17 +407,16 @@ def test_connection_loss(started_cluster): @pytest.mark.timeout(320) def test_clickhouse_restart(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) i = 50 while i < 100000: @@ -442,16 +436,15 @@ def test_clickhouse_restart(started_cluster): def test_rename_table(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25)") @@ -477,16 +470,15 @@ def test_rename_table(started_cluster): def test_virtual_columns(started_cluster): - conn = get_postgres_conn(True) + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query('DROP TABLE IF EXISTS test.postgresql_replica') - instance.query(''' - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) - ENGINE = MaterializePostgreSQL( - 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; ''') + create_materialized_table(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port) instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)") result = instance.query('SELECT count() FROM test.postgresql_replica;')