This commit is contained in:
nikitamikhaylov 2020-09-25 19:42:02 +03:00 committed by Nikita Mikhailov
parent 015bd56516
commit ffce2a490b
5 changed files with 392 additions and 66 deletions

View File

@@ -0,0 +1,71 @@
<?xml version="1.0"?>
<yandex>
    <remote_servers>
        <anime_source>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_0_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_0_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_1_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_1_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_2_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_2_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_3_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_3_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_4_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_4_1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </anime_source>
        <anime_destination>
            <shard>
                <replica>
                    <host>s1_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </anime_destination>
    </remote_servers>
</yandex>
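The two named clusters above (anime_source: 5 shards of 2 replicas; anime_destination: a single one-replica shard) are what the copier task in the next file refers to. As orientation, here is a minimal sketch, not part of the commit, of how a test could sanity-check that this topology is loaded; the helper name is illustrative, and it uses the instance.query() API from the test harness:

    def check_cluster_topology(instance):
        # system.clusters contains one row per replica of every configured cluster
        rows = instance.query(
            "SELECT cluster, shard_num, replica_num, host_name FROM system.clusters "
            "WHERE cluster IN ('anime_source', 'anime_destination')")
        # 5 shards x 2 replicas in anime_source + 1 replica in anime_destination = 11 rows
        assert len(rows.strip().split('\n')) == 11, rows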

View File

@@ -0,0 +1,128 @@
<yandex>
    <!-- Configuration of clusters, as in an ordinary server config -->
    <remote_servers>
        <anime_source>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_0_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_0_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_1_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_1_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_2_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_2_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_3_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_3_1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>s0_4_0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>s0_4_1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </anime_source>
        <anime_destination>
            <shard>
                <replica>
                    <host>s1_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </anime_destination>
    </remote_servers>
    <!-- The maximum number of simultaneously active workers. If you start more workers, the extra ones will sleep. -->
    <max_workers>3</max_workers>

    <!-- Settings used to fetch (pull) data from the source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Settings used to insert (push) data into the destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
         They are overridden by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly; leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <!-- Copying task descriptions.
         You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
         sequentially. A sketch of publishing this description to ZooKeeper follows this file.
    -->
    <tables>
        <!-- A table task; copies one table. -->
        <table_first>
            <!-- Source cluster name (from the <remote_servers/> section) and the tables in it that should be copied -->
            <cluster_pull>anime_source</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>anime_all</table_pull>

            <!-- Destination cluster name and the tables into which the data should be inserted -->
            <cluster_push>anime_destination</cluster_push>
            <database_push>default</database_push>
            <table_push>anime</table_push>

            <!-- Engine of the destination tables.
                 If the destination tables have not been created, workers create them using the column definitions from the
                 source tables and the engine definition from here.
                 NOTE: If the first worker starts inserting data and detects that the destination partition is not empty, the
                 partition will be dropped and refilled; take this into account if you already have data in the destination
                 tables. You can directly specify the partitions that should be copied in <enabled_partitions/>; they should be
                 in the quoted format used by the partition column of the system.parts table.
            -->
            <engine>
                ENGINE = MergeTree()
                ORDER BY (first_id, second_id, toSecond(datetime))
                PARTITION BY toSecond(datetime)
            </engine>

            <!-- Sharding key used to insert data into the destination cluster -->
            <sharding_key>cityHash64(first_id, second_id)</sharding_key>
        </table_first>
    </tables>
</yandex>
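This file is the task description that clickhouse-copier reads from ZooKeeper. A minimal sketch of how it could get there, mirroring execute_task() in the tests below; the ZooKeeper address and the function name are illustrative assumptions:

    from kazoo.client import KazooClient

    def publish_task(config_xml, task_path="/clickhouse-copier/task_many_to_one"):
        zk = KazooClient(hosts="zoo1:2181")  # assumption: the test environment's ZooKeeper
        zk.start()
        zk.ensure_path(task_path)
        # clickhouse-copier reads the task description from <task_path>/description
        zk.create(task_path + "/description", config_xml.encode('utf-8'))
        zk.stop()

Each worker is then started with the same command the tests use:

    clickhouse copier --config /etc/clickhouse-server/config-copier.xml \
        --task-path /clickhouse-copier/task_many_to_one \
        --base-dir /var/log/clickhouse-server/copier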

View File

@@ -94,6 +94,8 @@ class Task1:
         instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002",
                        settings={"insert_distributed_sync": 1})
 
+        print(instance.query("SELECT DISTINCT 'all' AS partition FROM hits_all ORDER BY partition DESC"))
+
     def check(self):
         assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
         assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
@@ -310,13 +312,7 @@ def execute_task(task, cmd_options):
 
 # Tests
 
-@pytest.mark.parametrize(
-    ('use_sample_offset'),
-    [
-        False,
-        True
-    ]
-)
+@pytest.mark.parametrize(('use_sample_offset'), [False, True])
 def test_copy_simple(started_cluster, use_sample_offset):
     if use_sample_offset:
         execute_task(Task1(started_cluster), ['--experimental-use-sample-offset', '1'])
@@ -324,13 +320,7 @@ def test_copy_simple(started_cluster, use_sample_offset):
         execute_task(Task1(started_cluster), [])
 
-@pytest.mark.parametrize(
-    ('use_sample_offset'),
-    [
-        False,
-        True
-    ]
-)
+@pytest.mark.parametrize(('use_sample_offset'), [False, True])
 def test_copy_with_recovering(started_cluster, use_sample_offset):
     if use_sample_offset:
         execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
@@ -339,13 +329,7 @@ def test_copy_with_recovering(started_cluster, use_sample_offset):
         execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
 
-@pytest.mark.parametrize(
-    ('use_sample_offset'),
-    [
-        False,
-        True
-    ]
-)
+@pytest.mark.parametrize(('use_sample_offset'), [False, True])
 def test_copy_with_recovering_after_move_faults(started_cluster, use_sample_offset):
     if use_sample_offset:
         execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
@@ -380,11 +364,7 @@ def test_no_index(started_cluster):
 def test_no_arg(started_cluster):
     execute_task(Task_no_arg(started_cluster), [])
 
 def test_non_partitioned_table(started_cluster):
     execute_task(Task_non_partitioned_table(started_cluster), [])
-
-if __name__ == '__main__':
-    with contextmanager(started_cluster)() as cluster:
-        for name, instance in list(cluster.instances.items()):
-            print(name, instance.ip_address)
-        input("Cluster created, press any key to destroy...")

View File

@@ -0,0 +1,161 @@
import os
import random
import sys
import time
from contextlib import contextmanager

import docker
import kazoo
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV

CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))

COPYING_FAIL_PROBABILITY = 0.2
MOVING_FAIL_PROBABILITY = 0.2

cluster = ClickHouseCluster(__file__)


def check_all_hosts_sucesfully_executed(tsv_content, num_hosts):
    M = TSV.toMat(tsv_content)
    hosts = [(l[0], l[1]) for l in M]  # (host, port)
    codes = [l[2] for l in M]

    assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
    assert len(set(codes)) == 1, "\n" + tsv_content
    assert codes[0] == "0", "\n" + tsv_content


def ddl_check_query(instance, query, num_hosts=3):
    contents = instance.query(query)
    check_all_hosts_sucesfully_executed(contents, num_hosts)
    return contents
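
# Hypothetical usage of the helper above (not called in this file): an ON CLUSTER
# DDL query returns one TSV row per host, beginning with (host, port, status_code),
# so for the 10-host anime_source cluster one could assert success with e.g.:
#   ddl_check_query(cluster.instances['s0_0_0'],
#                   "DROP DATABASE IF EXISTS default ON CLUSTER anime_source",
#                   num_hosts=10)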
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
clusters_schema = {
"0": {
"0": ["0", "1"],
"1": ["0", "1"],
"2": ["0", "1"],
"3": ["0", "1"],
"4": ["0", "1"]
},
"1": {
"0": ["0"]
}
}
for cluster_name, shards in clusters_schema.iteritems():
for shard_name, replicas in shards.iteritems():
for replica_name in replicas:
name = "s{}_{}_{}".format(cluster_name, shard_name, replica_name)
cluster.add_instance(name,
main_configs=["configs/conf.d/query_log.xml", "configs/conf.d/ddl.xml",
"configs/conf.d/big_clusters.xml"],
user_configs=["configs/users.xml"],
macros={"cluster": cluster_name, "shard": shard_name, "replica": replica_name},
with_zookeeper=True)
cluster.start()
yield cluster
finally:
cluster.shutdown()
class Task_many_to_one:
    def __init__(self, cluster):
        self.cluster = cluster
        self.zk_task_path = "/clickhouse-copier/task_many_to_one"
        self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_many_to_one.xml'), 'r').read()
        self.rows = 1000

    def start(self):
        instance = cluster.instances['s0_0_0']

        for cluster_num in ["anime_source", "anime_destination"]:
            instance.query("DROP DATABASE IF EXISTS default ON CLUSTER {}".format(cluster_num))
            instance.query("CREATE DATABASE IF NOT EXISTS default ON CLUSTER {}".format(cluster_num))

        instance.query("CREATE TABLE anime ON CLUSTER anime_source (first_id UUID DEFAULT generateUUIDv4(), second_id UInt64, datetime DateTime DEFAULT now()) " +
                       "ENGINE=ReplicatedMergeTree " +
                       "PARTITION BY toSecond(datetime) " +
                       "ORDER BY (first_id, second_id, toSecond(datetime))")
        instance.query("CREATE TABLE anime_all ON CLUSTER anime_source (first_id UUID, second_id UInt64, datetime DateTime) ENGINE=Distributed(anime_source, default, anime, rand() % 5)")
        instance.query("INSERT INTO anime_all SELECT generateUUIDv4(), number, now() FROM system.numbers LIMIT 1002",
                       settings={"insert_distributed_sync": 1})

    def check(self):
        assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM anime_all")) == TSV("1002\n")
        assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count() FROM anime")) == TSV("1002\n")
def execute_task(task, cmd_options):
    task.start()

    zk = cluster.get_kazoo_client('zoo1')
    print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))

    try:
        zk.delete("/clickhouse-copier", recursive=True)
    except kazoo.exceptions.NoNodeError:
        print("No node /clickhouse-copier. It is OK for the first test.")

    zk_task_path = task.zk_task_path
    zk.ensure_path(zk_task_path)
    zk.create(zk_task_path + "/description", task.copier_task_config)

    # Run cluster-copier processes on each node
    docker_api = docker.from_env().api
    copiers_exec_ids = []

    cmd = ['/usr/bin/clickhouse', 'copier',
           '--config', '/etc/clickhouse-server/config-copier.xml',
           '--task-path', zk_task_path,
           '--base-dir', '/var/log/clickhouse-server/copier']
    cmd += cmd_options

    # Run the copier on three random instances plus the destination replica
    copiers = random.sample(list(cluster.instances.keys()), 3)
    copiers.append('s1_0_0')

    for instance_name in copiers:
        instance = cluster.instances[instance_name]
        container = instance.get_docker_handle()
        instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs/config-copier.xml"),
                                        "/etc/clickhouse-server/config-copier.xml")
        print("Copied copier config to {}".format(instance.name))
        exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
        output = docker_api.exec_start(exec_id).decode('utf8')
        print(output)
        copiers_exec_ids.append(exec_id)
        print("Copier for {} ({}) has started".format(instance.name, instance.ip_address))

    # Wait for copiers stopping and check their return codes
    for exec_id, instance_name in zip(copiers_exec_ids, copiers):
        instance = cluster.instances[instance_name]
        while True:
            res = docker_api.exec_inspect(exec_id)
            if not res['Running']:
                break
            time.sleep(0.5)

        assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))

    try:
        task.check()
    finally:
        zk.delete(zk_task_path, recursive=True)


# Tests

def test_copy_simple(started_cluster):
    execute_task(Task_many_to_one(started_cluster), [])

View File

@@ -11,8 +11,8 @@ sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
 from helpers.cluster import ClickHouseCluster
 from helpers.test_tools import TSV
 
-COPYING_FAIL_PROBABILITY = 0.33
-MOVING_FAIL_PROBABILITY = 0.1
+COPYING_FAIL_PROBABILITY = 0.2
+MOVING_FAIL_PROBABILITY = 0.2
 
 cluster = None
@@ -85,6 +85,11 @@ def execute_task(task, cmd_options):
     zk = cluster.get_kazoo_client('zoo1')
     print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
 
+    try:
+        zk.delete("/clickhouse-copier", recursive=True)
+    except kazoo.exceptions.NoNodeError:
+        print("No node /clickhouse-copier. It is OK for the first test.")
+
     zk_task_path = task.zk_task_path
     zk.ensure_path(zk_task_path)
     zk.create(zk_task_path + "/description", task.copier_task_config)
@@ -99,23 +104,28 @@ def execute_task(task, cmd_options):
                '--base-dir', '/var/log/clickhouse-server/copier']
     cmd += cmd_options
 
-    print(cmd)
+    copiers = cluster.instances.keys()
 
-    for instance_name, instance in cluster.instances.items():
+    for instance_name in copiers:
+        instance = cluster.instances[instance_name]
         container = instance.get_docker_handle()
+        instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs/config-copier.xml"),
+                                        "/etc/clickhouse-server/config-copier.xml")
+        print("Copied copier config to {}".format(instance.name))
         exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
-        docker_api.exec_start(exec_id, detach=True)
+        output = docker_api.exec_start(exec_id).decode('utf8')
+        print(output)
 
         copiers_exec_ids.append(exec_id)
         print("Copier for {} ({}) has started".format(instance.name, instance.ip_address))
 
     # Wait for copiers stopping and check their return codes
-    for exec_id, instance in zip(copiers_exec_ids, iter(cluster.instances.values())):
+    for exec_id, instance_name in zip(copiers_exec_ids, copiers):
+        instance = cluster.instances[instance_name]
         while True:
             res = docker_api.exec_inspect(exec_id)
             if not res['Running']:
                 break
-            time.sleep(1)
+            time.sleep(0.5)
 
         assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
@@ -125,56 +135,32 @@ def execute_task(task, cmd_options):
         zk.delete(zk_task_path, recursive=True)
 
 # Tests
 
-@pytest.mark.parametrize(
-    ('use_sample_offset'),
-    [
-        False,
-        True
-    ]
-)
+@pytest.mark.parametrize(('use_sample_offset'), [False, True])
 def test_trivial_copy(started_cluster, use_sample_offset):
     if use_sample_offset:
         execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--experimental-use-sample-offset', '1'])
     else:
-        print("AAAAA")
         execute_task(TaskTrivial(started_cluster, use_sample_offset), [])
 
-@pytest.mark.parametrize(
-    ('use_sample_offset'),
-    [
-        False,
-        True
-    ]
-)
+@pytest.mark.parametrize(('use_sample_offset'), [False, True])
 def test_trivial_copy_with_copy_fault(started_cluster, use_sample_offset):
     if use_sample_offset:
-        execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
-                                                    '--experimental-use-sample-offset', '1'])
+        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
+                                                                       '--experimental-use-sample-offset', '1'])
     else:
-        execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
+        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
 
-@pytest.mark.parametrize(
-    ('use_sample_offset'),
-    [
-        False,
-        True
-    ]
-)
+@pytest.mark.parametrize(('use_sample_offset'), [False, True])
 def test_trivial_copy_with_move_fault(started_cluster, use_sample_offset):
     if use_sample_offset:
-        execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
-                                                    '--experimental-use-sample-offset', '1'])
+        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
+                                                                       '--experimental-use-sample-offset', '1'])
     else:
-        execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
+        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
 
-if __name__ == '__main__':
-    with contextmanager(started_cluster)() as cluster:
-        for name, instance in list(cluster.instances.items()):
-            print(name, instance.ip_address)
-        input("Cluster created, press any key to destroy...")