mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
1148 lines
40 KiB
Python
1148 lines
40 KiB
Python
import pytest
|
|
|
|
import time
|
|
import psycopg2
|
|
import os.path as p
|
|
import random
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
|
from helpers.test_tools import TSV
|
|
|
|
from random import randrange
|
|
import threading
|
|
|
|
from helpers.postgres_utility import get_postgres_conn
|
|
from helpers.postgres_utility import PostgresManager
|
|
|
|
from helpers.postgres_utility import create_replication_slot, drop_replication_slot
|
|
from helpers.postgres_utility import create_postgres_schema, drop_postgres_schema
|
|
from helpers.postgres_utility import create_postgres_table, drop_postgres_table
|
|
from helpers.postgres_utility import (
|
|
create_postgres_table_with_schema,
|
|
drop_postgres_table_with_schema,
|
|
)
|
|
from helpers.postgres_utility import check_tables_are_synchronized
|
|
from helpers.postgres_utility import check_several_tables_are_synchronized
|
|
from helpers.postgres_utility import assert_nested_table_is_created
|
|
from helpers.postgres_utility import assert_number_of_columns
|
|
from helpers.postgres_utility import (
|
|
postgres_table_template,
|
|
postgres_table_template_2,
|
|
postgres_table_template_3,
|
|
postgres_table_template_4,
|
|
postgres_table_template_5,
|
|
postgres_table_template_6,
|
|
)
|
|
from helpers.postgres_utility import queries
|
|
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"instance",
|
|
main_configs=["configs/log_conf.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
with_postgres=True,
|
|
stay_alive=True,
|
|
)
|
|
|
|
instance2 = cluster.add_instance(
|
|
"instance2",
|
|
main_configs=["configs/log_conf.xml", "configs/merge_tree_too_many_parts.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
with_postgres=True,
|
|
stay_alive=True,
|
|
)
|
|
|
|
|
|
pg_manager = PostgresManager()
|
|
pg_manager2 = PostgresManager()
|
|
pg_manager_instance2 = PostgresManager()
|
|
pg_manager3 = PostgresManager()
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
pg_manager.init(
|
|
instance,
|
|
cluster.postgres_ip,
|
|
cluster.postgres_port,
|
|
default_database="postgres_database",
|
|
)
|
|
pg_manager_instance2.init(
|
|
instance2,
|
|
cluster.postgres_ip,
|
|
cluster.postgres_port,
|
|
default_database="postgres_database",
|
|
postgres_db_exists=True,
|
|
)
|
|
pg_manager2.init(
|
|
instance2, cluster.postgres_ip, cluster.postgres_port, "postgres_database2"
|
|
)
|
|
pg_manager3.init(
|
|
instance,
|
|
cluster.postgres_ip,
|
|
cluster.postgres_port,
|
|
default_database="postgres-postgres",
|
|
)
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_teardown():
|
|
print("PostgreSQL is available - running test")
|
|
yield # run test
|
|
pg_manager.restart()
|
|
|
|
|
|
def test_add_new_table_to_replication(started_cluster):
|
|
NUM_TABLES = 5
|
|
|
|
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
check_several_tables_are_synchronized(instance, NUM_TABLES)
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert (
|
|
result
|
|
== "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
|
)
|
|
|
|
table_name = "postgresql_replica_5"
|
|
pg_manager.create_and_fill_postgres_table(table_name)
|
|
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
) # Check without ip
|
|
assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"
|
|
|
|
result = instance.query_and_get_error(
|
|
"ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'"
|
|
)
|
|
assert (
|
|
"Changing setting `materialized_postgresql_tables_list` is not allowed"
|
|
in result
|
|
)
|
|
|
|
result = instance.query_and_get_error(
|
|
"ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'"
|
|
)
|
|
assert "Database engine MaterializedPostgreSQL does not support setting" in result
|
|
|
|
instance.query(f"ATTACH TABLE test_database.{table_name}")
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert (
|
|
result
|
|
== "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n"
|
|
)
|
|
|
|
check_tables_are_synchronized(instance, table_name)
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)"
|
|
)
|
|
check_tables_are_synchronized(instance, table_name)
|
|
|
|
result = instance.query_and_get_error(f"ATTACH TABLE test_database.{table_name}")
|
|
assert "Table test_database.postgresql_replica_5 already exists" in result
|
|
|
|
result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table")
|
|
assert "PostgreSQL table unknown_table does not exist" in result
|
|
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
)
|
|
assert (
|
|
result[-180:]
|
|
== ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n"
|
|
)
|
|
|
|
table_name = "postgresql_replica_6"
|
|
pg_manager.create_postgres_table(table_name)
|
|
instance.query(
|
|
"INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(
|
|
table_name
|
|
)
|
|
)
|
|
instance.query(f"ATTACH TABLE test_database.{table_name}")
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
table_name = "postgresql_replica_7"
|
|
pg_manager.create_postgres_table(table_name)
|
|
instance.query(
|
|
"INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(
|
|
table_name
|
|
)
|
|
)
|
|
instance.query(f"ATTACH TABLE test_database.{table_name}")
|
|
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
)
|
|
assert (
|
|
result[-222:]
|
|
== ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n"
|
|
)
|
|
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)"
|
|
)
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert (
|
|
result
|
|
== "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n"
|
|
)
|
|
check_several_tables_are_synchronized(instance, NUM_TABLES + 3)
|
|
|
|
|
|
def test_remove_table_from_replication(started_cluster):
|
|
NUM_TABLES = 5
|
|
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
|
)
|
|
check_several_tables_are_synchronized(instance, NUM_TABLES)
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert (
|
|
result
|
|
== "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
|
)
|
|
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
)
|
|
assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"
|
|
|
|
table_name = "postgresql_replica_4"
|
|
instance.query(f"DETACH TABLE test_database.{table_name} PERMANENTLY")
|
|
result = instance.query_and_get_error(f"SELECT * FROM test_database.{table_name}")
|
|
assert "UNKNOWN_TABLE" in result
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert (
|
|
result
|
|
== "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n"
|
|
)
|
|
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
)
|
|
assert (
|
|
result[-138:]
|
|
== ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n"
|
|
)
|
|
|
|
instance.query(f"ATTACH TABLE test_database.{table_name}")
|
|
check_tables_are_synchronized(instance, table_name)
|
|
check_several_tables_are_synchronized(instance, NUM_TABLES)
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)"
|
|
)
|
|
check_tables_are_synchronized(instance, table_name)
|
|
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
)
|
|
assert (
|
|
result[-159:]
|
|
== ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"
|
|
)
|
|
|
|
table_name = "postgresql_replica_1"
|
|
instance.query(f"DETACH TABLE test_database.{table_name} PERMANENTLY")
|
|
result = instance.query("SHOW CREATE DATABASE test_database")
|
|
assert (
|
|
result[:63]
|
|
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
|
|
)
|
|
assert (
|
|
result[-138:]
|
|
== ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"
|
|
)
|
|
|
|
pg_manager.execute(f"drop table if exists postgresql_replica_0;")
|
|
|
|
# Removing from replication table which does not exist in PostgreSQL must be ok.
|
|
instance.query("DETACH TABLE test_database.postgresql_replica_0 PERMANENTLY")
|
|
assert instance.contains_in_log(
|
|
"from publication, because table does not exist in PostgreSQL"
|
|
)
|
|
|
|
|
|
def test_predefined_connection_configuration(started_cluster):
|
|
pg_manager.execute(f"DROP TABLE IF EXISTS test_table")
|
|
pg_manager.execute(
|
|
f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)"
|
|
)
|
|
pg_manager.execute(f"INSERT INTO test_table SELECT 1, 2")
|
|
instance.query(
|
|
"CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'"
|
|
)
|
|
check_tables_are_synchronized(instance, "test_table")
|
|
pg_manager.drop_materialized_db()
|
|
|
|
|
|
insert_counter = 0
|
|
|
|
|
|
def test_database_with_single_non_default_schema(started_cluster):
|
|
cursor = pg_manager.get_db_cursor()
|
|
NUM_TABLES = 5
|
|
schema_name = "test_schema"
|
|
materialized_db = "test_database"
|
|
clickhouse_postgres_db = "postgres_database_with_schema"
|
|
global insert_counter
|
|
insert_counter = 0
|
|
|
|
def insert_into_tables():
|
|
global insert_counter
|
|
clickhouse_postgres_db = "postgres_database_with_schema"
|
|
for i in range(NUM_TABLES):
|
|
table_name = f"postgresql_replica_{i}"
|
|
instance.query(
|
|
f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)"
|
|
)
|
|
insert_counter += 1
|
|
|
|
def assert_show_tables(expected):
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert result == expected
|
|
print("assert show tables Ok")
|
|
|
|
def check_all_tables_are_synchronized():
|
|
for i in range(NUM_TABLES):
|
|
print("checking table", i)
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
f"postgresql_replica_{i}",
|
|
postgres_database=clickhouse_postgres_db,
|
|
)
|
|
print("synchronization Ok")
|
|
|
|
create_postgres_schema(cursor, schema_name)
|
|
pg_manager.create_clickhouse_postgres_db(
|
|
database_name=clickhouse_postgres_db,
|
|
schema_name=schema_name,
|
|
postgres_database="postgres_database",
|
|
)
|
|
|
|
for i in range(NUM_TABLES):
|
|
create_postgres_table_with_schema(
|
|
cursor, schema_name, f"postgresql_replica_{i}"
|
|
)
|
|
|
|
insert_into_tables()
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_schema = '{schema_name}'",
|
|
],
|
|
)
|
|
|
|
insert_into_tables()
|
|
check_all_tables_are_synchronized()
|
|
assert_show_tables(
|
|
"postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
|
)
|
|
|
|
instance.restart_clickhouse()
|
|
check_all_tables_are_synchronized()
|
|
assert_show_tables(
|
|
"postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
|
)
|
|
insert_into_tables()
|
|
check_all_tables_are_synchronized()
|
|
|
|
altered_table = random.randint(0, NUM_TABLES - 1)
|
|
pg_manager.execute(
|
|
"ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(
|
|
altered_table
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
|
|
)
|
|
|
|
assert instance.wait_for_log_line(
|
|
f"Table postgresql_replica_{altered_table} is skipped from replication stream"
|
|
)
|
|
instance.query(
|
|
f"DETACH TABLE test_database.postgresql_replica_{altered_table} PERMANENTLY"
|
|
)
|
|
assert not instance.contains_in_log(
|
|
"from publication, because table does not exist in PostgreSQL"
|
|
)
|
|
instance.query(f"ATTACH TABLE test_database.postgresql_replica_{altered_table}")
|
|
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
f"postgresql_replica_{altered_table}",
|
|
postgres_database=clickhouse_postgres_db,
|
|
)
|
|
|
|
|
|
def test_database_with_multiple_non_default_schemas_1(started_cluster):
|
|
cursor = pg_manager.get_db_cursor()
|
|
|
|
NUM_TABLES = 5
|
|
schema_name = "test_schema"
|
|
clickhouse_postgres_db = "postgres_database_with_schema"
|
|
materialized_db = "test_database"
|
|
publication_tables = ""
|
|
global insert_counter
|
|
insert_counter = 0
|
|
|
|
def insert_into_tables():
|
|
global insert_counter
|
|
clickhouse_postgres_db = "postgres_database_with_schema"
|
|
for i in range(NUM_TABLES):
|
|
table_name = f"postgresql_replica_{i}"
|
|
instance.query(
|
|
f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)"
|
|
)
|
|
insert_counter += 1
|
|
|
|
def assert_show_tables(expected):
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert result == expected
|
|
print("assert show tables Ok")
|
|
|
|
def check_all_tables_are_synchronized():
|
|
for i in range(NUM_TABLES):
|
|
print("checking table", i)
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
"postgresql_replica_{}".format(i),
|
|
schema_name=schema_name,
|
|
postgres_database=clickhouse_postgres_db,
|
|
)
|
|
print("synchronization Ok")
|
|
|
|
create_postgres_schema(cursor, schema_name)
|
|
pg_manager.create_clickhouse_postgres_db(
|
|
database_name=clickhouse_postgres_db,
|
|
schema_name=schema_name,
|
|
postgres_database="postgres_database",
|
|
)
|
|
|
|
for i in range(NUM_TABLES):
|
|
table_name = "postgresql_replica_{}".format(i)
|
|
create_postgres_table_with_schema(cursor, schema_name, table_name)
|
|
if publication_tables != "":
|
|
publication_tables += ", "
|
|
publication_tables += schema_name + "." + table_name
|
|
|
|
insert_into_tables()
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{publication_tables}'",
|
|
"materialized_postgresql_tables_list_with_schema=1",
|
|
],
|
|
)
|
|
|
|
check_all_tables_are_synchronized()
|
|
assert_show_tables(
|
|
"test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n"
|
|
)
|
|
|
|
instance.restart_clickhouse()
|
|
check_all_tables_are_synchronized()
|
|
assert_show_tables(
|
|
"test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n"
|
|
)
|
|
|
|
insert_into_tables()
|
|
check_all_tables_are_synchronized()
|
|
|
|
altered_table = random.randint(0, NUM_TABLES - 1)
|
|
pg_manager.execute(
|
|
"ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(
|
|
altered_table
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
|
|
)
|
|
|
|
assert instance.wait_for_log_line(
|
|
f"Table test_schema.postgresql_replica_{altered_table} is skipped from replication stream"
|
|
)
|
|
altered_materialized_table = (
|
|
f"{materialized_db}.`test_schema.postgresql_replica_{altered_table}`"
|
|
)
|
|
instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
|
|
assert not instance.contains_in_log(
|
|
"from publication, because table does not exist in PostgreSQL"
|
|
)
|
|
instance.query(f"ATTACH TABLE {altered_materialized_table}")
|
|
|
|
assert_show_tables(
|
|
"test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n"
|
|
)
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
f"postgresql_replica_{altered_table}",
|
|
schema_name=schema_name,
|
|
postgres_database=clickhouse_postgres_db,
|
|
)
|
|
|
|
|
|
def test_database_with_multiple_non_default_schemas_2(started_cluster):
|
|
cursor = pg_manager.get_db_cursor()
|
|
NUM_TABLES = 2
|
|
schemas_num = 2
|
|
schema_list = "schema0, schema1"
|
|
materialized_db = "test_database"
|
|
global insert_counter
|
|
insert_counter = 0
|
|
|
|
def check_all_tables_are_synchronized():
|
|
for i in range(schemas_num):
|
|
schema_name = f"schema{i}"
|
|
clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
|
|
for ti in range(NUM_TABLES):
|
|
table_name = f"postgresql_replica_{ti}"
|
|
print(f"checking table {schema_name}.{table_name}")
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
f"{table_name}",
|
|
schema_name=schema_name,
|
|
postgres_database=clickhouse_postgres_db,
|
|
)
|
|
print("synchronized Ok")
|
|
|
|
def insert_into_tables():
|
|
global insert_counter
|
|
for i in range(schemas_num):
|
|
clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
|
|
for ti in range(NUM_TABLES):
|
|
table_name = f"postgresql_replica_{ti}"
|
|
instance.query(
|
|
f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)"
|
|
)
|
|
insert_counter += 1
|
|
|
|
def assert_show_tables(expected):
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
assert result == expected
|
|
print("assert show tables Ok")
|
|
|
|
for i in range(schemas_num):
|
|
schema_name = f"schema{i}"
|
|
clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
|
|
create_postgres_schema(cursor, schema_name)
|
|
pg_manager.create_clickhouse_postgres_db(
|
|
database_name=clickhouse_postgres_db,
|
|
schema_name=schema_name,
|
|
postgres_database="postgres_database",
|
|
)
|
|
for ti in range(NUM_TABLES):
|
|
table_name = f"postgresql_replica_{ti}"
|
|
create_postgres_table_with_schema(cursor, schema_name, table_name)
|
|
|
|
insert_into_tables()
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_schema_list = '{schema_list}'",
|
|
],
|
|
)
|
|
|
|
check_all_tables_are_synchronized()
|
|
insert_into_tables()
|
|
assert_show_tables(
|
|
"schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"
|
|
)
|
|
|
|
instance.restart_clickhouse()
|
|
assert_show_tables(
|
|
"schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"
|
|
)
|
|
check_all_tables_are_synchronized()
|
|
insert_into_tables()
|
|
check_all_tables_are_synchronized()
|
|
|
|
print("ALTER")
|
|
altered_schema = random.randint(0, schemas_num - 1)
|
|
altered_table = random.randint(0, NUM_TABLES - 1)
|
|
clickhouse_postgres_db = f"clickhouse_postgres_db{altered_schema}"
|
|
pg_manager.execute(
|
|
f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer"
|
|
)
|
|
|
|
instance.query(
|
|
f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)"
|
|
)
|
|
|
|
assert instance.wait_for_log_line(
|
|
f"Table schema{altered_schema}.postgresql_replica_{altered_table} is skipped from replication stream"
|
|
)
|
|
|
|
altered_materialized_table = (
|
|
f"{materialized_db}.`schema{altered_schema}.postgresql_replica_{altered_table}`"
|
|
)
|
|
instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
|
|
assert not instance.contains_in_log(
|
|
"from publication, because table does not exist in PostgreSQL"
|
|
)
|
|
instance.query(f"ATTACH TABLE {altered_materialized_table}")
|
|
|
|
assert_show_tables(
|
|
"schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"
|
|
)
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
f"postgresql_replica_{altered_table}",
|
|
schema_name=f"schema{altered_schema}",
|
|
postgres_database=clickhouse_postgres_db,
|
|
)
|
|
|
|
|
|
def test_table_override(started_cluster):
|
|
table_name = "table_override"
|
|
materialized_database = "test_database"
|
|
|
|
pg_manager.create_postgres_table(table_name, template=postgres_table_template_6)
|
|
instance.query(
|
|
f"insert into postgres_database.{table_name} select number, 'test' from numbers(10)"
|
|
)
|
|
|
|
table_overrides = f" TABLE OVERRIDE {table_name} (COLUMNS (key Int32, value String) PARTITION BY key)"
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[f"materialized_postgresql_tables_list = '{table_name}'"],
|
|
materialized_database=materialized_database,
|
|
table_overrides=table_overrides,
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table_name, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
assert 10 == int(
|
|
instance.query(f"SELECT count() FROM {materialized_database}.{table_name}")
|
|
)
|
|
|
|
expected = "CREATE TABLE test_database.table_override\\n(\\n `key` Int32,\\n `value` String,\\n `_sign` Int8 MATERIALIZED 1,\\n `_version` UInt64 MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nPARTITION BY key\\nORDER BY tuple(key)"
|
|
assert (
|
|
expected
|
|
== instance.query(
|
|
f"show create table {materialized_database}.{table_name}"
|
|
).strip()
|
|
)
|
|
|
|
assert (
|
|
"test"
|
|
== instance.query(
|
|
f"SELECT value FROM {materialized_database}.{table_name} WHERE key = 2"
|
|
).strip()
|
|
)
|
|
|
|
conn = get_postgres_conn(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
database_name="postgres_database",
|
|
database=True,
|
|
auto_commit=True,
|
|
)
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"SELECT count(*) FROM {table_name}")
|
|
assert 10 == cursor.fetchall()[0][0]
|
|
|
|
pg_manager.execute(f"UPDATE {table_name} SET value='kek' WHERE key=2")
|
|
|
|
cursor.execute(f"SELECT value FROM {table_name} WHERE key=2")
|
|
assert "kek" == cursor.fetchall()[0][0]
|
|
|
|
pg_manager.execute(f"DELETE FROM {table_name} WHERE key=2")
|
|
|
|
cursor.execute(f"SELECT count(*) FROM {table_name}")
|
|
assert 9 == cursor.fetchall()[0][0]
|
|
|
|
conn.close()
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table_name, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
assert (
|
|
""
|
|
== instance.query(
|
|
f"SELECT value FROM {materialized_database}.{table_name} WHERE key = 2"
|
|
).strip()
|
|
)
|
|
|
|
|
|
def test_materialized_view(started_cluster):
|
|
pg_manager.execute(f"DROP TABLE IF EXISTS test_table")
|
|
pg_manager.execute(
|
|
f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)"
|
|
)
|
|
pg_manager.execute(f"INSERT INTO test_table SELECT 1, 2")
|
|
instance.query("DROP DATABASE IF EXISTS test_database")
|
|
instance.query(
|
|
"CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'"
|
|
)
|
|
check_tables_are_synchronized(instance, "test_table")
|
|
instance.query("DROP TABLE IF EXISTS mv")
|
|
instance.query(
|
|
"CREATE MATERIALIZED VIEW mv ENGINE=MergeTree ORDER BY tuple() POPULATE AS SELECT * FROM test_database.test_table"
|
|
)
|
|
assert "1\t2" == instance.query("SELECT * FROM mv").strip()
|
|
pg_manager.execute(f"INSERT INTO test_table SELECT 3, 4")
|
|
check_tables_are_synchronized(instance, "test_table")
|
|
assert "1\t2\n3\t4" == instance.query("SELECT * FROM mv ORDER BY 1, 2").strip()
|
|
instance.query("DROP VIEW mv")
|
|
pg_manager.drop_materialized_db()
|
|
|
|
|
|
def test_too_many_parts(started_cluster):
|
|
table = "test_table"
|
|
pg_manager2.create_and_fill_postgres_table(table)
|
|
pg_manager2.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = 'test_table', materialized_postgresql_backoff_min_ms = 100, materialized_postgresql_backoff_max_ms = 100"
|
|
],
|
|
)
|
|
check_tables_are_synchronized(
|
|
instance2, "test_table", postgres_database=pg_manager2.get_default_database()
|
|
)
|
|
assert (
|
|
"50" == instance2.query("SELECT count() FROM test_database.test_table").strip()
|
|
)
|
|
|
|
instance2.query("SYSTEM STOP MERGES")
|
|
num = 50
|
|
for i in range(10):
|
|
instance2.query(
|
|
f"""
|
|
INSERT INTO {pg_manager2.get_default_database()}.test_table SELECT {num}, {num};
|
|
"""
|
|
)
|
|
num = num + 1
|
|
for i in range(30):
|
|
if num == int(
|
|
instance2.query("SELECT count() FROM test_database.test_table")
|
|
) or instance2.contains_in_log("DB::Exception: Too many parts"):
|
|
break
|
|
time.sleep(1)
|
|
print(f"wait sync try {i}")
|
|
instance2.query("SYSTEM FLUSH LOGS")
|
|
if instance2.contains_in_log("DB::Exception: Too many parts"):
|
|
break
|
|
assert num == int(
|
|
instance2.query("SELECT count() FROM test_database.test_table")
|
|
) or num - 1 == int(
|
|
instance2.query("SELECT count() FROM test_database.test_table")
|
|
)
|
|
|
|
assert instance2.contains_in_log("DB::Exception: Too many parts")
|
|
print(num)
|
|
assert num == int(
|
|
instance2.query("SELECT count() FROM test_database.test_table")
|
|
) or num - 1 == int(instance2.query("SELECT count() FROM test_database.test_table"))
|
|
|
|
instance2.query("SYSTEM START MERGES")
|
|
check_tables_are_synchronized(
|
|
instance2, "test_table", postgres_database=pg_manager2.get_default_database()
|
|
)
|
|
|
|
# assert "200" == instance.query("SELECT count FROM test_database.test_table").strip()
|
|
pg_manager2.drop_materialized_db()
|
|
|
|
|
|
def test_toast(started_cluster):
|
|
table = "test_toast"
|
|
pg_manager.create_postgres_table(
|
|
table,
|
|
"",
|
|
"""CREATE TABLE "{}" (id integer PRIMARY KEY, txt text, other text)""",
|
|
)
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
],
|
|
)
|
|
|
|
pg_manager.execute(
|
|
f"""\
|
|
INSERT INTO {table} (id, txt)\
|
|
VALUES (1, (SELECT array_to_string(ARRAY(SELECT chr((100 + round(random() * 25)) :: integer) FROM generate_series(1,30000) as t(i)), '')))
|
|
"""
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
table,
|
|
postgres_database=pg_manager.get_default_database(),
|
|
order_by="id",
|
|
)
|
|
|
|
|
|
def test_replica_consumer(started_cluster):
|
|
table = "test_replica_consumer"
|
|
pg_manager_instance2.restart()
|
|
|
|
pg_manager.create_postgres_table(table)
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
|
|
)
|
|
|
|
for pm in [pg_manager, pg_manager_instance2]:
|
|
pm.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
"materialized_postgresql_use_unique_replication_consumer_identifier = 1",
|
|
],
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
check_tables_are_synchronized(
|
|
instance2, table, postgres_database=pg_manager_instance2.get_default_database()
|
|
)
|
|
|
|
assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
|
|
assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
|
|
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)"
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
check_tables_are_synchronized(
|
|
instance2, table, postgres_database=pg_manager_instance2.get_default_database()
|
|
)
|
|
|
|
assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
|
|
assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
|
|
|
|
for pm in [pg_manager, pg_manager_instance2]:
|
|
pm.drop_materialized_db()
|
|
pg_manager_instance2.clear()
|
|
|
|
|
|
def test_bad_connection_options(started_cluster):
|
|
table = "test_bad_connection_options"
|
|
|
|
pg_manager.create_postgres_table(table)
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
|
|
)
|
|
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
],
|
|
user="postrges",
|
|
password="kek",
|
|
)
|
|
|
|
instance.wait_for_log_line('role "postrges" does not exist')
|
|
assert instance.contains_in_log(
|
|
"<Error> void DB::DatabaseMaterializedPostgreSQL::startSynchronization(): std::exception. Code: 1001, type: pqxx::broken_connection"
|
|
)
|
|
assert "test_database" in instance.query("SHOW DATABASES")
|
|
assert "" == instance.query("show tables from test_database").strip()
|
|
pg_manager.drop_materialized_db("test_database")
|
|
|
|
|
|
def test_failed_load_from_snapshot(started_cluster):
|
|
if instance.is_built_with_sanitizer() or instance.is_debug_build():
|
|
pytest.skip(
|
|
"Sanitizers and debug mode are skipped, because this test thrown logical error"
|
|
)
|
|
|
|
table = "failed_load"
|
|
|
|
pg_manager.create_postgres_table(
|
|
table,
|
|
template="""
|
|
CREATE TABLE IF NOT EXISTS "{}" (
|
|
key text NOT NULL, value text[], PRIMARY KEY(key))
|
|
""",
|
|
)
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table} SELECT number, [1, 2] from numbers(0, 1000000)"
|
|
)
|
|
|
|
# Create a table with wrong table structure
|
|
assert "Could not convert string to i" in instance.query_and_get_error(
|
|
f"""
|
|
SET allow_experimental_materialized_postgresql_table=1;
|
|
CREATE TABLE {table} (a Int32, b Int32) ENGINE=MaterializedPostgreSQL('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres_database', '{table}', 'postgres', 'mysecretpassword') ORDER BY a
|
|
"""
|
|
)
|
|
|
|
|
|
def test_symbols_in_publication_name(started_cluster):
|
|
table = "test_symbols_in_publication_name"
|
|
|
|
pg_manager3.create_postgres_table(table)
|
|
instance.query(
|
|
f"INSERT INTO `{pg_manager3.get_default_database()}`.`{table}` SELECT number, number from numbers(0, 50)"
|
|
)
|
|
|
|
pg_manager3.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
],
|
|
)
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager3.get_default_database()
|
|
)
|
|
|
|
|
|
def test_generated_columns(started_cluster):
|
|
table = "test_generated_columns"
|
|
|
|
pg_manager.create_postgres_table(
|
|
table,
|
|
"",
|
|
f"""CREATE TABLE {table} (
|
|
key integer PRIMARY KEY,
|
|
x integer DEFAULT 0,
|
|
temp integer DEFAULT 0,
|
|
y integer GENERATED ALWAYS AS (x*2) STORED,
|
|
z text DEFAULT 'z');
|
|
""",
|
|
)
|
|
|
|
pg_manager.execute(f"alter table {table} drop column temp;")
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")
|
|
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
],
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (3,3,'3');")
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (4,4,'4');")
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (5,5,'5');")
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (6,6,'6');")
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
|
|
def test_generated_columns_with_sequence(started_cluster):
|
|
table = "test_generated_columns_with_sequence"
|
|
|
|
pg_manager.create_postgres_table(
|
|
table,
|
|
"",
|
|
f"""CREATE TABLE {table} (
|
|
key integer PRIMARY KEY,
|
|
x integer,
|
|
y integer GENERATED ALWAYS AS (x*2) STORED,
|
|
z text);
|
|
""",
|
|
)
|
|
|
|
pg_manager.execute(
|
|
f"create sequence {table}_id_seq increment by 1 minvalue 1 start 1;"
|
|
)
|
|
pg_manager.execute(
|
|
f"alter table {table} alter key set default nextval('{table}_id_seq');"
|
|
)
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
|
|
pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")
|
|
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
],
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
|
|
def test_default_columns(started_cluster):
|
|
table = "test_default_columns"
|
|
|
|
pg_manager.create_postgres_table(
|
|
table,
|
|
"",
|
|
f"""CREATE TABLE {table} (
|
|
key integer PRIMARY KEY,
|
|
x integer,
|
|
y text DEFAULT 'y1',
|
|
z integer,
|
|
a text DEFAULT 'a1',
|
|
b integer);
|
|
""",
|
|
)
|
|
|
|
pg_manager.execute(f"insert into {table} (key, x, z, b) values (1,1,1,1);")
|
|
pg_manager.execute(f"insert into {table} (key, x, z, b) values (2,2,2,2);")
|
|
|
|
pg_manager.create_materialized_db(
|
|
ip=started_cluster.postgres_ip,
|
|
port=started_cluster.postgres_port,
|
|
settings=[
|
|
f"materialized_postgresql_tables_list = '{table}'",
|
|
"materialized_postgresql_backoff_min_ms = 100",
|
|
"materialized_postgresql_backoff_max_ms = 100",
|
|
],
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
pg_manager.execute(f"insert into {table} (key, x, z, b) values (3,3,3,3);")
|
|
pg_manager.execute(f"insert into {table} (key, x, z, b) values (4,4,4,4);")
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
pg_manager.execute(f"insert into {table} (key, x, z, b) values (5,5,5,5);")
|
|
pg_manager.execute(f"insert into {table} (key, x, z, b) values (6,6,6,6);")
|
|
|
|
check_tables_are_synchronized(
|
|
instance, table, postgres_database=pg_manager.get_default_database()
|
|
)
|
|
|
|
|
|
def test_dependent_loading(started_cluster):
|
|
table = "test_dependent_loading"
|
|
|
|
pg_manager.create_postgres_table(table)
|
|
instance.query(
|
|
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
|
|
)
|
|
|
|
instance.query(
|
|
f"""
|
|
SET allow_experimental_materialized_postgresql_table=1;
|
|
CREATE TABLE {table} (key Int32, value Int32)
|
|
ENGINE=MaterializedPostgreSQL('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres_database', '{table}', 'postgres', 'mysecretpassword') ORDER BY key
|
|
"""
|
|
)
|
|
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
table,
|
|
postgres_database=pg_manager.get_default_database(),
|
|
materialized_database="default",
|
|
)
|
|
|
|
assert 50 == int(instance.query(f"SELECT count() FROM {table}"))
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
check_tables_are_synchronized(
|
|
instance,
|
|
table,
|
|
postgres_database=pg_manager.get_default_database(),
|
|
materialized_database="default",
|
|
)
|
|
|
|
assert 50 == int(instance.query(f"SELECT count() FROM {table}"))
|
|
|
|
uuid = instance.query(
|
|
f"SELECT uuid FROM system.tables WHERE name='{table}' and database='default' limit 1"
|
|
).strip()
|
|
nested_table = f"default.`{uuid}_nested`"
|
|
instance.contains_in_log(
|
|
f"Table default.{table} has 1 dependencies: {nested_table} (level 1)"
|
|
)
|
|
|
|
instance.query("SYSTEM FLUSH LOGS")
|
|
nested_time = instance.query(
|
|
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{uuid}_nested' and message not like '%like%'"
|
|
).strip()
|
|
time = (
|
|
instance.query(
|
|
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{table}' and message not like '%like%'"
|
|
)
|
|
.strip()
|
|
.split("\n")[-1]
|
|
)
|
|
instance.query(
|
|
f"SELECT toDateTime64('{nested_time}', 6) < toDateTime64('{time}', 6)"
|
|
)
|
|
|
|
instance.query(f"DROP TABLE {table} SYNC")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cluster.start()
|
|
input("Cluster created, press any key to destroy...")
|
|
cluster.shutdown()
|