2018-01-09 19:12:43 +00:00
import os
2020-09-16 04:26:10 +00:00
import random
2018-01-09 19:12:43 +00:00
import sys
import time
2020-06-10 12:47:34 +00:00
import kazoo
2018-01-09 19:12:43 +00:00
import pytest
2021-06-04 14:21:47 +00:00
import string
import random
2021-04-22 23:13:05 +00:00
from contextlib import contextmanager
2020-06-10 12:47:34 +00:00
from helpers . cluster import ClickHouseCluster
from helpers . test_tools import TSV
2018-01-09 19:12:43 +00:00
2021-04-22 23:13:05 +00:00
import docker
2018-01-09 19:12:43 +00:00
CURRENT_TEST_DIR = os . path . dirname ( os . path . abspath ( __file__ ) )
sys . path . insert ( 0 , os . path . dirname ( CURRENT_TEST_DIR ) )
2020-06-10 12:47:34 +00:00
COPYING_FAIL_PROBABILITY = 0.2
MOVING_FAIL_PROBABILITY = 0.2
2018-01-09 19:12:43 +00:00
2022-03-22 16:39:58 +00:00
cluster = ClickHouseCluster ( __file__ , name = " copier_test " )
2018-01-25 18:14:37 +00:00
2021-06-04 14:21:47 +00:00
def generateRandomString ( count ) :
2022-03-22 16:39:58 +00:00
return " " . join (
random . choice ( string . ascii_uppercase + string . digits ) for _ in range ( count )
)
2021-06-04 14:21:47 +00:00
2020-09-16 04:26:10 +00:00
2018-01-25 18:14:37 +00:00
def check_all_hosts_sucesfully_executed ( tsv_content , num_hosts ) :
M = TSV . toMat ( tsv_content )
2020-09-16 04:26:10 +00:00
hosts = [ ( l [ 0 ] , l [ 1 ] ) for l in M ] # (host, port)
2018-01-25 18:14:37 +00:00
codes = [ l [ 2 ] for l in M ]
assert len ( hosts ) == num_hosts and len ( set ( hosts ) ) == num_hosts , " \n " + tsv_content
assert len ( set ( codes ) ) == 1 , " \n " + tsv_content
assert codes [ 0 ] == " 0 " , " \n " + tsv_content
def ddl_check_query ( instance , query , num_hosts = 3 ) :
contents = instance . query ( query )
check_all_hosts_sucesfully_executed ( contents , num_hosts )
return contents
2020-06-10 12:47:34 +00:00
@pytest.fixture ( scope = " module " )
2018-01-09 19:12:43 +00:00
def started_cluster ( ) :
global cluster
try :
clusters_schema = {
2022-03-22 16:39:58 +00:00
" 0 " : { " 0 " : [ " 0 " , " 1 " ] , " 1 " : [ " 0 " ] } ,
" 1 " : { " 0 " : [ " 0 " , " 1 " ] , " 1 " : [ " 0 " ] } ,
2018-01-09 19:12:43 +00:00
}
2020-10-02 16:54:07 +00:00
for cluster_name , shards in clusters_schema . items ( ) :
for shard_name , replicas in shards . items ( ) :
2018-01-09 19:12:43 +00:00
for replica_name in replicas :
name = " s {} _ {} _ {} " . format ( cluster_name , shard_name , replica_name )
2022-03-22 16:39:58 +00:00
cluster . add_instance (
name ,
main_configs = [
" configs/conf.d/query_log.xml " ,
" configs/conf.d/ddl.xml " ,
" configs/conf.d/clusters.xml " ,
] ,
user_configs = [ " configs/users.xml " ] ,
macros = {
" cluster " : cluster_name ,
" shard " : shard_name ,
" replica " : replica_name ,
} ,
with_zookeeper = True ,
)
2018-01-09 19:12:43 +00:00
cluster . start ( )
yield cluster
finally :
2018-01-10 23:42:31 +00:00
cluster . shutdown ( )
2018-01-09 19:12:43 +00:00
2018-02-20 21:03:38 +00:00
class Task1 :
def __init__ ( self , cluster ) :
self . cluster = cluster
2021-06-08 01:50:43 +00:00
self . zk_task_path = " /clickhouse-copier/task_simple_ " + generateRandomString ( 10 )
2021-06-04 14:21:47 +00:00
self . container_task_file = " /task0_description.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task0_description.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2018-01-25 18:14:37 +00:00
2018-02-20 21:03:38 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2018-02-20 21:03:38 +00:00
for cluster_num in [ " 0 " , " 1 " ] :
2022-03-22 16:39:58 +00:00
ddl_check_query (
instance ,
" DROP DATABASE IF EXISTS default ON CLUSTER cluster {} SYNC " . format (
cluster_num
) ,
)
ddl_check_query (
instance ,
" CREATE DATABASE default ON CLUSTER cluster {} " . format ( cluster_num ) ,
)
ddl_check_query (
instance ,
" CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) "
+ " ENGINE=ReplicatedMergeTree "
+ " PARTITION BY d % 3 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16 " ,
)
ddl_check_query (
instance ,
" CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d) " ,
)
ddl_check_query (
instance ,
" CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1) " ,
)
instance . query (
" INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002 " ,
settings = { " insert_distributed_sync " : 1 } ,
)
2018-02-20 21:03:38 +00:00
def check ( self ) :
2022-03-22 16:39:58 +00:00
assert (
self . cluster . instances [ " s0_0_0 " ]
. query ( " SELECT count() FROM hits_all " )
. strip ( )
== " 1002 "
)
assert (
self . cluster . instances [ " s1_0_0 " ]
. query ( " SELECT count() FROM hits_all " )
. strip ( )
== " 1002 "
)
assert (
self . cluster . instances [ " s1_0_0 " ]
. query ( " SELECT DISTINCT d % 2 FROM hits " )
. strip ( )
== " 1 "
)
assert (
self . cluster . instances [ " s1_1_0 " ]
. query ( " SELECT DISTINCT d % 2 FROM hits " )
. strip ( )
== " 0 "
)
instance = self . cluster . instances [ " s0_0_0 " ]
2018-02-20 21:03:38 +00:00
ddl_check_query ( instance , " DROP TABLE hits_all ON CLUSTER cluster0 " )
ddl_check_query ( instance , " DROP TABLE hits_all ON CLUSTER cluster1 " )
ddl_check_query ( instance , " DROP TABLE hits ON CLUSTER cluster0 " )
ddl_check_query ( instance , " DROP TABLE hits ON CLUSTER cluster1 " )
class Task2 :
2020-09-23 18:28:59 +00:00
def __init__ ( self , cluster , unique_zk_path ) :
2018-02-20 21:03:38 +00:00
self . cluster = cluster
2022-03-22 16:39:58 +00:00
self . zk_task_path = (
" /clickhouse-copier/task_month_to_week_partition_ " + generateRandomString ( 5 )
)
2021-06-04 14:21:47 +00:00
self . unique_zk_path = generateRandomString ( 10 )
self . container_task_file = " /task_month_to_week_description.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task_month_to_week_description.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2018-02-20 21:03:38 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2018-02-20 21:03:38 +00:00
for cluster_num in [ " 0 " , " 1 " ] :
2022-03-22 16:39:58 +00:00
ddl_check_query (
instance ,
" DROP DATABASE IF EXISTS default ON CLUSTER cluster {} " . format (
cluster_num
) ,
)
ddl_check_query (
instance ,
" CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster {} " . format (
cluster_num
) ,
)
ddl_check_query (
instance ,
" CREATE TABLE a ON CLUSTER cluster0 (date Date, d UInt64, d1 UInt64 ALIAS d+1) "
" ENGINE=ReplicatedMergeTree( ' /clickhouse/tables/cluster_ {cluster} / {shard} / "
+ self . unique_zk_path
+ " ' , "
" ' {replica} ' , date, intHash64(d), (date, intHash64(d)), 8192) " ,
)
ddl_check_query (
instance ,
" CREATE TABLE a_all ON CLUSTER cluster0 (date Date, d UInt64) ENGINE=Distributed(cluster0, default, a, d) " ,
)
2018-02-20 21:03:38 +00:00
2020-09-16 04:26:10 +00:00
instance . query (
" INSERT INTO a_all SELECT toDate(17581 + number) AS date, number AS d FROM system.numbers LIMIT 85 " ,
2022-03-22 16:39:58 +00:00
settings = { " insert_distributed_sync " : 1 } ,
)
2018-02-20 21:03:38 +00:00
def check ( self ) :
2022-03-22 16:39:58 +00:00
assert TSV (
self . cluster . instances [ " s0_0_0 " ] . query (
" SELECT count() FROM cluster(cluster0, default, a) "
)
) == TSV ( " 85 \n " )
assert TSV (
self . cluster . instances [ " s1_0_0 " ] . query (
" SELECT count(), uniqExact(date) FROM cluster(cluster1, default, b) "
)
) == TSV ( " 85 \t 85 \n " )
assert TSV (
self . cluster . instances [ " s1_0_0 " ] . query (
" SELECT DISTINCT jumpConsistentHash(intHash64(d), 2) FROM b "
)
) == TSV ( " 0 \n " )
assert TSV (
self . cluster . instances [ " s1_1_0 " ] . query (
" SELECT DISTINCT jumpConsistentHash(intHash64(d), 2) FROM b "
)
) == TSV ( " 1 \n " )
assert TSV (
self . cluster . instances [ " s1_0_0 " ] . query (
" SELECT uniqExact(partition) IN (12, 13) FROM system.parts WHERE active AND database= ' default ' AND table= ' b ' "
)
) == TSV ( " 1 \n " )
assert TSV (
self . cluster . instances [ " s1_1_0 " ] . query (
" SELECT uniqExact(partition) IN (12, 13) FROM system.parts WHERE active AND database= ' default ' AND table= ' b ' "
)
) == TSV ( " 1 \n " )
instance = cluster . instances [ " s0_0_0 " ]
2018-02-20 21:03:38 +00:00
ddl_check_query ( instance , " DROP TABLE a ON CLUSTER cluster0 " )
ddl_check_query ( instance , " DROP TABLE b ON CLUSTER cluster1 " )
2018-03-11 18:36:09 +00:00
class Task_test_block_size :
def __init__ ( self , cluster ) :
self . cluster = cluster
2022-03-22 16:39:58 +00:00
self . zk_task_path = (
" /clickhouse-copier/task_test_block_size_ " + generateRandomString ( 5 )
)
2018-03-11 18:36:09 +00:00
self . rows = 1000000
2021-06-04 14:21:47 +00:00
self . container_task_file = " /task_test_block_size.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task_test_block_size.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2018-03-11 18:36:09 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2018-03-11 18:36:09 +00:00
2022-03-22 16:39:58 +00:00
ddl_check_query (
instance ,
"""
2018-03-11 18:36:09 +00:00
CREATE TABLE test_block_size ON CLUSTER shard_0_0 ( partition Date , d UInt64 )
2020-09-23 18:28:59 +00:00
ENGINE = ReplicatedMergeTree
2022-03-22 16:39:58 +00:00
ORDER BY ( d , sipHash64 ( d ) ) SAMPLE BY sipHash64 ( d ) """ ,
2 ,
)
2018-03-11 18:36:09 +00:00
2020-09-16 04:26:10 +00:00
instance . query (
" INSERT INTO test_block_size SELECT toDate(0) AS partition, number as d FROM system.numbers LIMIT {} " . format (
2022-03-22 16:39:58 +00:00
self . rows
)
)
2018-03-11 18:36:09 +00:00
def check ( self ) :
2022-03-22 16:39:58 +00:00
assert TSV (
self . cluster . instances [ " s1_0_0 " ] . query (
" SELECT count() FROM cluster(cluster1, default, test_block_size) "
)
) == TSV ( " {} \n " . format ( self . rows ) )
2018-03-11 18:36:09 +00:00
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2018-03-11 18:36:09 +00:00
ddl_check_query ( instance , " DROP TABLE test_block_size ON CLUSTER shard_0_0 " , 2 )
ddl_check_query ( instance , " DROP TABLE test_block_size ON CLUSTER cluster1 " )
2019-04-04 09:22:54 +00:00
2019-04-01 14:58:04 +00:00
class Task_no_index :
def __init__ ( self , cluster ) :
self . cluster = cluster
2022-03-22 16:39:58 +00:00
self . zk_task_path = " /clickhouse-copier/task_no_index_ " + generateRandomString (
5
)
2019-04-01 14:58:04 +00:00
self . rows = 1000000
2021-06-04 14:21:47 +00:00
self . container_task_file = " /task_no_index.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task_no_index.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2019-04-01 14:58:04 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP TABLE IF EXISTS ontime SYNC " )
2022-03-22 16:39:58 +00:00
instance . query (
" create table IF NOT EXISTS ontime (Year UInt16, FlightDate String) ENGINE = Memory "
)
instance . query (
" insert into ontime values (2016, ' test6 ' ), (2017, ' test7 ' ), (2018, ' test8 ' ) "
)
2019-04-01 14:58:04 +00:00
def check ( self ) :
2022-03-22 16:39:58 +00:00
assert TSV (
self . cluster . instances [ " s1_1_0 " ] . query ( " SELECT Year FROM ontime22 " )
) == TSV ( " 2017 \n " )
instance = cluster . instances [ " s0_0_0 " ]
2019-04-01 14:58:04 +00:00
instance . query ( " DROP TABLE ontime " )
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s1_1_0 " ]
2019-04-01 14:58:04 +00:00
instance . query ( " DROP TABLE ontime22 " )
2018-03-11 18:36:09 +00:00
2019-04-04 09:22:54 +00:00
class Task_no_arg :
def __init__ ( self , cluster ) :
self . cluster = cluster
2020-09-16 04:26:10 +00:00
self . zk_task_path = " /clickhouse-copier/task_no_arg "
2019-04-04 09:22:54 +00:00
self . rows = 1000000
2021-06-04 14:21:47 +00:00
self . container_task_file = " /task_no_arg.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task_no_arg.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2019-04-04 09:22:54 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP TABLE IF EXISTS copier_test1 SYNC " )
2020-09-16 04:26:10 +00:00
instance . query (
2022-03-22 16:39:58 +00:00
" create table if not exists copier_test1 (date Date, id UInt32) engine = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 8192 "
)
2019-04-04 09:22:54 +00:00
instance . query ( " insert into copier_test1 values ( ' 2016-01-01 ' , 10); " )
def check ( self ) :
2022-03-22 16:39:58 +00:00
assert TSV (
self . cluster . instances [ " s1_1_0 " ] . query ( " SELECT date FROM copier_test1_1 " )
) == TSV ( " 2016-01-01 \n " )
instance = cluster . instances [ " s0_0_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP TABLE copier_test1 SYNC " )
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s1_1_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP TABLE copier_test1_1 SYNC " )
2019-04-04 09:22:54 +00:00
2020-11-21 04:32:29 +00:00
2022-03-22 16:39:58 +00:00
class Task_non_partitioned_table :
2020-11-21 04:32:29 +00:00
def __init__ ( self , cluster ) :
self . cluster = cluster
self . zk_task_path = " /clickhouse-copier/task_non_partitoned_table "
self . rows = 1000000
2021-06-04 14:21:47 +00:00
self . container_task_file = " /task_non_partitioned_table.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task_non_partitioned_table.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2020-11-21 04:32:29 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP TABLE IF EXISTS copier_test1 SYNC " )
2020-11-21 04:32:29 +00:00
instance . query (
2022-03-22 16:39:58 +00:00
" create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192 "
)
2020-11-21 04:32:29 +00:00
instance . query ( " insert into copier_test1 values ( ' 2016-01-01 ' , 10); " )
def check ( self ) :
2022-03-22 16:39:58 +00:00
assert TSV (
self . cluster . instances [ " s1_1_0 " ] . query ( " SELECT date FROM copier_test1_1 " )
) == TSV ( " 2016-01-01 \n " )
instance = cluster . instances [ " s0_0_0 " ]
2020-11-21 04:32:29 +00:00
instance . query ( " DROP TABLE copier_test1 " )
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s1_1_0 " ]
2020-11-21 04:32:29 +00:00
instance . query ( " DROP TABLE copier_test1_1 " )
2021-04-01 14:14:54 +00:00
2022-03-22 16:39:58 +00:00
class Task_self_copy :
2021-04-01 14:14:54 +00:00
def __init__ ( self , cluster ) :
self . cluster = cluster
self . zk_task_path = " /clickhouse-copier/task_self_copy "
2021-06-04 14:21:47 +00:00
self . container_task_file = " /task_self_copy.xml "
for instance_name , _ in cluster . instances . items ( ) :
instance = cluster . instances [ instance_name ]
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " ./task_self_copy.xml " ) ,
self . container_task_file ,
)
print (
" Copied task file to container of ' {} ' instance. Path {} " . format (
instance_name , self . container_task_file
)
)
2021-04-01 14:14:54 +00:00
def start ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP DATABASE IF EXISTS db1 SYNC " )
instance . query ( " DROP DATABASE IF EXISTS db2 SYNC " )
instance . query ( " CREATE DATABASE IF NOT EXISTS db1; " )
2021-04-01 14:14:54 +00:00
instance . query (
2022-03-22 16:39:58 +00:00
" CREATE TABLE IF NOT EXISTS db1.source_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192 "
)
2021-06-08 01:50:43 +00:00
instance . query ( " CREATE DATABASE IF NOT EXISTS db2; " )
2021-04-01 14:14:54 +00:00
instance . query (
2022-03-22 16:39:58 +00:00
" CREATE TABLE IF NOT EXISTS db2.destination_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192 "
)
2021-04-01 14:14:54 +00:00
instance . query ( " INSERT INTO db1.source_table VALUES (1, ' ClickHouse ' , 1); " )
instance . query ( " INSERT INTO db1.source_table VALUES (2, ' Copier ' , 2); " )
def check ( self ) :
2022-03-22 16:39:58 +00:00
instance = cluster . instances [ " s0_0_0 " ]
assert TSV (
instance . query ( " SELECT * FROM db2.destination_table ORDER BY a " )
) == TSV ( instance . query ( " SELECT * FROM db1.source_table ORDER BY a " ) )
instance = cluster . instances [ " s0_0_0 " ]
2021-06-08 01:50:43 +00:00
instance . query ( " DROP DATABASE IF EXISTS db1 SYNC " )
instance . query ( " DROP DATABASE IF EXISTS db2 SYNC " )
2021-04-01 14:14:54 +00:00
2019-04-04 09:22:54 +00:00
2021-03-09 07:32:10 +00:00
def execute_task ( started_cluster , task , cmd_options ) :
2018-02-20 21:03:38 +00:00
task . start ( )
2018-01-09 19:12:43 +00:00
2022-03-22 16:39:58 +00:00
zk = started_cluster . get_kazoo_client ( " zoo1 " )
2020-10-02 16:54:07 +00:00
print ( " Use ZooKeeper server: {} : {} " . format ( zk . hosts [ 0 ] [ 0 ] , zk . hosts [ 0 ] [ 1 ] ) )
2021-06-07 10:49:01 +00:00
2021-06-07 10:06:41 +00:00
try :
zk . delete ( " /clickhouse-copier " , recursive = True )
except kazoo . exceptions . NoNodeError :
print ( " No node /clickhouse-copier. It is Ok in first test. " )
2018-01-09 19:12:43 +00:00
2018-01-25 18:14:37 +00:00
# Run cluster-copier processes on each node
2021-06-05 23:19:17 +00:00
docker_api = started_cluster . docker_client . api
2018-01-09 19:12:43 +00:00
copiers_exec_ids = [ ]
2022-03-22 16:39:58 +00:00
cmd = [
" /usr/bin/clickhouse " ,
" copier " ,
" --config " ,
" /etc/clickhouse-server/config-copier.xml " ,
" --task-path " ,
task . zk_task_path ,
" --task-file " ,
task . container_task_file ,
" --task-upload-force " ,
" true " ,
" --base-dir " ,
" /var/log/clickhouse-server/copier " ,
]
2018-01-09 19:12:43 +00:00
cmd + = cmd_options
2021-06-04 14:21:47 +00:00
print ( cmd )
2021-06-07 10:49:01 +00:00
copiers = random . sample ( list ( started_cluster . instances . keys ( ) ) , 3 )
2020-06-10 12:47:34 +00:00
for instance_name in copiers :
2021-03-09 07:32:10 +00:00
instance = started_cluster . instances [ instance_name ]
2018-01-09 19:12:43 +00:00
container = instance . get_docker_handle ( )
2022-03-22 16:39:58 +00:00
instance . copy_file_to_container (
os . path . join ( CURRENT_TEST_DIR , " configs/config-copier.xml " ) ,
" /etc/clickhouse-server/config-copier.xml " ,
)
2020-10-02 16:54:07 +00:00
print ( " Copied copier config to {} " . format ( instance . name ) )
2018-01-09 19:12:43 +00:00
exec_id = docker_api . exec_create ( container . id , cmd , stderr = True )
2022-03-22 16:39:58 +00:00
output = docker_api . exec_start ( exec_id ) . decode ( " utf8 " )
2020-08-12 08:55:04 +00:00
print ( output )
2018-01-09 19:12:43 +00:00
copiers_exec_ids . append ( exec_id )
2022-03-22 16:39:58 +00:00
print (
" Copier for {} ( {} ) has started " . format ( instance . name , instance . ip_address )
)
2018-01-09 19:12:43 +00:00
2018-01-25 18:14:37 +00:00
# Wait for copiers stopping and check their return codes
2020-06-10 12:47:34 +00:00
for exec_id , instance_name in zip ( copiers_exec_ids , copiers ) :
2021-03-09 07:32:10 +00:00
instance = started_cluster . instances [ instance_name ]
2018-01-09 19:12:43 +00:00
while True :
res = docker_api . exec_inspect ( exec_id )
2022-03-22 16:39:58 +00:00
if not res [ " Running " ] :
2018-01-09 19:12:43 +00:00
break
2020-06-10 12:47:34 +00:00
time . sleep ( 0.5 )
2018-01-09 19:12:43 +00:00
2022-03-22 16:39:58 +00:00
assert res [ " ExitCode " ] == 0 , " Instance: {} ( {} ). Info: {} " . format (
instance . name , instance . ip_address , repr ( res )
)
2018-01-09 19:12:43 +00:00
2018-02-20 21:03:38 +00:00
try :
task . check ( )
finally :
2021-06-04 14:21:47 +00:00
zk . delete ( task . zk_task_path , recursive = True )
2018-01-09 19:12:43 +00:00
2018-02-23 22:47:35 +00:00
# Tests
2022-03-22 16:39:58 +00:00
@pytest.mark.parametrize ( ( " use_sample_offset " ) , [ False , True ] )
2020-04-21 17:37:40 +00:00
def test_copy_simple ( started_cluster , use_sample_offset ) :
if use_sample_offset :
2022-03-22 16:39:58 +00:00
execute_task (
started_cluster ,
Task1 ( started_cluster ) ,
[ " --experimental-use-sample-offset " , " 1 " ] ,
)
2020-04-21 17:37:40 +00:00
else :
2021-03-09 07:32:10 +00:00
execute_task ( started_cluster , Task1 ( started_cluster ) , [ ] )
2020-04-21 17:37:40 +00:00
2022-03-22 16:39:58 +00:00
@pytest.mark.parametrize ( ( " use_sample_offset " ) , [ False , True ] )
2020-04-21 17:37:40 +00:00
def test_copy_with_recovering ( started_cluster , use_sample_offset ) :
if use_sample_offset :
2022-03-22 16:39:58 +00:00
execute_task (
started_cluster ,
Task1 ( started_cluster ) ,
[
" --copy-fault-probability " ,
str ( COPYING_FAIL_PROBABILITY ) ,
" --experimental-use-sample-offset " ,
" 1 " ,
] ,
)
2020-04-21 17:37:40 +00:00
else :
2022-03-22 16:39:58 +00:00
execute_task (
started_cluster ,
Task1 ( started_cluster ) ,
[ " --copy-fault-probability " , str ( COPYING_FAIL_PROBABILITY ) ] ,
)
2020-04-21 17:37:40 +00:00
2020-09-16 04:26:10 +00:00
2022-03-22 16:39:58 +00:00
@pytest.mark.parametrize ( ( " use_sample_offset " ) , [ False , True ] )
2020-04-21 17:37:40 +00:00
def test_copy_with_recovering_after_move_faults ( started_cluster , use_sample_offset ) :
if use_sample_offset :
2022-03-22 16:39:58 +00:00
execute_task (
started_cluster ,
Task1 ( started_cluster ) ,
[
" --move-fault-probability " ,
str ( MOVING_FAIL_PROBABILITY ) ,
" --experimental-use-sample-offset " ,
" 1 " ,
] ,
)
2020-04-21 17:37:40 +00:00
else :
2022-03-22 16:39:58 +00:00
execute_task (
started_cluster ,
Task1 ( started_cluster ) ,
[ " --move-fault-probability " , str ( MOVING_FAIL_PROBABILITY ) ] ,
)
2020-03-16 21:05:38 +00:00
2020-09-16 04:26:10 +00:00
2020-06-10 12:47:34 +00:00
@pytest.mark.timeout ( 600 )
2018-02-20 21:03:38 +00:00
def test_copy_month_to_week_partition ( started_cluster ) :
2021-03-09 07:32:10 +00:00
execute_task ( started_cluster , Task2 ( started_cluster , " test1 " ) , [ ] )
2018-01-09 19:12:43 +00:00
2020-09-16 04:26:10 +00:00
2020-06-10 12:47:34 +00:00
@pytest.mark.timeout ( 600 )
2020-04-22 11:35:18 +00:00
def test_copy_month_to_week_partition_with_recovering ( started_cluster ) :
2022-03-22 16:39:58 +00:00
execute_task (
started_cluster ,
Task2 ( started_cluster , " test2 " ) ,
[ " --copy-fault-probability " , str ( COPYING_FAIL_PROBABILITY ) ] ,
)
2020-03-16 21:05:38 +00:00
2020-09-16 04:26:10 +00:00
2020-06-10 12:47:34 +00:00
@pytest.mark.timeout ( 600 )
2022-03-22 16:39:58 +00:00
def test_copy_month_to_week_partition_with_recovering_after_move_faults (
started_cluster ,
) :
execute_task (
started_cluster ,
Task2 ( started_cluster , " test3 " ) ,
[ " --move-fault-probability " , str ( MOVING_FAIL_PROBABILITY ) ] ,
)
2018-01-09 19:12:43 +00:00
2020-09-16 04:26:10 +00:00
2020-04-22 11:35:18 +00:00
def test_block_size ( started_cluster ) :
2021-03-09 07:32:10 +00:00
execute_task ( started_cluster , Task_test_block_size ( started_cluster ) , [ ] )
2018-03-11 18:36:09 +00:00
2020-09-16 04:26:10 +00:00
2020-04-22 11:35:18 +00:00
def test_no_index ( started_cluster ) :
2021-03-09 07:32:10 +00:00
execute_task ( started_cluster , Task_no_index ( started_cluster ) , [ ] )
2019-04-01 14:58:04 +00:00
2020-09-16 04:26:10 +00:00
2020-04-22 11:35:18 +00:00
def test_no_arg ( started_cluster ) :
2021-03-09 07:32:10 +00:00
execute_task ( started_cluster , Task_no_arg ( started_cluster ) , [ ] )
2018-02-23 22:47:35 +00:00
2021-04-01 14:14:54 +00:00
2020-11-21 04:32:29 +00:00
def test_non_partitioned_table ( started_cluster ) :
2021-03-09 07:32:10 +00:00
execute_task ( started_cluster , Task_non_partitioned_table ( started_cluster ) , [ ] )
2020-09-16 04:26:10 +00:00
2021-04-01 14:14:54 +00:00
def test_self_copy ( started_cluster ) :
2021-04-12 15:08:09 +00:00
execute_task ( started_cluster , Task_self_copy ( started_cluster ) , [ ] )