This commit is contained in:
Nikita Mikhaylov 2021-04-01 17:14:54 +03:00
parent bbae136a1e
commit 81fceb38d9
3 changed files with 102 additions and 0 deletions

View File

@ -17,6 +17,14 @@
<profile>default</profile>
<quota>default</quota>
</default>
<dbuser>
<password>12345678</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</dbuser>
</users>
<quotas>

View File

@ -0,0 +1,64 @@
<?xml version="1.0"?>
<yandex>
<tcp_port_secure>9440</tcp_port_secure>
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>s0_0_0</host>
<port>9000</port>
<user>dbuser</user>
<password>12345678</password>
<secure>0</secure>
</replica>
</shard>
</source_cluster>
<destination_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>s0_0_0</host>
<port>9000</port>
<user>dbuser</user>
<password>12345678</password>
<secure>0</secure>
</replica>
</shard>
</destination_cluster>
</remote_servers>
<max_workers>2</max_workers>
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<settings_push>
<readonly>0</readonly>
</settings_push>
<settings>
<connect_timeout>3</connect_timeout>
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<tables>
<table1>
<cluster_pull>source_cluster</cluster_pull>
<database_pull>db1</database_pull>
<table_pull>source_table</table_pull>
<cluster_push>destination_cluster</cluster_push>
<database_push>db2</database_push>
<table_push>destination_table</table_push>
<engine>
ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192
</engine>
<sharding_key>rand()</sharding_key>
</table1>
</tables>
</yandex>

View File

@ -251,6 +251,31 @@ class Task_non_partitioned_table:
instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1")
class Task_self_copy:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_self_copy"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_self_copy.xml'), 'r').read()
def start(self):
instance = cluster.instances['s0_0_0']
instance.query("CREATE DATABASE db1;")
instance.query(
"CREATE TABLE db1.source_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192")
instance.query("CREATE DATABASE db2;")
instance.query(
"CREATE TABLE db2.destination_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192")
instance.query("INSERT INTO db1.source_table VALUES (1, 'ClickHouse', 1);")
instance.query("INSERT INTO db1.source_table VALUES (2, 'Copier', 2);")
def check(self):
instance = cluster.instances['s0_0_0']
assert TSV(instance.query("SELECT * FROM db2.destination_table ORDER BY a")) == TSV(instance.query("SELECT * FROM db1.source_table ORDER BY a"))
instance = cluster.instances['s0_0_0']
instance.query("DROP DATABASE db1 SYNC")
instance.query("DROP DATABASE db2 SYNC")
def execute_task(task, cmd_options):
task.start()
@ -380,9 +405,14 @@ def test_no_index(started_cluster):
def test_no_arg(started_cluster):
execute_task(Task_no_arg(started_cluster), [])
def test_non_partitioned_table(started_cluster):
execute_task(Task_non_partitioned_table(started_cluster), [])
def test_self_copy(started_cluster):
execute_task(Task_self_copy(started_cluster), [])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()):