ClickHouse/dbms/programs/copier/ClusterCopier.h

#pragma once

#include "Aliases.h"
#include "Internals.h"
#include "TaskCluster.h"
#include "TaskTableAndShard.h"
#include "ShardPartition.h"
#include "ShardPartitionPiece.h"
#include "ZooKeeperStaff.h"

namespace DB
{

class ClusterCopier
{
public:
    ClusterCopier(String task_path_,
                  String host_id_,
                  String proxy_database_name_,
                  Context & context_)
        : task_zookeeper_path(std::move(task_path_)),
          host_id(std::move(host_id_)),
          working_database_name(std::move(proxy_database_name_)),
          context(context_),
          log(&Poco::Logger::get("ClusterCopier")) {}

    void init();

    template <typename T>
    decltype(auto) retry(T && func, UInt64 max_tries = 100);
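
    /// Illustrative usage sketch (not part of the interface): `zookeeper` and `path` are assumed to
    /// exist in the calling scope, and the callable is assumed to be re-invoked, up to `max_tries`
    /// times, when it throws:
    ///
    ///     bool piece_exists = retry([&] { return zookeeper->exists(path); });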

    void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard);

    /// Compute the set of partitions; assume the set of partitions does not change during processing.
    void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0);

    void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force);

    void reloadTaskDescription();

    void updateConfigIfNeeded();

    void process(const ConnectionTimeouts & timeouts);

    /// Disables the DROP PARTITION commands that are used to clear data after errors.
    void setSafeMode(bool is_safe_mode_ = true)
    {
        is_safe_mode = is_safe_mode_;
    }

    void setCopyFaultProbability(double copy_fault_probability_)
    {
        copy_fault_probability = copy_fault_probability_;
    }
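
    /* Illustrative driver sketch (not part of this header): roughly how a caller such as
     * ClusterCopierApp is expected to wire the public interface together. The task path, host id
     * and database name are made up; `context` and `timeouts` are assumed to be a valid Context
     * and ConnectionTimeouts prepared by the caller.
     *
     *     ClusterCopier copier("/clickhouse/copier/task1", "host-1#9000", "default", context);
     *     copier.setSafeMode(false);
     *     copier.setCopyFaultProbability(0.0);
     *     copier.init();
     *     copier.process(timeouts);
     */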

protected:
    String getWorkersPath() const
    {
        return task_cluster->task_zookeeper_path + "/task_active_workers";
    }

    String getWorkersPathVersion() const
    {
        return getWorkersPath() + "_version";
    }

    String getCurrentWorkerNodePath() const
    {
        return getWorkersPath() + "/" + host_id;
    }
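
    /// For illustration (made-up values): with task_cluster->task_zookeeper_path == "/clickhouse/copier/task1"
    /// and host_id == "host-1#9000", the helpers above resolve to:
    ///     workers path:         /clickhouse/copier/task1/task_active_workers
    ///     workers version path: /clickhouse/copier/task1/task_active_workers_version
    ///     current worker node:  /clickhouse/copier/task1/task_active_workers/host-1#9000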

    zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(
        const zkutil::ZooKeeperPtr & zookeeper,
        const String & description,
        bool unprioritized);

    /*
     * Checks that a partition piece (or some other entity) is clean.
     * The only requirement is that you pass is_dirty_flag_path and is_dirty_cleaned_path to the function,
     * and that is_dirty_flag_path is a parent of is_dirty_cleaned_path.
     * */
    bool checkPartitionPieceIsClean(
        const zkutil::ZooKeeperPtr & zookeeper,
        const CleanStateClock & clean_state_clock,
        const String & task_status_path) const;

    /** Checks that the whole partition of a table was copied. We should do it carefully because of the dirty lock:
      * the state of some task could change during processing.
      * We have to ensure that all shards are in the "finished" state and that there is no dirty flag.
      * Moreover, we have to check the status twice and compare zxids, because the state can change while we are checking.
      */
    bool checkAllPiecesInPartitionAreDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition);

    /* The same as the function above.
     * Assume that we don't know which shards have a certain piece of the partition.
     * We check all of them (that is, all shards that contain the whole partition),
     * and shards that don't have that piece MUST mark it as done (is_done = true).
     * */
    bool checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
                                   size_t piece_number, const TasksShard & shards_with_partition);

    /* After successful insertion into the helping tables, this moves all pieces to the destination table. */
    PartitionTaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);

    /// Removes MATERIALIZED and ALIAS columns from a create table query.
    ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);

    /// Replaces ENGINE and table name in a create query.
    std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast,
                                                              const DatabaseAndTableName & new_table, const ASTPtr & new_storage_ast);

    bool tryDropPartitionPiece(ShardPartition & task_partition, const size_t current_piece_number,
                               const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);

    static constexpr UInt64 max_table_tries = 1000;
    static constexpr UInt64 max_shard_partition_tries = 600;
    static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 5;

    bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);

    /// Job for copying a partition from a particular shard.
    PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task);

    PartitionTaskStatus iterateThroughAllPiecesInPartition(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
                                                           bool is_unprioritized_task);

    PartitionTaskStatus processPartitionPieceTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
                                                      const size_t current_piece_number, bool is_unprioritized_task);

    void dropAndCreateLocalTable(const ASTPtr & create_ast);

    void dropLocalTableIfExists(const DatabaseAndTableName & table_name) const;

    void dropHelpingTables(const TaskTable & task_table);

    String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr);

    ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);

    /// If it is implicitly asked to create a split Distributed table for a certain piece on the current shard, we will do it.
    /// TODO: rewrite comment
    void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true);

    std::set<String> getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard);

    bool checkShardHasPartition(const ConnectionTimeouts & timeouts, TaskShard & task_shard, const String & partition_quoted_name);

    /// TODO: Implement checkPresentPartitionPiecesOnCurrentShard();
    /// just copy-paste the function above.
    bool checkPresentPartitionPiecesOnCurrentShard(const ConnectionTimeouts & timeouts,
                                                   TaskShard & task_shard, const String & partition_quoted_name, size_t current_piece_number);

    /*
     * This enum is used by the executeQueryOnCluster function.
     * You can execute a query on each shard (it does not matter whether every replica of the shard executes it or not),
     * or you can execute it on every replica of every shard.
     * The first mode is useful for INSERT queries.
     * */
    enum ClusterExecutionMode
    {
        ON_EACH_SHARD,
        ON_EACH_NODE
    };

    /** Executes a simple query (without output streams, e.g. DDL queries) on each shard of the cluster.
      * Returns the number of shards on which at least one replica executed the query successfully.
      */
    UInt64 executeQueryOnCluster(
        const ClusterPtr & cluster,
        const String & query,
        const ASTPtr & query_ast_ = nullptr,
        const Settings * settings = nullptr,
        PoolMode pool_mode = PoolMode::GET_ALL,
        ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
        UInt64 max_successful_executions_per_shard = 0) const;
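
    /* Illustrative usage sketch, as it might be written inside a ClusterCopier member function.
     * The query text and the `cluster` variable are made up for the example; only the parameter
     * order follows the declaration above.
     *
     *     // A DDL statement that should reach every replica of every shard:
     *     UInt64 successful_shards = executeQueryOnCluster(
     *         cluster, "DROP TABLE IF EXISTS db.helping_table", nullptr, nullptr,
     *         PoolMode::GET_ALL, ClusterExecutionMode::ON_EACH_NODE);
     *
     *     // A query that only needs to succeed once per shard (e.g. an INSERT):
     *     executeQueryOnCluster(cluster, query, nullptr, nullptr,
     *                           PoolMode::GET_ALL, ClusterExecutionMode::ON_EACH_SHARD);
     */
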
private:
    String task_zookeeper_path;
    String task_description_path;
    String host_id;
    String working_database_name;

    /// Machinery for the automatic update of the task description (config).
    UInt64 task_description_current_version = 1;
    std::atomic<UInt64> task_description_version{1};
    Coordination::WatchCallback task_description_watch_callback;
    /// ZooKeeper session used to set the callback.
    zkutil::ZooKeeperPtr task_description_watch_zookeeper;

    ConfigurationPtr task_cluster_initial_config;
    ConfigurationPtr task_cluster_current_config;
    Coordination::Stat task_description_current_stat{};

    std::unique_ptr<TaskCluster> task_cluster;

    bool is_safe_mode = false;
    double copy_fault_probability = 0.0;
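    /// The member above appears to be a fault-injection knob for testing. An illustrative (assumed,
    /// not the exact implementation) way to apply it is to draw a uniform number in [0, 1) per copied
    /// piece and deliberately fail when it falls below copy_fault_probability, e.g.:
    ///     bool inject_fault = std::uniform_real_distribution<>(0, 1)(random_engine) < copy_fault_probability;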

    Context & context;
    Poco::Logger * log;

    std::chrono::milliseconds default_sleep_time{1000};
};
}