ClickHouse/tests/integration/test_distributed_ddl/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

613 lines
22 KiB
Python
Raw Normal View History

import os
import sys
import time
from contextlib import contextmanager
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.test_tools import TSV
2024-09-27 10:19:39 +00:00
from .cluster import ClickHouseClusterWithDDLHelpers
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def test_cluster(request):
    """Module-scoped cluster fixture, run once per parameter value.

    The parameter ("configs" or "configs_secure") is passed twice to
    ClickHouseClusterWithDDLHelpers together with this file's path. After the
    tests of the module have run, the ``test`` and ``test2`` databases are
    dropped cluster-wide, every instance is checked for duplicated DDL
    executions, and the random packet drops are healed. The cluster is shut
    down in all cases.
    """
    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param, request.param)

    try:
        cluster.prepare()

        yield cluster

        # Teardown: remove the databases the tests may have left behind.
        cleanup_node = cluster.instances["ch1"]
        for database in ("test", "test2"):
            cluster.ddl_check_query(
                cleanup_node,
                "DROP DATABASE IF EXISTS {} ON CLUSTER 'cluster'".format(database),
            )

        # Check query log to ensure that DDL queries are not executed twice
        time.sleep(1.5)
        for node in tuple(cluster.instances.values()):
            cluster.ddl_check_there_are_no_dublicates(node)

        cluster.pm_random_drops.heal_all()
    finally:
        cluster.shutdown()
def test_default_database(test_cluster):
    """Check which database an unqualified table name resolves to on each host.

    A table named ``null`` is created on 'cluster2' without a database prefix;
    the assertion expects it in ``default`` on ch1/ch3 and in ``test2`` on
    ch2/ch4.
    """
    node = test_cluster.instances["ch3"]
    run_ddl = test_cluster.ddl_check_query
    entry_format_v2 = {"distributed_ddl_entry_format_version": 2}

    run_ddl(node, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV")
    run_ddl(node, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV")

    # The column default contains a literal tab and newline; presumably this
    # exercises escaping of the DDL entry (format version 2) - confirm.
    run_ddl(
        node,
        "CREATE TABLE IF NOT EXISTS null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null",
        settings=dict(entry_format_v2),
    )

    placement = node.query(
        "SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h"
    )
    expected = TSV("ch1\tdefault\nch2\ttest2\nch3\tdefault\nch4\ttest2\n")
    assert TSV(placement) == expected

    # Clean up the table and the helper database.
    run_ddl(
        node,
        "DROP TABLE IF EXISTS null ON CLUSTER cluster2",
        settings=dict(entry_format_v2),
    )
    run_ddl(node, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
def test_create_view(test_cluster):
    """Run view, materialized-view and rename DDL through ON CLUSTER.

    Each statement is submitted from ch3 and verified with
    ``ddl_check_query``; the statements are executed strictly in order.
    """
    node = test_cluster.instances["ch3"]

    statements = (
        # Plain view.
        "DROP TABLE IF EXISTS test.super_simple_view ON CLUSTER 'cluster'",
        "CREATE VIEW IF NOT EXISTS test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV",
        # Materialized view, created and dropped again.
        "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV",
        "DROP TABLE IF EXISTS test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV",
        "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV",
        # Ordinary table: create, rename, drop under the new name.
        "CREATE TABLE IF NOT EXISTS test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory",
        "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV",
        "DROP TABLE IF EXISTS test.super_simple2 ON CLUSTER 'cluster'",
    )

    for statement in statements:
        test_cluster.ddl_check_query(node, statement)
def test_on_server_fail(test_cluster):
    """Check that a DDL query completes on a host that was down when it was issued.

    ch2's container is stopped, a CREATE TABLE ... ON CLUSTER request is
    started from ch1, and ch2 is then started again. The request's answer must
    report success on all hosts and the table must be visible on ch1..ch4.
    """
    instance = test_cluster.instances["ch1"]
    kill_instance = test_cluster.instances["ch2"]

    test_cluster.ddl_check_query(
        instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'"
    )

    kill_instance.get_docker_handle().stop()
    try:
        request = instance.get_query_request(
            "CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null",
            timeout=180,
        )
    finally:
        # Always bring ch2 back, even if issuing the request failed; otherwise
        # the rest of the module would run against a cluster with a dead node.
        kill_instance.get_docker_handle().start()

    # Another DDL issued after the restart; it targets a table that is never
    # created in this test.
    test_cluster.ddl_check_query(
        instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'"
    )

    # Check query itself
    test_cluster.check_all_hosts_successfully_executed(request.get_answer())

    # And check query artefacts
    contents = instance.query(
        "SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h"
    )
    assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")

    test_cluster.ddl_check_query(
        instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'"
    )
def test_simple_alters(test_cluster):
    """Exercise ALTER ... ON CLUSTER against a MergeTree table read through Distributed tables.

    ``merge`` is the local table; ``all_merge_32`` and ``all_merge_64`` are
    Distributed tables over it with the column set before and after the
    ALTERs. Rows are inserted via ch1 and ch3, and the data is read back
    through the Distributed tables after each schema change.
    """
    instance = test_cluster.instances["ch2"]

    for table in ("merge", "all_merge_32", "all_merge_64"):
        test_cluster.ddl_check_query(
            instance,
            "DROP TABLE IF EXISTS {} ON CLUSTER '{{cluster}}'".format(table),
        )

    test_cluster.ddl_check_query(
        instance,
        "\n"
        "CREATE TABLE IF NOT EXISTS merge ON CLUSTER '{cluster}' (p Date, i Int32)\n"
        "ENGINE = MergeTree(p, p, 1)\n",
    )
    test_cluster.ddl_check_query(
        instance,
        "\n"
        "CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER '{cluster}' (p Date, i Int32)\n"
        "ENGINE = Distributed('{cluster}', default, merge, i)\n",
    )
    test_cluster.ddl_check_query(
        instance,
        "\n"
        "CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER '{cluster}' (p Date, i Int64, s String)\n"
        "ENGINE = Distributed('{cluster}', default, merge, i)\n",
    )

    # Insert values 0..3, two per node, via ch1 and ch3.
    for i in range(0, 4, 2):
        # Floor division keeps k an int; with "/" Python 3 would format the
        # values as 0.0, 1.0, ... into the integer column.
        k = (i // 2) * 2
        test_cluster.instances["ch{}".format(i + 1)].query(
            "INSERT INTO merge (i) VALUES ({})({})".format(k, k + 1)
        )

    assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(
        "".join(["{}\n".format(x) for x in range(4)])
    )

    time.sleep(5)
    test_cluster.ddl_check_query(
        instance, "ALTER TABLE merge ON CLUSTER '{cluster}' MODIFY COLUMN i Int64"
    )
    time.sleep(5)
    test_cluster.ddl_check_query(
        instance,
        "ALTER TABLE merge ON CLUSTER '{cluster}' ADD COLUMN s String DEFAULT toString(i) FORMAT TSV",
    )

    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(
        "".join(["{}\t{}\n".format(x, x) for x in range(4)])
    )

    # Insert values 4..7 with p = 31; presumably day 31 after the epoch, i.e.
    # the month that the DETACH PARTITION 197002 below removes - confirm.
    for i in range(0, 4, 2):
        k = (i // 2) * 2 + 4
        test_cluster.instances["ch{}".format(i + 1)].query(
            "INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k + 1)
        )

    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(
        "".join(["{}\t{}\n".format(x, x) for x in range(8)])
    )

    test_cluster.ddl_check_query(
        instance, "ALTER TABLE merge ON CLUSTER '{cluster}' DETACH PARTITION 197002"
    )
    # Only the first four rows remain after the partition is detached.
    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(
        "".join(["{}\t{}\n".format(x, x) for x in range(4)])
    )

    for table in ("merge", "all_merge_32", "all_merge_64"):
        test_cluster.ddl_check_query(
            instance,
            "DROP TABLE IF EXISTS {} ON CLUSTER '{{cluster}}'".format(table),
        )
def test_macro(test_cluster):
    """Use the '{cluster}' macro in ON CLUSTER clauses and in a Distributed engine.

    One row is inserted on each of ch1..ch4 (values 0..3); a Distributed table
    created through the macro must return all four values when queried from
    ch2 and from ch3.
    """
    node = test_cluster.instances["ch2"]
    all_values = TSV("0\n1\n2\n3\n")
    select_values = "SELECT value FROM distr ORDER BY value"

    test_cluster.ddl_check_query(
        node,
        "CREATE TABLE IF NOT EXISTS tab ON CLUSTER '{cluster}' (value UInt8) ENGINE = Memory",
    )

    # Node chN receives the value N - 1.
    for node_number in range(1, 5):
        target = test_cluster.instances["ch{}".format(node_number)]
        test_cluster.insert_reliable(
            target, "INSERT INTO tab VALUES ({})".format(node_number - 1)
        )

    test_cluster.ddl_check_query(
        node,
        "CREATE TABLE IF NOT EXISTS distr ON CLUSTER '{cluster}' (value UInt8) ENGINE = Distributed('{cluster}', 'default', 'tab', value % 4)",
    )

    seen_from_ch2 = node.query(select_values)
    assert TSV(seen_from_ch2) == all_values

    seen_from_ch3 = test_cluster.instances["ch3"].query(select_values)
    assert TSV(seen_from_ch3) == all_values

    for table in ("distr", "tab"):
        test_cluster.ddl_check_query(
            node,
            "DROP TABLE IF EXISTS {} ON CLUSTER '{{cluster}}'".format(table),
        )
def test_implicit_macros(test_cluster):
    """Check that {database} and {table} macros are expanded in a replicated table's ZooKeeper path.

    A ReplicatedMergeTree table is created ON CLUSTER with a path built from
    macros, and the test asserts that the expected concrete node exists in
    ZooKeeper.
    """
    # Temporarily disable random ZK packet drops, they might break creation of ReplicatedMergeTree replicas
    firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()
    try:
        instance = test_cluster.instances["ch2"]

        test_cluster.ddl_check_query(
            instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}' SYNC"
        )
        test_cluster.ddl_check_query(
            instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'"
        )

        test_cluster.ddl_check_query(
            instance,
            "\n"
            "CREATE TABLE IF NOT EXISTS test_db.test_macro ON CLUSTER '{cluster}' (p Date, i Int32)\n"
            "ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/{layer}-{shard}/{table}', '{replica}', p, p, 1)\n",
        )

        # Check that table was created at correct path in zookeeper
        assert (
            test_cluster.get_kazoo_client("zoo1").exists(
                "/clickhouse/tables/test_db/0-1/test_macro"
            )
            is not None
        )
    finally:
        # Re-enable random ZK packet drops even when the test fails, so the
        # remaining tests of the module run under the same conditions.
        test_cluster.pm_random_drops.push_rules(firewall_drops_rules)
2018-09-28 19:07:29 +00:00
def test_allowed_databases(test_cluster):
    """Check which ON CLUSTER statements succeed when run as ``restricted_user``.

    Creating a table in ``db1`` and dropping ``db1`` must succeed; creating a
    table in ``db2``, creating a table without a database prefix, and dropping
    ``db2`` must each raise.
    """
    node = test_cluster.instances["ch2"]

    def run_restricted(sql):
        # Every call gets its own settings dict, as in a direct query() call.
        return node.query(sql, settings={"user": "restricted_user"})

    node.query("CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster")
    node.query("CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster")

    run_restricted("CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory")

    rejected_statements = (
        "CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory",
        "CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory",
        "DROP DATABASE db2 ON CLUSTER cluster",
    )
    for sql in rejected_statements:
        with pytest.raises(Exception):
            run_restricted(sql)

    run_restricted("DROP DATABASE db1 ON CLUSTER cluster")
def test_kill_query(test_cluster):
    """Submit a KILL QUERY ... ON CLUSTER statement from ch3 and verify it via ddl_check_query."""
    node = test_cluster.instances["ch3"]
    kill_statement = "KILL QUERY ON CLUSTER 'cluster' WHERE NOT elapsed FORMAT TSV"
    test_cluster.ddl_check_query(node, kill_statement)
def test_detach_query(test_cluster):
    """Create a Log table ON CLUSTER, then DETACH and re-ATTACH it cluster-wide."""
    node = test_cluster.instances["ch3"]

    statements = (
        "DROP TABLE IF EXISTS test_attach ON CLUSTER cluster FORMAT TSV",
        "CREATE TABLE test_attach ON CLUSTER cluster (i Int8)ENGINE = Log",
        "DETACH TABLE test_attach ON CLUSTER cluster FORMAT TSV",
        "ATTACH TABLE test_attach ON CLUSTER cluster",
    )
    for statement in statements:
        test_cluster.ddl_check_query(node, statement)
def test_optimize_query(test_cluster):
    """Create a MergeTree table ON CLUSTER and run OPTIMIZE TABLE ... ON CLUSTER on it."""
    node = test_cluster.instances["ch3"]

    statements = (
        "DROP TABLE IF EXISTS test_optimize ON CLUSTER cluster FORMAT TSV",
        "CREATE TABLE IF NOT EXISTS test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192)",
        "OPTIMIZE TABLE test_optimize ON CLUSTER cluster FORMAT TSV",
    )
    for statement in statements:
        test_cluster.ddl_check_query(node, statement)
2018-09-28 16:43:41 +00:00
def test_create_as_select(test_cluster):
    """Create a Memory table ON CLUSTER from a SELECT and check its contents on ch2."""
    node = test_cluster.instances["ch2"]

    test_cluster.ddl_check_query(
        node,
        "CREATE TABLE IF NOT EXISTS test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x)",
    )

    # Both rows of the UNION ALL must be present.
    rows = node.query("SELECT x FROM test_as_select ORDER BY x")
    assert TSV(rows) == TSV("1\n2\n")

    test_cluster.ddl_check_query(
        node, "DROP TABLE IF EXISTS test_as_select ON CLUSTER cluster"
    )
2018-09-28 16:05:58 +00:00
def test_create_reserved(test_cluster):
    """Run ON CLUSTER DDL for a table whose backquoted column names include `index` and `invalidate`.

    A second table is created from ``SELECT *`` over the first, then both are
    dropped.
    """
    node = test_cluster.instances["ch2"]

    statements = (
        "CREATE TABLE IF NOT EXISTS test_reserved ON CLUSTER cluster (`p` Date, `image` Nullable(String), `index` Nullable(Float64), `invalidate` Nullable(Int64)) ENGINE = MergeTree(`p`, `p`, 8192)",
        "CREATE TABLE IF NOT EXISTS test_as_reserved ON CLUSTER cluster ENGINE = Memory AS (SELECT * from test_reserved)",
        "DROP TABLE IF EXISTS test_reserved ON CLUSTER cluster",
        "DROP TABLE IF EXISTS test_as_reserved ON CLUSTER cluster",
    )
    for statement in statements:
        test_cluster.ddl_check_query(node, statement)
2019-12-19 19:39:49 +00:00
def test_rename(test_cluster):
    """Swap Distributed tables with RENAME ... ON CLUSTER while inserts are in flight.

    ``rename_shard`` is the replicated local table. A Distributed table over
    it is created as ``rename_new`` and renamed to ``rename``; after the
    ``sid`` default is changed from 'old...' to 'new...', a second
    ``rename_new`` is created and swapped in. The final counts check which
    inserted ids reached ``rename_shard`` and with which default.
    """
    instance = test_cluster.instances["ch1"]
    rules = test_cluster.pm_random_drops.pop_rules()
    try:
        for table in ("rename_shard", "rename_new", "rename_old", "rename"):
            test_cluster.ddl_check_query(
                instance,
                "DROP TABLE IF EXISTS {} ON CLUSTER cluster SYNC".format(table),
            )

        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat('old', toString(id))) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/staging/test_shard', '{replica}') ORDER BY (id)",
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2)",
        )
        test_cluster.ddl_check_query(
            instance, "RENAME TABLE rename_new TO rename ON CLUSTER cluster;"
        )

        for i in range(10):
            instance.query("insert into rename (id) values ({})".format(i))

        # FIXME ddl_check_query doesn't work for replicated DDL if replace_hostnames_with_ips=True
        # because replicas use wrong host name of leader (and wrong path in zk) to check if it has executed query
        # so ddl query will always fail on some replicas even if query was actually executed by leader
        # Also such inconsistency in cluster configuration may lead to query duplication if leader suddenly changed
        # because path of lock in zk contains shard name, which is list of host names of replicas
        instance.query(
            "ALTER TABLE rename_shard ON CLUSTER cluster MODIFY COLUMN sid String DEFAULT concat('new', toString(id))",
            ignore_error=True,
        )
        time.sleep(1)

        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2)",
        )

        instance.query("system stop distributed sends rename")

        for i in range(10, 20):
            instance.query("insert into rename (id) values ({})".format(i))

        test_cluster.ddl_check_query(
            instance,
            "RENAME TABLE rename TO rename_old, rename_new TO rename ON CLUSTER cluster",
        )

        for i in range(20, 30):
            instance.query("insert into rename (id) values ({})".format(i))

        instance.query("system flush distributed rename")
        for name in ["ch1", "ch2", "ch3", "ch4"]:
            test_cluster.instances[name].query("system sync replica rename_shard")

        # system stop distributed sends does not affect inserts into local shard,
        # so some ids in range (10, 20) will be inserted into rename_shard
        # (if none of them were, the totals would be "20\t290" overall and
        # "10\t45" for the 'old%' rows).
        assert (
            instance.query("select count(id), sum(id) from rename").rstrip()
            == "25\t360"
        )
        assert (
            instance.query(
                "select count(id), sum(id) from rename where sid like 'old%'"
            ).rstrip()
            == "15\t115"
        )
        assert (
            instance.query(
                "select count(id), sum(id) from rename where sid like 'new%'"
            ).rstrip()
            == "10\t245"
        )
    finally:
        # Restore the packet-drop rules even when the test fails, so the
        # remaining tests of the module run under the same conditions.
        test_cluster.pm_random_drops.push_rules(rules)
2020-02-05 15:24:23 +00:00
def test_socket_timeout(test_cluster):
    """Run the same cluster() query 100 times from ch1; every run must succeed."""
    node = test_cluster.instances["ch1"]
    settings_per_host = "select hostName() as host, count() from cluster('cluster', 'system', 'settings') group by host"

    # queries should not fail with "Timeout exceeded while reading from socket" in case of EINTR caused by query profiler
    for _ in range(100):
        node.query(settings_per_host)
2019-12-19 19:39:49 +00:00
2020-07-10 09:19:32 +00:00
def test_replicated_without_arguments(test_cluster):
    """Check Replicated*MergeTree engines declared without ZooKeeper path arguments.

    In an Atomic database the argument-less form must work with ON CLUSTER and
    must be rejected without it. In an Ordinary database both the
    argument-less form and a path containing {uuid} must be rejected, while a
    path using {table} is accepted.
    """
    rules = test_cluster.pm_random_drops.pop_rules()
    try:
        instance = test_cluster.instances["ch1"]

        test_cluster.ddl_check_query(
            instance, "DROP TABLE IF EXISTS test_atomic.rmt ON CLUSTER cluster SYNC"
        )
        test_cluster.ddl_check_query(
            instance, "DROP DATABASE IF EXISTS test_atomic ON CLUSTER cluster SYNC"
        )

        test_cluster.ddl_check_query(
            instance,
            "CREATE DATABASE IF NOT EXISTS test_atomic ON CLUSTER cluster ENGINE=Atomic",
        )

        # Without ON CLUSTER the argument-less engine must be refused.
        assert (
            "are supported only for ON CLUSTER queries with Atomic database engine"
            in instance.query_and_get_error(
                "CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n"
            )
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n",
        )
        test_cluster.ddl_check_query(
            instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster SYNC"
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n",
        )
        # The stored definition is expected to keep the {uuid}/{shard}/{replica}
        # macros unexpanded.
        # NOTE(review): the expected text must match the server output
        # byte-for-byte, including the indentation of the column list -
        # confirm the whitespace in this literal.
        assert (
            instance.query("SHOW CREATE test_atomic.rmt FORMAT TSVRaw")
            == "CREATE TABLE test_atomic.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
        )

        test_cluster.ddl_check_query(
            instance,
            "RENAME TABLE test_atomic.rmt TO test_atomic.rmt_renamed ON CLUSTER cluster",
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') ORDER BY n",
        )
        test_cluster.ddl_check_query(
            instance,
            "EXCHANGE TABLES test_atomic.rmt AND test_atomic.rmt_renamed ON CLUSTER cluster",
        )

        # The database and the table named rmt must each have one single UUID
        # across all replicas.
        assert (
            instance.query(
                "SELECT countDistinct(uuid) from clusterAllReplicas('cluster', 'system', 'databases') WHERE uuid != '00000000-0000-0000-0000-000000000000' AND name='test_atomic'"
            )
            == "1\n"
        )
        assert (
            instance.query(
                "SELECT countDistinct(uuid) from clusterAllReplicas('cluster', 'system', 'tables') WHERE uuid != '00000000-0000-0000-0000-000000000000' AND name='rmt'"
            )
            == "1\n"
        )

        # Engine-specific arguments are given, but no ZooKeeper path or replica name.
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_atomic.rrmt ON CLUSTER cluster (n UInt64, m UInt64) ENGINE=ReplicatedReplacingMergeTree(m) ORDER BY n",
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n",
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n",
        )
        test_cluster.ddl_check_query(
            instance, "DROP DATABASE test_atomic ON CLUSTER cluster SYNC"
        )

        test_cluster.ddl_check_query(
            instance,
            "CREATE DATABASE IF NOT EXISTS test_ordinary ON CLUSTER cluster ENGINE=Ordinary",
            settings={"allow_deprecated_database_ordinary": 1},
        )
        # In an Ordinary database neither the argument-less form nor a path
        # containing {uuid} is accepted.
        assert (
            "are supported only for ON CLUSTER queries with Atomic database engine"
            in instance.query_and_get_error(
                "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n"
            )
        )
        assert (
            "are supported only for ON CLUSTER queries with Atomic database engine"
            in instance.query_and_get_error(
                "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{uuid}/', '{replica}') ORDER BY n"
            )
        )
        test_cluster.ddl_check_query(
            instance,
            "CREATE TABLE IF NOT EXISTS test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n",
        )
        # Here {table} is expected to be expanded to the table name, while
        # {shard} and {replica} stay as macros.
        # NOTE(review): same whitespace caveat as for the SHOW CREATE check above.
        assert (
            instance.query("SHOW CREATE test_ordinary.rmt FORMAT TSVRaw")
            == "CREATE TABLE test_ordinary.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/{shard}/rmt/', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
        )
        test_cluster.ddl_check_query(
            instance, "DROP DATABASE IF EXISTS test_ordinary ON CLUSTER cluster SYNC"
        )
    finally:
        # Restore the packet-drop rules even when the test fails, so the
        # remaining tests of the module run under the same conditions.
        test_cluster.pm_random_drops.push_rules(rules)
2020-07-10 09:19:32 +00:00
def test_disabled_distributed_ddl(test_cluster):
    """An ON CLUSTER query against 'cluster_disabled_ddl' must fail with a 'prohibited' error."""
    node = test_cluster.instances["ch1"]

    error_text = node.query_and_get_error(
        "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster_disabled_ddl'"
    )

    expected_fragment = "Distributed DDL queries are prohibited for the cluster"
    assert expected_fragment in error_text
if __name__ == "__main__":
    # Manual entry point: bring the cluster up, print each instance's name and
    # IP address, and keep it running until the user presses a key.
    # NOTE(review): test_cluster is declared with a required `request`
    # parameter and is wrapped by pytest.fixture, so calling it here with no
    # arguments looks like it would fail - confirm this path still works.
    cluster_context = contextmanager(test_cluster)
    with cluster_context() as ctx_cluster:
        for name, instance in ctx_cluster.instances.items():
            print(name, instance.ip_address)
        input("Cluster created, press any key to destroy...")