mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-18 13:42:02 +00:00
764 lines
25 KiB
Python
764 lines
25 KiB
Python
import pytest
|
|
import time
|
|
import psycopg2
|
|
import os.path as p
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
|
from helpers.test_tools import TSV
|
|
|
|
import threading
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"instance",
|
|
main_configs=["configs/log_conf.xml"],
|
|
with_postgres=True,
|
|
stay_alive=True,
|
|
)
|
|
|
|
postgres_table_template = """
|
|
CREATE TABLE IF NOT EXISTS {} (
|
|
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
|
"""
|
|
|
|
queries = [
|
|
"INSERT INTO {} select i, i from generate_series(0, 10000) as t(i);",
|
|
"DELETE FROM {} WHERE (value*value) % 3 = 0;",
|
|
"UPDATE {} SET value = value + 125 WHERE key % 2 = 0;",
|
|
"UPDATE {} SET key=key+20000 WHERE key%2=0",
|
|
"INSERT INTO {} select i, i from generate_series(40000, 50000) as t(i);",
|
|
"DELETE FROM {} WHERE key % 10 = 0;",
|
|
"UPDATE {} SET value = value + 101 WHERE key % 2 = 1;",
|
|
"UPDATE {} SET key=key+80000 WHERE key%2=1",
|
|
"DELETE FROM {} WHERE value % 2 = 0;",
|
|
"UPDATE {} SET value = value + 2000 WHERE key % 5 = 0;",
|
|
"INSERT INTO {} select i, i from generate_series(200000, 250000) as t(i);",
|
|
"DELETE FROM {} WHERE value % 3 = 0;",
|
|
"UPDATE {} SET value = value * 2 WHERE key % 3 = 0;",
|
|
"UPDATE {} SET key=key+500000 WHERE key%2=1",
|
|
"INSERT INTO {} select i, i from generate_series(1000000, 1050000) as t(i);",
|
|
"DELETE FROM {} WHERE value % 9 = 2;",
|
|
"UPDATE {} SET key=key+10000000",
|
|
"UPDATE {} SET value = value + 2 WHERE key % 3 = 1;",
|
|
"DELETE FROM {} WHERE value%5 = 0;",
|
|
]
|
|
|
|
|
|
@pytest.mark.timeout(30)
|
|
def check_tables_are_synchronized(
|
|
table_name, order_by="key", postgres_database="postgres_database"
|
|
):
|
|
expected = instance.query(
|
|
"select * from {}.{} order by {};".format(
|
|
postgres_database, table_name, order_by
|
|
)
|
|
)
|
|
result = instance.query(
|
|
"select * from test.{} order by {};".format(table_name, order_by)
|
|
)
|
|
|
|
while result != expected:
|
|
time.sleep(0.5)
|
|
result = instance.query(
|
|
"select * from test.{} order by {};".format(table_name, order_by)
|
|
)
|
|
|
|
assert result == expected
|
|
|
|
|
|
def get_postgres_conn(
|
|
ip, port, database=False, auto_commit=True, database_name="postgres_database"
|
|
):
|
|
if database == True:
|
|
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(
|
|
ip, port, database_name
|
|
)
|
|
else:
|
|
conn_string = (
|
|
"host={} port={} user='postgres' password='mysecretpassword'".format(
|
|
ip, port
|
|
)
|
|
)
|
|
|
|
conn = psycopg2.connect(conn_string)
|
|
if auto_commit:
|
|
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
|
conn.autocommit = True
|
|
return conn
|
|
|
|
|
|
def create_postgres_db(cursor, name):
|
|
cursor.execute("CREATE DATABASE {}".format(name))
|
|
|
|
|
|
def create_clickhouse_postgres_db(ip, port, name="postgres_database"):
|
|
instance.query(
|
|
"""
|
|
CREATE DATABASE {}
|
|
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')""".format(
|
|
name, ip, port, name
|
|
)
|
|
)
|
|
|
|
|
|
def create_materialized_table(ip, port, table_name='postgresql_replica'):
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.{table_name} (key Int64, value Int64)
|
|
ENGINE = MaterializedPostgreSQL(
|
|
'{ip}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword')
|
|
PRIMARY KEY key; """
|
|
)
|
|
|
|
|
|
def create_postgres_table(cursor, table_name, replica_identity_full=False):
|
|
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
|
|
cursor.execute(postgres_table_template.format(table_name))
|
|
if replica_identity_full:
|
|
cursor.execute("ALTER TABLE {} REPLICA IDENTITY FULL;".format(table_name))
|
|
|
|
|
|
def postgresql_replica_check_result(
|
|
result, check=False, ref_file="test_postgresql_replica.reference"
|
|
):
|
|
fpath = p.join(p.dirname(__file__), ref_file)
|
|
with open(fpath) as reference:
|
|
if check:
|
|
assert TSV(result) == TSV(reference)
|
|
else:
|
|
return TSV(result) == TSV(reference)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port)
|
|
cursor = conn.cursor()
|
|
create_postgres_db(cursor, "postgres_database")
|
|
create_clickhouse_postgres_db(
|
|
ip=cluster.postgres_ip, port=cluster.postgres_port
|
|
)
|
|
|
|
instance.query("CREATE DATABASE test")
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_initial_load_from_snapshot(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
while postgresql_replica_check_result(result) == False:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_no_connection_at_startup(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
time.sleep(3)
|
|
|
|
instance.query("DETACH TABLE test.postgresql_replica")
|
|
started_cluster.pause_container("postgres1")
|
|
|
|
instance.query("ATTACH TABLE test.postgresql_replica")
|
|
time.sleep(3)
|
|
started_cluster.unpause_container("postgres1")
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) == 0:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_detach_attach_is_ok(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) == 0:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
instance.query("DETACH TABLE test.postgresql_replica")
|
|
instance.query("ATTACH TABLE test.postgresql_replica")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
while postgresql_replica_check_result(result) == False:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_replicating_insert_queries(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 10:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT 10 + number, 10 + number from numbers(10)"
|
|
)
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT 20 + number, 20 + number from numbers(10)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 30:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT 30 + number, 30 + number from numbers(10)"
|
|
)
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT 40 + number, 40 + number from numbers(10)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 50:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_replicating_delete_queries(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
while postgresql_replica_check_result(result) == False:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, 50 + number from numbers(50)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 100:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
cursor.execute("DELETE FROM postgresql_replica WHERE key > 49;")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
while postgresql_replica_check_result(result) == False:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_replicating_update_queries(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 50:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET value = value - 10;")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
while postgresql_replica_check_result(result) == False:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_resume_from_written_version(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number + 10 from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 50:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, 50 + number from numbers(50)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 100:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
instance.query("DETACH TABLE test.postgresql_replica")
|
|
|
|
cursor.execute("DELETE FROM postgresql_replica WHERE key > 49;")
|
|
cursor.execute("UPDATE postgresql_replica SET value = value - 10;")
|
|
|
|
instance.query("ATTACH TABLE test.postgresql_replica")
|
|
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
while postgresql_replica_check_result(result) == False:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
postgresql_replica_check_result(result, True)
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_many_replication_messages(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 100000:
|
|
time.sleep(0.2)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
print("SYNC OK")
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000, 100000)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 200000:
|
|
time.sleep(1)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
print("INSERT OK")
|
|
|
|
result = instance.query("SELECT key FROM test.postgresql_replica ORDER BY key;")
|
|
expected = instance.query("SELECT number from numbers(200000)")
|
|
assert result == expected
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET value = key + 1 WHERE key < 100000;")
|
|
|
|
result = instance.query(
|
|
"SELECT key FROM test.postgresql_replica WHERE value = key + 1 ORDER BY key;"
|
|
)
|
|
expected = instance.query("SELECT number from numbers(100000)")
|
|
|
|
while result != expected:
|
|
time.sleep(1)
|
|
result = instance.query(
|
|
"SELECT key FROM test.postgresql_replica WHERE value = key + 1 ORDER BY key;"
|
|
)
|
|
print("UPDATE OK")
|
|
|
|
cursor.execute("DELETE FROM postgresql_replica WHERE key % 2 = 1;")
|
|
cursor.execute("DELETE FROM postgresql_replica WHERE key != value;")
|
|
|
|
result = instance.query(
|
|
"SELECT count() FROM (SELECT * FROM test.postgresql_replica);"
|
|
)
|
|
while int(result) != 50000:
|
|
time.sleep(1)
|
|
result = instance.query(
|
|
"SELECT count() FROM (SELECT * FROM test.postgresql_replica);"
|
|
)
|
|
print("DELETE OK")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_connection_loss(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
i = 50
|
|
while i < 100000:
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT {} + number, number from numbers(10000)".format(
|
|
i
|
|
)
|
|
)
|
|
i += 10000
|
|
|
|
started_cluster.pause_container("postgres1")
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
print(int(result))
|
|
time.sleep(6)
|
|
|
|
started_cluster.unpause_container("postgres1")
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) < 100050:
|
|
time.sleep(1)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
assert int(result) == 100050
|
|
|
|
|
|
@pytest.mark.timeout(320)
|
|
def test_clickhouse_restart(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
i = 50
|
|
while i < 100000:
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT {} + number, number from numbers(10000)".format(
|
|
i
|
|
)
|
|
)
|
|
i += 10000
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) < 100050:
|
|
time.sleep(1)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
print(result)
|
|
assert int(result) == 100050
|
|
|
|
|
|
def test_rename_table(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 25:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
instance.query(
|
|
"RENAME TABLE test.postgresql_replica TO test.postgresql_replica_renamed"
|
|
)
|
|
assert (
|
|
int(instance.query("SELECT count() FROM test.postgresql_replica_renamed;"))
|
|
== 25
|
|
)
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(25, 25)"
|
|
)
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica_renamed;")
|
|
while int(result) != 50:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica_renamed;")
|
|
|
|
result = instance.query(
|
|
"SELECT * FROM test.postgresql_replica_renamed ORDER BY key;"
|
|
)
|
|
postgresql_replica_check_result(result, True)
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica_renamed")
|
|
|
|
|
|
def test_virtual_columns(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)"
|
|
)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
while int(result) != 10:
|
|
time.sleep(0.5)
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica;")
|
|
|
|
# just check that it works, no check with `expected` becuase _version is taken as LSN, which will be different each time.
|
|
result = instance.query(
|
|
"SELECT key, value, _sign, _version FROM test.postgresql_replica;"
|
|
)
|
|
print(result)
|
|
cursor.execute("DROP TABLE postgresql_replica;")
|
|
|
|
|
|
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
|
instance.query("DROP DATABASE IF EXISTS test_database")
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
table_name = "postgresql_replica"
|
|
create_postgres_table(cursor, table_name)
|
|
|
|
instance.query(f"DROP TABLE IF EXISTS test.{table_name}")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
|
|
for i in range(len(queries)):
|
|
query = queries[i].format(table_name)
|
|
cursor.execute(query)
|
|
print("query {}".format(query.format(table_name)))
|
|
|
|
started_cluster.pause_container("postgres1")
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
|
print(result) # Just debug
|
|
|
|
started_cluster.unpause_container("postgres1")
|
|
|
|
check_tables_are_synchronized("postgresql_replica")
|
|
|
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
|
print(result) # Just debug
|
|
|
|
|
|
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
table_name = "postgresql_replica_697"
|
|
create_postgres_table(cursor, table_name)
|
|
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table_name} SELECT -1, 1"
|
|
)
|
|
instance.query(f"DROP TABLE IF EXISTS test.{table_name}")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name
|
|
)
|
|
|
|
n = 1
|
|
while int(instance.query(f"select count() from test.{table_name}")) != 1:
|
|
sleep(1)
|
|
n += 1
|
|
if n > 10:
|
|
break;
|
|
|
|
for query in queries:
|
|
cursor.execute(query.format(table_name))
|
|
print("query {}".format(query.format(table_name)))
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
result = instance.query(f"SELECT count() FROM test.{table_name}")
|
|
print(result) # Just debug
|
|
|
|
check_tables_are_synchronized(table_name)
|
|
|
|
result = instance.query(f"SELECT count() FROM test.{table_name}")
|
|
print(result) # Just debug
|
|
|
|
|
|
def test_drop_table_immediately(started_cluster):
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
create_postgres_table(cursor, "postgresql_replica")
|
|
instance.query(
|
|
"INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(100000)"
|
|
)
|
|
|
|
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
instance.query("DROP TABLE test.postgresql_replica")
|
|
create_materialized_table(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
check_tables_are_synchronized("postgresql_replica")
|
|
instance.query("DROP TABLE test.postgresql_replica")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cluster.start()
|
|
input("Cluster created, press any key to destroy...")
|
|
cluster.shutdown()
|