Checking tests

This commit is contained in:
kssenii 2021-06-03 19:45:27 +00:00
parent e23bc14582
commit 00e76ca372
3 changed files with 307 additions and 238 deletions

View File

@ -23,7 +23,6 @@
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/ReadFinalForExternalReplicaStorage.h>
namespace DB
{
@ -38,7 +37,7 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora
Pipe StorageMaterializeMySQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -48,15 +47,60 @@ Pipe StorageMaterializeMySQL::read(
/// If the background synchronization thread has exception.
rethrowSyncExceptionIfNeed(database);
return readFinalFromNestedStorage(
nested_storage,
column_names,
metadata_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams);
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
{
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
if (!tables_in_select_query.children.empty())
{
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (tables_element.table_expression)
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}
String filter_column_name;
Names require_columns_name = column_names;
ASTPtr expressions = std::make_shared<ASTExpressionList>();
if (column_names_set.empty() || !column_names_set.count(sign_column.name))
{
require_columns_name.emplace_back(sign_column.name);
const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(Int8(1)));
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
for (const auto & column_name : column_names)
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
pipe.addTableLock(lock);
if (!expressions->children.empty() && !pipe.empty())
{
Block pipe_header = pipe.getHeader();
auto syntax = TreeRewriter(context).analyze(expressions, pipe_header.getNamesAndTypesList());
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true /* add_aliases */, false /* project_result */);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(header, expression_actions, filter_column_name, false);
});
}
return pipe;
}
NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const

View File

@ -31,11 +31,12 @@ postgres_table_template_3 = """
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
def get_postgres_conn(database=False, auto_commit=True, database_name='postgres_database'):
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'):
if database == True:
conn_string = "host='localhost' dbname='{}' user='postgres' password='mysecretpassword'".format(database_name)
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
conn = psycopg2.connect(conn_string)
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
@ -47,22 +48,32 @@ def create_postgres_db(cursor, name='postgres_database'):
cursor.execute("CREATE DATABASE {}".format(name))
def drop_postgres_db(cursor, name='postgres_database'):
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
cursor.execute("DROP DATABASE {}".format(name))
def create_clickhouse_postgres_db(name='postgres_database'):
def create_clickhouse_postgres_db(ip, port, name='postgres_database'):
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('postgres1:5432', '{}', 'postgres', 'mysecretpassword')'''.format(name, name))
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, name))
def drop_clickhouse_postgres_db(name='postgres_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(name))
instance.query('DROP DATABASE {}'.format(name))
def create_materialized_db(materialized_database='test_database', postgres_database='postgres_database'):
instance.query("CREATE DATABASE {} ENGINE = MaterializePostgreSQL('postgres1:5432', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, postgres_database))
def create_materialized_db(ip, port,
materialized_database='test_database',
postgres_database='postgres_database',
settings=[]):
create_query = "CREATE DATABASE {} ENGINE = MaterializePostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, ip, port, postgres_database)
if len(settings) > 0:
create_query += " SETTINGS "
for i in range(len(settings)):
if i != 0:
create_query += ', '
create_query += settings[i]
instance.query(create_query)
assert materialized_database in instance.query('SHOW DATABASES')
def drop_materialized_db(materialized_database='test_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database))
instance.query('DROP DATABASE {}'.format(materialized_database))
assert materialized_database not in instance.query('SHOW DATABASES')
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
@ -120,10 +131,12 @@ def check_tables_are_synchronized(table_name, order_by='key', postgres_database=
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn()
conn = get_postgres_conn(ip=cluster.postgres_ip,
port=cluster.postgres_port)
cursor = conn.cursor()
create_postgres_db(cursor, 'postgres_database')
create_clickhouse_postgres_db()
create_clickhouse_postgres_db(ip=cluster.postgres_ip,
port=cluster.postgres_port)
instance.query("DROP DATABASE IF EXISTS test_database")
yield cluster
@ -140,7 +153,9 @@ def postgresql_setup_teardown():
@pytest.mark.timeout(120)
def test_load_and_sync_all_database_tables(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -149,7 +164,8 @@ def test_load_and_sync_all_database_tables(started_cluster):
create_postgres_table(cursor, table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name))
instance.query("CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
assert 'test_database' in instance.query('SHOW DATABASES')
for i in range(NUM_TABLES):
@ -167,7 +183,9 @@ def test_load_and_sync_all_database_tables(started_cluster):
@pytest.mark.timeout(120)
def test_replicating_dml(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -175,8 +193,8 @@ def test_replicating_dml(started_cluster):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i))
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(1000)".format(i, i))
@ -210,7 +228,9 @@ def test_replicating_dml(started_cluster):
@pytest.mark.timeout(120)
def test_different_data_types(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
cursor.execute('drop table if exists test_data_types;')
cursor.execute('drop table if exists test_array_data_type;')
@ -236,8 +256,8 @@ def test_different_data_types(started_cluster):
k Char(2)[] -- Nullable(String)
)''')
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(10):
instance.query('''
@ -294,7 +314,9 @@ def test_different_data_types(started_cluster):
@pytest.mark.timeout(120)
def test_load_and_sync_subset_of_database_tables(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 10
@ -309,11 +331,9 @@ def test_load_and_sync_subset_of_database_tables(started_cluster):
publication_tables += ', '
publication_tables += table_name
instance.query('''
CREATE DATABASE test_database
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
SETTINGS materialize_postgresql_tables_list = '{}';
'''.format(publication_tables))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialize_postgresql_tables_list = '{}'".format(publication_tables)])
assert 'test_database' in instance.query('SHOW DATABASES')
time.sleep(1)
@ -347,13 +367,15 @@ def test_load_and_sync_subset_of_database_tables(started_cluster):
@pytest.mark.timeout(120)
def test_changing_replica_identity_value(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)")
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)")
check_tables_are_synchronized('postgresql_replica');
@ -364,7 +386,9 @@ def test_changing_replica_identity_value(started_cluster):
@pytest.mark.timeout(320)
def test_clickhouse_restart(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -390,7 +414,9 @@ def test_clickhouse_restart(started_cluster):
@pytest.mark.timeout(120)
def test_replica_identity_index(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica', template=postgres_table_template_3);
@ -398,8 +424,8 @@ def test_replica_identity_index(started_cluster):
cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx")
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)")
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)")
check_tables_are_synchronized('postgresql_replica', order_by='key1');
@ -416,7 +442,9 @@ def test_replica_identity_index(started_cluster):
@pytest.mark.timeout(320)
def test_table_schema_changes(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -424,11 +452,9 @@ def test_table_schema_changes(started_cluster):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2);
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i))
instance.query(
"""CREATE DATABASE test_database
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
SETTINGS materialize_postgresql_allow_automatic_update = 1;
""")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialize_postgresql_allow_automatic_update = 1"])
for i in range(NUM_TABLES):
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i))
@ -472,7 +498,9 @@ def test_table_schema_changes(started_cluster):
@pytest.mark.timeout(120)
def test_many_concurrent_queries(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -522,7 +550,8 @@ def test_many_concurrent_queries(started_cluster):
for i in range(threads_num):
threads.append(threading.Thread(target=attack, args=(i,)))
create_materialized_db()
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for thread in threads:
time.sleep(random.uniform(0, 1))
@ -549,13 +578,16 @@ def test_many_concurrent_queries(started_cluster):
@pytest.mark.timeout(120)
def test_single_transaction(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(database=True, auto_commit=False)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True, auto_commit=False)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica_0');
conn.commit()
create_materialized_db()
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
assert_nested_table_is_created('postgresql_replica_0')
for query in queries:
@ -573,14 +605,15 @@ def test_single_transaction(started_cluster):
def test_virtual_columns(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica_0');
instance.query(
"""CREATE DATABASE test_database
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
SETTINGS materialize_postgresql_allow_automatic_update = 1; """)
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialize_postgresql_allow_automatic_update = 1"])
assert_nested_table_is_created('postgresql_replica_0')
instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number from numbers(10)")
check_tables_are_synchronized('postgresql_replica_0');
@ -604,93 +637,93 @@ def test_virtual_columns(started_cluster):
drop_materialized_db()
def test_multiple_databases(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database_1")
instance.query("DROP DATABASE IF EXISTS test_database_2")
NUM_TABLES = 5
conn = get_postgres_conn()
cursor = conn.cursor()
create_postgres_db(cursor, 'postgres_database_1')
create_postgres_db(cursor, 'postgres_database_2')
conn1 = get_postgres_conn(True, True, 'postgres_database_1')
conn2 = get_postgres_conn(True, True, 'postgres_database_2')
cursor1 = conn1.cursor()
cursor2 = conn2.cursor()
create_clickhouse_postgres_db('postgres_database_1')
create_clickhouse_postgres_db('postgres_database_2')
cursors = [cursor1, cursor2]
for cursor_id in range(len(cursors)):
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table(cursors[cursor_id], table_name);
instance.query("INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format(cursor_id + 1, table_name))
print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';'''))
print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';'''))
create_materialized_db('test_database_1', 'postgres_database_1')
create_materialized_db('test_database_2', 'postgres_database_2')
cursors = [cursor1, cursor2]
for cursor_id in range(len(cursors)):
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
instance.query("INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format(cursor_id + 1, table_name))
for cursor_id in range(len(cursors)):
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(
table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1));
drop_clickhouse_postgres_db('postgres_database_1')
drop_clickhouse_postgres_db('postgres_database_2')
drop_materialized_db('test_database_1')
drop_materialized_db('test_database_2')
@pytest.mark.timeout(320)
def test_concurrent_transactions(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
NUM_TABLES = 6
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
def transaction(thread_id):
conn_ = get_postgres_conn(True, auto_commit=False)
cursor_ = conn.cursor()
for query in queries:
cursor_.execute(query.format(thread_id))
print('thread {}, query {}'.format(thread_id, query))
conn_.commit()
threads = []
threads_num = 6
for i in range(threads_num):
threads.append(threading.Thread(target=transaction, args=(i,)))
create_materialized_db()
for thread in threads:
time.sleep(random.uniform(0, 0.5))
thread.start()
for thread in threads:
thread.join()
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i))
count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i))
print(int(count1), int(count2), sep=' ')
assert(int(count1) == int(count2))
drop_materialized_db()
#def test_multiple_databases(started_cluster):
# instance.query("DROP DATABASE IF EXISTS test_database_1")
# instance.query("DROP DATABASE IF EXISTS test_database_2")
# NUM_TABLES = 5
#
# conn = get_postgres_conn()
# cursor = conn.cursor()
# create_postgres_db(cursor, 'postgres_database_1')
# create_postgres_db(cursor, 'postgres_database_2')
#
# conn1 = get_postgres_conn(True, True, 'postgres_database_1')
# conn2 = get_postgres_conn(True, True, 'postgres_database_2')
#
# cursor1 = conn1.cursor()
# cursor2 = conn2.cursor()
#
# create_clickhouse_postgres_db('postgres_database_1')
# create_clickhouse_postgres_db('postgres_database_2')
#
# cursors = [cursor1, cursor2]
# for cursor_id in range(len(cursors)):
# for i in range(NUM_TABLES):
# table_name = 'postgresql_replica_{}'.format(i)
# create_postgres_table(cursors[cursor_id], table_name);
# instance.query("INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format(cursor_id + 1, table_name))
# print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';'''))
# print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';'''))
#
# create_materialized_db('test_database_1', 'postgres_database_1')
# create_materialized_db('test_database_2', 'postgres_database_2')
#
# cursors = [cursor1, cursor2]
# for cursor_id in range(len(cursors)):
# for i in range(NUM_TABLES):
# table_name = 'postgresql_replica_{}'.format(i)
# instance.query("INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format(cursor_id + 1, table_name))
#
# for cursor_id in range(len(cursors)):
# for i in range(NUM_TABLES):
# table_name = 'postgresql_replica_{}'.format(i)
# check_tables_are_synchronized(
# table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1));
#
# drop_clickhouse_postgres_db('postgres_database_1')
# drop_clickhouse_postgres_db('postgres_database_2')
# drop_materialized_db('test_database_1')
# drop_materialized_db('test_database_2')
#
#
#@pytest.mark.timeout(320)
#def test_concurrent_transactions(started_cluster):
# instance.query("DROP DATABASE IF EXISTS test_database")
# conn = get_postgres_conn(True)
# cursor = conn.cursor()
# NUM_TABLES = 6
#
# for i in range(NUM_TABLES):
# create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
#
# def transaction(thread_id):
# conn_ = get_postgres_conn(True, auto_commit=False)
# cursor_ = conn.cursor()
# for query in queries:
# cursor_.execute(query.format(thread_id))
# print('thread {}, query {}'.format(thread_id, query))
# conn_.commit()
#
# threads = []
# threads_num = 6
# for i in range(threads_num):
# threads.append(threading.Thread(target=transaction, args=(i,)))
#
# create_materialized_db()
#
# for thread in threads:
# time.sleep(random.uniform(0, 0.5))
# thread.start()
# for thread in threads:
# thread.join()
#
# for i in range(NUM_TABLES):
# check_tables_are_synchronized('postgresql_replica_{}'.format(i));
# count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i))
# count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i))
# print(int(count1), int(count2), sep=' ')
# assert(int(count1) == int(count2))
# drop_materialized_db()
if __name__ == '__main__':

View File

@ -16,21 +16,33 @@ postgres_table_template = """
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
def get_postgres_conn(database=False):
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'):
if database == True:
conn_string = "host='localhost' dbname='postgres_database' user='postgres' password='mysecretpassword'"
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(cursor, name):
cursor.execute("CREATE DATABASE {}".format(name))
def create_clickhouse_postgres_db(ip, port, name='postgres_database'):
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, name))
def create_materialized_table(ip, port):
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'{}:{}', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; '''.format(ip, port))
def create_postgres_table(cursor, table_name, replica_identity_full=False):
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
@ -52,12 +64,13 @@ def postgresql_replica_check_result(result, check=False, ref_file='test_postgres
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn()
conn = get_postgres_conn(ip=cluster.postgres_ip,
port=cluster.postgres_port)
cursor = conn.cursor()
create_postgres_db(cursor, 'postgres_database')
instance.query('''
CREATE DATABASE postgres_database
ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')''')
create_clickhouse_postgres_db(ip=cluster.postgres_ip,
port=cluster.postgres_port)
instance.query('CREATE DATABASE test')
yield cluster
@ -65,25 +78,23 @@ def started_cluster():
cluster.shutdown()
@pytest.fixture(autouse=True)
def rabbitmq_setup_teardown():
def postgresql_setup_teardown():
yield # run test
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
@pytest.mark.timeout(320)
def test_initial_load_from_snapshot(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
while postgresql_replica_check_result(result) == False:
@ -96,18 +107,16 @@ def test_initial_load_from_snapshot(started_cluster):
@pytest.mark.timeout(320)
def test_no_connection_at_startup(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
time.sleep(3)
instance.query('DETACH TABLE test.postgresql_replica')
@ -129,18 +138,16 @@ def test_no_connection_at_startup(started_cluster):
@pytest.mark.timeout(320)
def test_detach_attach_is_ok(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) == 0):
@ -164,19 +171,17 @@ def test_detach_attach_is_ok(started_cluster):
@pytest.mark.timeout(320)
def test_replicating_insert_queries(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) != 10):
@ -206,19 +211,17 @@ def test_replicating_insert_queries(started_cluster):
@pytest.mark.timeout(320)
def test_replicating_delete_queries(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
while postgresql_replica_check_result(result) == False:
@ -245,19 +248,17 @@ def test_replicating_delete_queries(started_cluster):
@pytest.mark.timeout(320)
def test_replicating_update_queries(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) != 50):
@ -277,18 +278,16 @@ def test_replicating_update_queries(started_cluster):
@pytest.mark.timeout(320)
def test_resume_from_written_version(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) != 50):
@ -320,18 +319,16 @@ def test_resume_from_written_version(started_cluster):
@pytest.mark.timeout(320)
def test_many_replication_messages(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64, PRIMARY KEY(key))
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
SETTINGS materialize_postgresql_max_block_size = 50000;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
while (int(result) != 100000):
@ -375,18 +372,16 @@ def test_many_replication_messages(started_cluster):
@pytest.mark.timeout(320)
def test_connection_loss(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
i = 50
while i < 100000:
@ -412,17 +407,16 @@ def test_connection_loss(started_cluster):
@pytest.mark.timeout(320)
def test_clickhouse_restart(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
i = 50
while i < 100000:
@ -442,16 +436,15 @@ def test_clickhouse_restart(started_cluster):
def test_rename_table(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25)")
@ -477,16 +470,15 @@ def test_rename_table(started_cluster):
def test_virtual_columns(started_cluster):
conn = get_postgres_conn(True)
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; ''')
create_materialized_table(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)")
result = instance.query('SELECT count() FROM test.postgresql_replica;')