mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 02:41:59 +00:00
452b3b443d
The following options had been introduced: - max-table-tries - max-shard-partition-tries - max-shard-partition-piece-tries-for-alter - retry-delay-ms
100 lines
4.4 KiB
C++
100 lines
4.4 KiB
C++
#pragma once
|
|
|
|
#include <Poco/Util/ServerApplication.h>
|
|
#include <daemon/BaseDaemon.h>
|
|
|
|
#include "ClusterCopier.h"
|
|
|
|
/* clickhouse cluster copier util
|
|
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
|
|
*
|
|
* See overview in the docs: docs/en/utils/clickhouse-copier.md
|
|
*
|
|
* Implementation details:
|
|
*
|
|
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
|
|
* Distributed table (to perform data resharding). So, worker job is a partition of a source shard.
|
|
* A job has three states: Active, Finished and Abandoned. Abandoned means that worker died and did not finish the job.
|
|
*
|
|
* If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
|
|
* all destination servers) should be dropped and refilled. So, copying entity is a partition of all destination shards.
|
|
* If a failure is detected a special /is_dirty node is created in ZooKeeper signalling that other workers copying the same partition
|
|
* should stop, after a refilling procedure should start.
|
|
*
|
|
* ZooKeeper task node has the following structure:
|
|
* /task/path_root - path passed in --task-path parameter
|
|
* /description - contains user-defined XML config of the task
|
|
* /task_active_workers - contains ephemeral nodes of all currently active workers, used to implement max_workers limitation
|
|
* /server_fqdn#PID_timestamp - cluster-copier worker ID
|
|
* ...
|
|
* /tables - directory with table tasks
|
|
* /cluster.db.table1 - directory of table_hits task
|
|
* /partition1 - directory for partition1
|
|
* /shards - directory for source cluster shards
|
|
* /1 - worker job for the first shard of partition1 of table test.hits
|
|
* Contains info about current status (Active or Finished) and worker ID.
|
|
* /2
|
|
* ...
|
|
* /partition_active_workers
|
|
* /1 - for each job in /shards a corresponding ephemeral node created in /partition_active_workers
|
|
* It is used to detect Abandoned jobs (if there is Active node in /shards and there is no node in
|
|
* /partition_active_workers).
|
|
* Also, it is used to track active workers in the partition (when we need to refill the partition we do
|
|
* not DROP PARTITION while there are active workers)
|
|
* /2
|
|
* ...
|
|
* /is_dirty - the node is set if some worker detected that an error occurred (the INSERT is failed or an Abandoned node is
|
|
* detected). If the node appeared workers in this partition should stop and start cleaning and refilling
|
|
* partition procedure.
|
|
* During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition
|
|
* workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node.
|
|
* /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker.
|
|
* /cluster.db.table2
|
|
* ...
|
|
*/
|
|
|
|
namespace DB
|
|
{
|
|
|
|
class ClusterCopierApp : public BaseDaemon
|
|
{
|
|
public:
|
|
|
|
void initialize(Poco::Util::Application & self) override;
|
|
|
|
void handleHelp(const std::string &, const std::string &);
|
|
|
|
void defineOptions(Poco::Util::OptionSet & options) override;
|
|
|
|
int main(const std::vector<std::string> &) override;
|
|
|
|
private:
|
|
|
|
using Base = BaseDaemon;
|
|
|
|
void mainImpl();
|
|
|
|
std::string config_xml_path;
|
|
std::string task_path;
|
|
std::string log_level = "info";
|
|
bool is_safe_mode = false;
|
|
bool is_status_mode = false;
|
|
double copy_fault_probability = 0.0;
|
|
double move_fault_probability = 0.0;
|
|
bool is_help = false;
|
|
|
|
UInt64 max_table_tries = 3;
|
|
UInt64 max_shard_partition_tries = 3;
|
|
UInt64 max_shard_partition_piece_tries_for_alter = 10;
|
|
std::chrono::milliseconds retry_delay_ms{1000};
|
|
|
|
bool experimental_use_sample_offset{false};
|
|
|
|
std::string base_dir;
|
|
std::string process_path;
|
|
std::string process_id;
|
|
std::string host_id;
|
|
};
|
|
|
|
}
|