clickhouse-copier
Copies data from the tables in one cluster to tables in another (or the same) cluster.
You can run multiple clickhouse-copier instances on different servers to perform the same job. ZooKeeper is used for syncing the processes.
After starting, clickhouse-copier:
- Connects to ZooKeeper and receives:
  - Copying jobs.
  - The state of the copying jobs (these can be inspected directly, as sketched below).
- It performs the jobs.
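Both the jobs and their state live under the task node in ZooKeeper, so any ZooKeeper client can show what the workers are doing. A minimal sketch, assuming zkCli.sh from the ZooKeeper distribution and the placeholder node /task/path used in the examples on this page:
bin/zkCli.sh ls /task/path                 # children of the task node: the description plus nodes the workers use for coordination
bin/zkCli.sh get /task/path/description    # the task description the workers are currently executing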
Each running process chooses the "closest" shard of the source cluster and copies the data into the destination cluster, resharding the data if necessary.
clickhouse-copier tracks the changes in ZooKeeper and applies them on the fly.
To reduce network traffic, we recommend running clickhouse-copier on the same server where the source data is located.
Running clickhouse-copier
The utility should be run manually:
clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-dir /path/to/dir
Parameters:
- daemon — Starts clickhouse-copier in daemon mode.
- config — The path to the zookeeper.xml file with the parameters for the connection to ZooKeeper.
- task-path — The path to the ZooKeeper node. This node is used for syncing clickhouse-copier processes and storing tasks. Tasks are stored in $task-path/description.
- base-dir — The path to logs and auxiliary files. When it starts, clickhouse-copier creates clickhouse-copier_YYYYMMHHSS_<PID> subdirectories in $base-dir. If this parameter is omitted, the directories are created in the directory where clickhouse-copier was launched.
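A typical launch therefore has two steps: upload the task description to ZooKeeper, then start one or more workers. A minimal sketch, assuming zkCli.sh is available, the task description (see the format further below) is saved locally as task.xml, and /task/path and /var/log/clickhouse-copier are placeholder paths; any ZooKeeper client can perform the upload:
# Create the task node and store the task description under it
bin/zkCli.sh create /task/path ""
bin/zkCli.sh create /task/path/description "$(cat task.xml)"
# Start a worker; run the same command on other servers to add more workers
clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-dir /var/log/clickhouse-copier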
Format of zookeeper.xml
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
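If the ZooKeeper ensemble has more than one node, list each of them with its own index; a sketch with three hypothetical hosts:
<yandex>
    <zookeeper>
        <node index="1">
            <host>zk1.example.com</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>zk2.example.com</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>zk3.example.com</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>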
Configuration of copying tasks
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- The maximum number of simultaneously active workers. If you start more workers, the superfluous workers will sleep. -->
<max_workers>2</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
They are overridden by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, but leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying task descriptions.
You can specify several table tasks in the same task description (in the same ZooKeeper node), and they will be performed
sequentially.
-->
<tables>
<!-- A table task that copies one table. -->
<table_hits>
<!-- Source cluster name (from the <remote_servers/> section) and tables in it that should be copied. -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<!-- Destination cluster name and tables where the data should be inserted. -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine for destination tables.
If the destination tables have not been created yet, workers create them using column definitions from source tables and the engine
definition from here.
NOTE: If the first worker starts to insert data and detects that the destination partition is not empty, then the partition will
be dropped and refilled. Take this into account if you already have some data in destination tables. You can directly
specify the partitions that should be copied in <enabled_partitions/>. They should be in the same quoted format as the partition
column in the system.parts table.
-->
<engine>
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
PARTITION BY toMonday(date)
ORDER BY (CounterID, EventDate)
</engine>
<!-- The sharding key used to insert data to the destination cluster. -->
<sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>
<!-- Optional expression that filters data while pulling it from source servers. -->
<where_condition>CounterID != 0</where_condition>
<!-- This section specifies partitions that should be copied. Other partitions will be ignored.
Partition names should have the same format as
the partition column in the system.parts table (i.e. quoted text).
Since the partition key might be different for the source and destination cluster,
these partition names specify the destination partitions.
Note: Although this section is optional (if it is omitted, all partitions will be copied), it is strongly recommended to specify the partitions explicitly.
If you already have some partitions ready on the destination cluster, they
will be removed at the start of the copying, because they will be interpreted
as unfinished data from the previous copying.
-->
<enabled_partitions>
<partition>'2018-02-26'</partition>
<partition>'2018-03-05'</partition>
...
</enabled_partitions>
</table_hits>
<!-- Next table to copy. It is not copied until the previous table has finished copying. -->
<table_visits>
...
</table_visits>
...
</tables>
</yandex>
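The values in <enabled_partitions> must use the quoted partition format shown in the system.parts table. If the source and destination partition keys match, a quick way to list the candidate names is to query the source table; a sketch assuming the test.hits table from the task above and a hypothetical host name:
clickhouse-client --host source-host-1 --query "SELECT DISTINCT partition FROM system.parts WHERE database = 'test' AND table = 'hits' AND active ORDER BY partition"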
clickhouse-copier tracks the changes in /task/path/description and applies them on the fly. For instance, if you change the value of max_workers, the number of processes running tasks will also change.
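For example, to change max_workers for a copy that is already in progress, edit the local task.xml and rewrite the description node; the running processes pick up the change without a restart. A sketch using the same hypothetical zkCli.sh client and paths as above:
bin/zkCli.sh set /task/path/description "$(cat task.xml)"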