mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
94 lines
4.0 KiB
Python
94 lines
4.0 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import pytest
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from helpers.test_tools import TSV
|
|
from .cluster import ClickHouseClusterWithDDLHelpers
|
|
|
|
|
|
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def test_cluster(request):
    """Provide a prepared DDL-helper cluster for the tests of this module.

    The fixture is instantiated once per entry in ``params``; the entry is
    passed to ``ClickHouseClusterWithDDLHelpers`` together with this file's
    path. After the tests finish, the test databases are dropped on the whole
    cluster, every instance is checked for duplicated DDL executions, and the
    random packet-drop rules are healed. The cluster is shut down in all
    cases, including when preparation or teardown raises.
    """
    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)

    try:
        # TODO: Fix ON CLUSTER alters when nodes have different configs. Need to canonicalize node identity.
        cluster.prepare(replace_hostnames_with_ips=False)

        yield cluster

        # Teardown: remove the databases through a single initiator node.
        initiator = cluster.instances['ch1']
        for drop_query in (
            "DROP DATABASE test ON CLUSTER 'cluster'",
            "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'",
        ):
            cluster.ddl_check_query(initiator, drop_query)

        # Check query log to ensure that DDL queries are not executed twice.
        # The pause precedes the per-instance checks
        # (presumably to let the query log be flushed — TODO confirm).
        time.sleep(1.5)
        for node in cluster.instances.values():
            cluster.ddl_check_there_are_no_dublicates(node)

        cluster.pm_random_drops.heal_all()
    finally:
        cluster.shutdown()
|
|
|
|
|
|
def test_replicated_alters(test_cluster):
    """Check ON CLUSTER ALTER queries against a ReplicatedMergeTree table.

    Scenario, driven from instance ``ch2``:

    1. Create ``merge_for_alter`` (ReplicatedMergeTree) plus two Distributed
       tables over it: ``all_merge_32`` (``i Int32``) and ``all_merge_64``
       (``i Int64, s String``).
    2. Insert value pairs through ``ch1``..``ch4`` and verify that 0..3 are
       visible through ``all_merge_32``.
    3. ALTER ``i`` to ``Int64`` and ADD COLUMN ``s DEFAULT toString(i)`` on the
       cluster, then verify the rows through ``all_merge_64``.
    4. Insert a second batch with ``p = 31``, verify 0..7, DETACH PARTITION
       197002 and verify that only 0..3 remain.
    5. Drop all three tables.

    Random ZooKeeper packet drops are disabled while the replicated table
    exists and are restored afterwards, even if one of the checks fails.
    """
    instance = test_cluster.instances['ch2']

    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")

    # Temporarily disable random ZK packet drops, they might break creation of ReplicatedMergeTree replicas
    firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()

    try:
        test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge_for_alter ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")

        test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
        test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")

        for i in range(4):
            # Floor division keeps k an int on both Python 2 and 3, so the
            # query is rendered as "(0)(1)" and never as "(0.0)(1.0)".
            # ch1/ch2 insert the pair (0, 1), ch3/ch4 insert (2, 3); the check
            # below expects each value once (presumably the paired instances
            # are replicas of one shard and duplicates are dropped — TODO confirm).
            k = (i // 2) * 2
            test_cluster.insert_reliable(test_cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (i) VALUES ({})({})".format(k, k+1))

        test_cluster.sync_replicas("merge_for_alter")

        assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in range(4)]))

        test_cluster.ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster MODIFY COLUMN i Int64")
        test_cluster.ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")

        assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in range(4)]))

        for i in range(4):
            # Second batch: values 4..7, written with p = 31 so that they can
            # be removed below by detaching partition 197002.
            k = (i // 2) * 2 + 4
            test_cluster.insert_reliable(test_cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (p, i) VALUES (31, {})(31, {})".format(k, k+1))

        test_cluster.sync_replicas("merge_for_alter")

        assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in range(8)]))

        test_cluster.ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster DETACH PARTITION 197002")
        assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in range(4)]))

        test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
    finally:
        # Enable random ZK packet drops again, also when a check above failed,
        # so the popped rules are never lost.
        test_cluster.pm_random_drops.push_rules(firewall_drops_rules)

    test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
    test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
|