2017-08-25 13:47:09 +00:00
import os
import sys
2017-05-30 11:49:17 +00:00
import time
2017-08-25 13:47:09 +00:00
from contextlib import contextmanager
2017-05-30 11:49:17 +00:00
2019-11-05 18:47:15 +00:00
import pytest
2017-05-30 11:49:17 +00:00
2019-11-05 18:47:15 +00:00
sys . path . insert ( 0 , os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) ) )
2017-05-30 11:49:17 +00:00
2019-11-05 18:47:15 +00:00
from helpers . test_tools import TSV
from . cluster import ClickHouseClusterWithDDLHelpers
2017-05-30 11:49:17 +00:00
2017-08-02 14:42:35 +00:00
2019-11-05 18:47:15 +00:00
@pytest.fixture ( scope = " module " , params = [ " configs " , " configs_secure " ] )
def test_cluster ( request ) :
2021-02-12 19:23:50 +00:00
cluster = ClickHouseClusterWithDDLHelpers ( __file__ , request . param , request . param )
2017-08-02 14:42:35 +00:00
try :
2019-11-05 18:47:15 +00:00
cluster . prepare ( )
2017-08-02 14:42:35 +00:00
2017-05-30 11:49:17 +00:00
yield cluster
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " ch1 " ]
2019-11-05 18:47:15 +00:00
cluster . ddl_check_query ( instance , " DROP DATABASE test ON CLUSTER ' cluster ' " )
2022-03-22 16:39:58 +00:00
cluster . ddl_check_query (
instance , " DROP DATABASE IF EXISTS test2 ON CLUSTER ' cluster ' "
)
2017-05-30 11:49:17 +00:00
2017-07-27 18:44:55 +00:00
# Check query log to ensure that DDL queries are not executed twice
2017-07-31 18:57:13 +00:00
time . sleep ( 1.5 )
2020-10-02 16:54:07 +00:00
for instance in list ( cluster . instances . values ( ) ) :
2019-11-05 18:47:15 +00:00
cluster . ddl_check_there_are_no_dublicates ( instance )
2017-07-31 18:57:13 +00:00
2018-01-25 18:14:37 +00:00
cluster . pm_random_drops . heal_all ( )
2017-08-02 14:42:35 +00:00
finally :
2017-08-14 11:49:30 +00:00
cluster . shutdown ( )
2017-05-30 11:49:17 +00:00
2019-11-05 18:47:15 +00:00
def test_default_database ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch3 " ]
test_cluster . ddl_check_query (
instance , " CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER ' cluster ' FORMAT TSV "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS null ON CLUSTER ' cluster ' FORMAT TSV "
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE null ON CLUSTER ' cluster2 ' (s String DEFAULT ' escape \t \n me ' ) ENGINE = Null " ,
2022-07-21 18:32:33 +00:00
settings = { " distributed_ddl_entry_format_version " : 2 } ,
2022-03-22 16:39:58 +00:00
)
2017-05-30 11:49:17 +00:00
2022-03-22 16:39:58 +00:00
contents = instance . query (
" SELECT hostName() AS h, database FROM all_tables WHERE name = ' null ' ORDER BY h "
)
2017-05-30 11:49:17 +00:00
assert TSV ( contents ) == TSV ( " ch1 \t default \n ch2 \t test2 \n ch3 \t default \n ch4 \t test2 \n " )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
2022-07-21 18:32:33 +00:00
instance ,
" DROP TABLE IF EXISTS null ON CLUSTER cluster2 " ,
settings = { " distributed_ddl_entry_format_version " : 2 } ,
2022-03-22 16:39:58 +00:00
)
test_cluster . ddl_check_query (
instance , " DROP DATABASE IF EXISTS test2 ON CLUSTER ' cluster ' "
)
2017-05-30 11:49:17 +00:00
2019-11-05 18:47:15 +00:00
def test_create_view ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch3 " ]
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test.super_simple_view ON CLUSTER ' cluster ' "
)
test_cluster . ddl_check_query (
instance ,
" CREATE VIEW test.super_simple_view ON CLUSTER ' cluster ' AS SELECT * FROM system.numbers FORMAT TSV " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER ' cluster ' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV " ,
)
test_cluster . ddl_check_query (
instance , " DROP TABLE test.simple_mat_view ON CLUSTER ' cluster ' FORMAT TSV "
)
test_cluster . ddl_check_query (
instance ,
" DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER ' cluster ' FORMAT TSV " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test.super_simple ON CLUSTER ' cluster ' (i Int8) ENGINE = Memory " ,
)
test_cluster . ddl_check_query (
instance ,
" RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER ' cluster ' FORMAT TSV " ,
)
test_cluster . ddl_check_query (
instance , " DROP TABLE test.super_simple2 ON CLUSTER ' cluster ' "
)
2017-08-02 20:54:41 +00:00
2019-11-05 18:47:15 +00:00
def test_on_server_fail ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch1 " ]
kill_instance = test_cluster . instances [ " ch2 " ]
2017-05-30 11:49:17 +00:00
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER ' cluster ' "
)
2017-05-30 11:49:17 +00:00
kill_instance . get_docker_handle ( ) . stop ( )
2022-03-22 16:39:58 +00:00
request = instance . get_query_request (
" CREATE TABLE test.test_server_fail ON CLUSTER ' cluster ' (i Int8) ENGINE=Null " ,
timeout = 180 ,
)
2017-05-30 11:49:17 +00:00
kill_instance . get_docker_handle ( ) . start ( )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test.__nope__ ON CLUSTER ' cluster ' "
)
2017-05-30 11:49:17 +00:00
# Check query itself
2019-11-05 18:47:15 +00:00
test_cluster . check_all_hosts_successfully_executed ( request . get_answer ( ) )
2017-05-30 11:49:17 +00:00
# And check query artefacts
2020-09-16 04:26:10 +00:00
contents = instance . query (
2022-03-22 16:39:58 +00:00
" SELECT hostName() AS h FROM all_tables WHERE database= ' test ' AND name= ' test_server_fail ' ORDER BY h "
)
2017-05-30 11:49:17 +00:00
assert TSV ( contents ) == TSV ( " ch1 \n ch2 \n ch3 \n ch4 \n " )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP TABLE test.test_server_fail ON CLUSTER ' cluster ' "
)
2017-05-30 11:49:17 +00:00
2019-11-05 18:47:15 +00:00
def test_simple_alters ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch2 " ]
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS merge ON CLUSTER ' {cluster} ' "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS all_merge_32 ON CLUSTER ' {cluster} ' "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS all_merge_64 ON CLUSTER ' {cluster} ' "
)
test_cluster . ddl_check_query (
instance ,
"""
2020-03-12 06:43:01 +00:00
CREATE TABLE IF NOT EXISTS merge ON CLUSTER ' {cluster} ' ( p Date , i Int32 )
2017-05-30 11:49:17 +00:00
ENGINE = MergeTree ( p , p , 1 )
2022-03-22 16:39:58 +00:00
""" ,
)
test_cluster . ddl_check_query (
instance ,
"""
2020-03-12 06:43:01 +00:00
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER ' {cluster} ' ( p Date , i Int32 )
ENGINE = Distributed ( ' {cluster} ' , default , merge , i )
2022-03-22 16:39:58 +00:00
""" ,
)
test_cluster . ddl_check_query (
instance ,
"""
2020-03-12 06:43:01 +00:00
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER ' {cluster} ' ( p Date , i Int64 , s String )
ENGINE = Distributed ( ' {cluster} ' , default , merge , i )
2022-03-22 16:39:58 +00:00
""" ,
)
2017-05-30 11:49:17 +00:00
2020-10-02 16:54:07 +00:00
for i in range ( 0 , 4 , 2 ) :
2017-05-30 11:49:17 +00:00
k = ( i / 2 ) * 2
2022-03-22 16:39:58 +00:00
test_cluster . instances [ " ch {} " . format ( i + 1 ) ] . query (
" INSERT INTO merge (i) VALUES ( {} )( {} ) " . format ( k , k + 1 )
)
2017-05-30 11:49:17 +00:00
2020-09-16 04:26:10 +00:00
assert TSV ( instance . query ( " SELECT i FROM all_merge_32 ORDER BY i " ) ) == TSV (
2022-03-22 16:39:58 +00:00
" " . join ( [ " {} \n " . format ( x ) for x in range ( 4 ) ] )
)
2017-05-30 11:49:17 +00:00
2018-09-03 15:02:24 +00:00
time . sleep ( 5 )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " ALTER TABLE merge ON CLUSTER ' {cluster} ' MODIFY COLUMN i Int64 "
)
2018-09-03 15:02:24 +00:00
time . sleep ( 5 )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance ,
" ALTER TABLE merge ON CLUSTER ' {cluster} ' ADD COLUMN s String DEFAULT toString(i) FORMAT TSV " ,
)
2017-05-30 11:49:17 +00:00
2020-09-16 04:26:10 +00:00
assert TSV ( instance . query ( " SELECT i, s FROM all_merge_64 ORDER BY i " ) ) == TSV (
2022-03-22 16:39:58 +00:00
" " . join ( [ " {} \t {} \n " . format ( x , x ) for x in range ( 4 ) ] )
)
2017-05-30 11:49:17 +00:00
2020-10-02 16:54:07 +00:00
for i in range ( 0 , 4 , 2 ) :
2017-05-30 11:49:17 +00:00
k = ( i / 2 ) * 2 + 4
2022-03-22 16:39:58 +00:00
test_cluster . instances [ " ch {} " . format ( i + 1 ) ] . query (
" INSERT INTO merge (p, i) VALUES (31, {} )(31, {} ) " . format ( k , k + 1 )
)
2017-05-30 11:49:17 +00:00
2020-09-16 04:26:10 +00:00
assert TSV ( instance . query ( " SELECT i, s FROM all_merge_64 ORDER BY i " ) ) == TSV (
2022-03-22 16:39:58 +00:00
" " . join ( [ " {} \t {} \n " . format ( x , x ) for x in range ( 8 ) ] )
)
2017-05-30 11:49:17 +00:00
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " ALTER TABLE merge ON CLUSTER ' {cluster} ' DETACH PARTITION 197002 "
)
2020-09-16 04:26:10 +00:00
assert TSV ( instance . query ( " SELECT i, s FROM all_merge_64 ORDER BY i " ) ) == TSV (
2022-03-22 16:39:58 +00:00
" " . join ( [ " {} \t {} \n " . format ( x , x ) for x in range ( 4 ) ] )
)
2017-05-30 11:49:17 +00:00
2020-03-12 06:43:01 +00:00
test_cluster . ddl_check_query ( instance , " DROP TABLE merge ON CLUSTER ' {cluster} ' " )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP TABLE all_merge_32 ON CLUSTER ' {cluster} ' "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE all_merge_64 ON CLUSTER ' {cluster} ' "
)
2017-08-25 13:47:09 +00:00
2019-11-05 18:47:15 +00:00
def test_macro ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch2 " ]
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE tab ON CLUSTER ' {cluster} ' (value UInt8) ENGINE = Memory " ,
)
2018-01-22 18:48:44 +00:00
2020-10-02 16:54:07 +00:00
for i in range ( 4 ) :
2022-03-22 16:39:58 +00:00
test_cluster . insert_reliable (
test_cluster . instances [ " ch {} " . format ( i + 1 ) ] ,
" INSERT INTO tab VALUES ( {} ) " . format ( i ) ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE distr ON CLUSTER ' {cluster} ' (value UInt8) ENGINE = Distributed( ' {cluster} ' , ' default ' , ' tab ' , value % 4) " ,
)
assert TSV ( instance . query ( " SELECT value FROM distr ORDER BY value " ) ) == TSV (
" 0 \n 1 \n 2 \n 3 \n "
)
assert TSV (
test_cluster . instances [ " ch3 " ] . query ( " SELECT value FROM distr ORDER BY value " )
) == TSV ( " 0 \n 1 \n 2 \n 3 \n " )
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS distr ON CLUSTER ' {cluster} ' "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS tab ON CLUSTER ' {cluster} ' "
)
2018-01-22 18:48:44 +00:00
2018-04-17 19:33:58 +00:00
2019-11-05 18:47:15 +00:00
def test_implicit_macros ( test_cluster ) :
2018-09-28 19:07:29 +00:00
# Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
2019-11-05 18:47:15 +00:00
firewall_drops_rules = test_cluster . pm_random_drops . pop_rules ( )
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch2 " ]
2018-09-28 19:07:29 +00:00
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP DATABASE IF EXISTS test_db ON CLUSTER ' {cluster} ' SYNC "
)
test_cluster . ddl_check_query (
instance , " CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER ' {cluster} ' "
)
2018-10-01 09:01:50 +00:00
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance ,
"""
2018-09-28 19:07:29 +00:00
CREATE TABLE IF NOT EXISTS test_db . test_macro ON CLUSTER ' {cluster} ' ( p Date , i Int32 )
ENGINE = ReplicatedMergeTree ( ' /clickhouse/tables/ {database} / {layer} - {shard} / {table} ' , ' {replica} ' , p , p , 1 )
2022-03-22 16:39:58 +00:00
""" ,
)
2018-09-28 19:07:29 +00:00
# Check that table was created at correct path in zookeeper
2022-03-22 16:39:58 +00:00
assert (
test_cluster . get_kazoo_client ( " zoo1 " ) . exists (
" /clickhouse/tables/test_db/0-1/test_macro "
)
is not None
)
2018-09-28 19:07:29 +00:00
# Enable random ZK packet drops
2019-11-05 18:47:15 +00:00
test_cluster . pm_random_drops . push_rules ( firewall_drops_rules )
2018-09-28 19:07:29 +00:00
2018-04-17 19:33:58 +00:00
2019-11-05 18:47:15 +00:00
def test_allowed_databases ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch2 " ]
2018-04-17 19:33:58 +00:00
instance . query ( " CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster " )
instance . query ( " CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster " )
2022-03-22 16:39:58 +00:00
instance . query (
2024-04-17 00:31:15 +00:00
" CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory " ,
2022-03-22 16:39:58 +00:00
settings = { " user " : " restricted_user " } ,
)
2019-11-05 18:47:15 +00:00
2018-04-17 19:33:58 +00:00
with pytest . raises ( Exception ) :
2022-03-22 16:39:58 +00:00
instance . query (
2024-04-17 00:31:15 +00:00
" CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory " ,
2022-03-22 16:39:58 +00:00
settings = { " user " : " restricted_user " } ,
)
2024-03-16 22:18:42 +00:00
2018-04-17 19:33:58 +00:00
with pytest . raises ( Exception ) :
2022-03-22 16:39:58 +00:00
instance . query (
2024-04-17 00:31:15 +00:00
" CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory " ,
2022-03-22 16:39:58 +00:00
settings = { " user " : " restricted_user " } ,
)
2024-03-16 22:18:42 +00:00
2018-04-17 19:33:58 +00:00
with pytest . raises ( Exception ) :
2022-03-22 16:39:58 +00:00
instance . query (
" DROP DATABASE db2 ON CLUSTER cluster " , settings = { " user " : " restricted_user " }
)
2020-09-16 04:26:10 +00:00
2022-03-22 16:39:58 +00:00
instance . query (
" DROP DATABASE db1 ON CLUSTER cluster " , settings = { " user " : " restricted_user " }
)
2018-04-17 19:33:58 +00:00
2019-11-05 18:47:15 +00:00
def test_kill_query ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch3 " ]
2018-07-20 05:46:48 +00:00
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " KILL QUERY ON CLUSTER ' cluster ' WHERE NOT elapsed FORMAT TSV "
)
2018-07-20 05:46:48 +00:00
2020-09-16 04:26:10 +00:00
2019-11-05 18:47:15 +00:00
def test_detach_query ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch3 " ]
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test_attach ON CLUSTER cluster FORMAT TSV "
)
test_cluster . ddl_check_query (
instance , " CREATE TABLE test_attach ON CLUSTER cluster (i Int8)ENGINE = Log "
)
test_cluster . ddl_check_query (
instance , " DETACH TABLE test_attach ON CLUSTER cluster FORMAT TSV "
)
test_cluster . ddl_check_query (
instance , " ATTACH TABLE test_attach ON CLUSTER cluster "
)
2018-07-20 05:46:48 +00:00
2019-11-05 18:47:15 +00:00
def test_optimize_query ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch3 " ]
2018-07-20 05:46:48 +00:00
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test_optimize ON CLUSTER cluster FORMAT TSV "
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192) " ,
)
test_cluster . ddl_check_query (
instance , " OPTIMIZE TABLE test_optimize ON CLUSTER cluster FORMAT TSV "
)
2018-09-28 16:43:41 +00:00
2018-04-17 19:33:58 +00:00
2019-11-05 18:47:15 +00:00
def test_create_as_select ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch2 " ]
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x) " ,
)
assert TSV ( instance . query ( " SELECT x FROM test_as_select ORDER BY x " ) ) == TSV (
" 1 \n 2 \n "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test_as_select ON CLUSTER cluster "
)
2018-09-28 16:05:58 +00:00
2019-11-05 18:47:15 +00:00
def test_create_reserved ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch2 " ]
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_reserved ON CLUSTER cluster (`p` Date, `image` Nullable(String), `index` Nullable(Float64), `invalidate` Nullable(Int64)) ENGINE = MergeTree(`p`, `p`, 8192) " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_as_reserved ON CLUSTER cluster ENGINE = Memory AS (SELECT * from test_reserved) " ,
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test_reserved ON CLUSTER cluster "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test_as_reserved ON CLUSTER cluster "
)
2019-04-11 17:20:36 +00:00
2019-12-19 19:39:49 +00:00
def test_rename ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch1 " ]
2019-12-19 19:39:49 +00:00
rules = test_cluster . pm_random_drops . pop_rules ( )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS rename_shard ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS rename_new ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS rename_old ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS rename ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat( ' old ' , toString(id))) ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/ {shard} /staging/test_shard ' , ' {replica} ' ) ORDER BY (id) " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2) " ,
)
test_cluster . ddl_check_query (
instance , " RENAME TABLE rename_new TO rename ON CLUSTER cluster; "
)
2019-12-19 19:39:49 +00:00
for i in range ( 10 ) :
instance . query ( " insert into rename (id) values ( {} ) " . format ( i ) )
2023-01-08 06:08:20 +00:00
# FIXME ddl_check_query doesn't work for replicated DDDL if replace_hostnames_with_ips=True
2019-12-19 19:39:49 +00:00
# because replicas use wrong host name of leader (and wrong path in zk) to check if it has executed query
# so ddl query will always fail on some replicas even if query was actually executed by leader
# Also such inconsistency in cluster configuration may lead to query duplication if leader suddenly changed
# because path of lock in zk contains shard name, which is list of host names of replicas
2020-09-16 04:26:10 +00:00
instance . query (
" ALTER TABLE rename_shard ON CLUSTER cluster MODIFY COLUMN sid String DEFAULT concat( ' new ' , toString(id)) " ,
2022-03-22 16:39:58 +00:00
ignore_error = True ,
)
2019-12-19 19:39:49 +00:00
time . sleep ( 1 )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2) " ,
)
2019-12-19 19:39:49 +00:00
instance . query ( " system stop distributed sends rename " )
for i in range ( 10 , 20 ) :
instance . query ( " insert into rename (id) values ( {} ) " . format ( i ) )
2022-03-22 16:39:58 +00:00
test_cluster . ddl_check_query (
instance ,
" RENAME TABLE rename TO rename_old, rename_new TO rename ON CLUSTER cluster " ,
)
2019-12-19 19:39:49 +00:00
for i in range ( 20 , 30 ) :
instance . query ( " insert into rename (id) values ( {} ) " . format ( i ) )
instance . query ( " system flush distributed rename " )
2022-03-22 16:39:58 +00:00
for name in [ " ch1 " , " ch2 " , " ch3 " , " ch4 " ] :
2019-12-19 19:39:49 +00:00
test_cluster . instances [ name ] . query ( " system sync replica rename_shard " )
# system stop distributed sends does not affect inserts into local shard,
# so some ids in range (10, 20) will be inserted into rename_shard
assert instance . query ( " select count(id), sum(id) from rename " ) . rstrip ( ) == " 25 \t 360 "
2020-09-16 04:26:10 +00:00
# assert instance.query("select count(id), sum(id) from rename").rstrip() == "20\t290"
2022-03-22 16:39:58 +00:00
assert (
instance . query (
" select count(id), sum(id) from rename where sid like ' old % ' "
) . rstrip ( )
== " 15 \t 115 "
)
2020-09-16 04:26:10 +00:00
# assert instance.query("select count(id), sum(id) from rename where sid like 'old%'").rstrip() == "10\t45"
2022-03-22 16:39:58 +00:00
assert (
instance . query (
" select count(id), sum(id) from rename where sid like ' new % ' "
) . rstrip ( )
== " 10 \t 245 "
)
2019-12-19 19:39:49 +00:00
test_cluster . pm_random_drops . push_rules ( rules )
2020-09-16 04:26:10 +00:00
2020-02-05 15:24:23 +00:00
def test_socket_timeout ( test_cluster ) :
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch1 " ]
2020-02-05 15:24:23 +00:00
# queries should not fail with "Timeout exceeded while reading from socket" in case of EINTR caused by query profiler
for i in range ( 0 , 100 ) :
2022-03-22 16:39:58 +00:00
instance . query (
" select hostName() as host, count() from cluster( ' cluster ' , ' system ' , ' settings ' ) group by host "
)
2019-12-19 19:39:49 +00:00
2020-09-16 04:26:10 +00:00
2020-07-10 09:19:32 +00:00
def test_replicated_without_arguments ( test_cluster ) :
2020-08-03 12:19:41 +00:00
rules = test_cluster . pm_random_drops . pop_rules ( )
2022-03-22 16:39:58 +00:00
instance = test_cluster . instances [ " ch1 " ]
test_cluster . ddl_check_query (
instance , " DROP TABLE IF EXISTS test_atomic.rmt ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance , " DROP DATABASE IF EXISTS test_atomic ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance , " CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic "
)
assert (
" are supported only for ON CLUSTER queries with Atomic database engine "
in instance . query_and_get_error (
" CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n "
)
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n " ,
)
test_cluster . ddl_check_query (
instance , " DROP TABLE test_atomic.rmt ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_atomic.rmt UUID ' 12345678-0000-4000-8000-000000000001 ' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n " ,
)
assert (
instance . query ( " SHOW CREATE test_atomic.rmt FORMAT TSVRaw " )
2022-04-13 14:51:59 +00:00
== " CREATE TABLE test_atomic.rmt \n ( \n `n` UInt64, \n `s` String \n ) \n ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/ {uuid} / {shard} ' , ' {replica} ' ) \n ORDER BY n \n SETTINGS index_granularity = 8192 \n "
2022-03-22 16:39:58 +00:00
)
test_cluster . ddl_check_query (
instance ,
" RENAME TABLE test_atomic.rmt TO test_atomic.rmt_renamed ON CLUSTER cluster " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree( ' /clickhouse/tables/ {uuid} / {shard} ' , ' {replica} ' ) ORDER BY n " ,
)
test_cluster . ddl_check_query (
instance ,
" EXCHANGE TABLES test_atomic.rmt AND test_atomic.rmt_renamed ON CLUSTER cluster " ,
)
assert (
instance . query (
" SELECT countDistinct(uuid) from clusterAllReplicas( ' cluster ' , ' system ' , ' databases ' ) WHERE uuid != ' 00000000-0000-0000-0000-000000000000 ' AND name= ' test_atomic ' "
)
== " 1 \n "
)
assert (
instance . query (
" SELECT countDistinct(uuid) from clusterAllReplicas( ' cluster ' , ' system ' , ' tables ' ) WHERE uuid != ' 00000000-0000-0000-0000-000000000000 ' AND name= ' rmt ' "
)
== " 1 \n "
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_atomic.rrmt ON CLUSTER cluster (n UInt64, m UInt64) ENGINE=ReplicatedReplacingMergeTree(m) ORDER BY n " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n " ,
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n " ,
)
test_cluster . ddl_check_query (
instance , " DROP DATABASE test_atomic ON CLUSTER cluster SYNC "
)
test_cluster . ddl_check_query (
2022-06-23 19:40:05 +00:00
instance ,
" CREATE DATABASE test_ordinary ON CLUSTER cluster ENGINE=Ordinary " ,
settings = { " allow_deprecated_database_ordinary " : 1 } ,
2022-03-22 16:39:58 +00:00
)
assert (
" are supported only for ON CLUSTER queries with Atomic database engine "
in instance . query_and_get_error (
" CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n "
)
)
assert (
" are supported only for ON CLUSTER queries with Atomic database engine "
in instance . query_and_get_error (
" CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree( ' / {shard} / {uuid} / ' , ' {replica} ' ) ORDER BY n "
)
)
test_cluster . ddl_check_query (
instance ,
" CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree( ' / {shard} / {table} / ' , ' {replica} ' ) ORDER BY n " ,
)
assert (
instance . query ( " SHOW CREATE test_ordinary.rmt FORMAT TSVRaw " )
== " CREATE TABLE test_ordinary.rmt \n ( \n `n` UInt64, \n `s` String \n ) \n ENGINE = ReplicatedMergeTree( ' / {shard} /rmt/ ' , ' {replica} ' ) \n ORDER BY n \n SETTINGS index_granularity = 8192 \n "
)
test_cluster . ddl_check_query (
instance , " DROP DATABASE test_ordinary ON CLUSTER cluster SYNC "
)
2020-08-03 12:19:41 +00:00
test_cluster . pm_random_drops . push_rules ( rules )
2020-07-10 09:19:32 +00:00
2020-09-16 04:26:10 +00:00
2023-03-29 13:37:58 +00:00
def test_disabled_distributed_ddl ( test_cluster ) :
instance = test_cluster . instances [ " ch1 " ]
assert (
" Distributed DDL queries are prohibited for the cluster "
in instance . query_and_get_error (
" CREATE DATABASE IF NOT EXISTS test ON CLUSTER ' cluster_disabled_ddl ' "
)
)
2022-03-22 16:39:58 +00:00
if __name__ == " __main__ " :
2019-11-05 18:47:15 +00:00
with contextmanager ( test_cluster ) ( ) as ctx_cluster :
2020-10-02 16:54:07 +00:00
for name , instance in list ( ctx_cluster . instances . items ( ) ) :
print ( name , instance . ip_address )
input ( " Cluster created, press any key to destroy... " )