ClickHouse/programs/copier/TaskTable.h

174 lines
5.6 KiB
C++
Raw Normal View History

2020-02-19 15:01:08 +00:00
#pragma once
#include "Aliases.h"
2022-10-20 14:37:27 +00:00
#include "TaskShard.h"
2020-09-17 14:38:06 +00:00
2020-02-19 15:01:08 +00:00
namespace DB
{
2022-10-20 14:37:27 +00:00
struct ClusterPartition;
struct TaskCluster;
2020-03-17 18:07:54 +00:00
struct TaskTable
2020-03-18 13:25:49 +00:00
{
2022-10-20 11:45:15 +00:00
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix, const String & table_key);
2020-02-19 15:01:08 +00:00
TaskCluster & task_cluster;
2020-02-20 09:01:06 +00:00
/// These functions used in checkPartitionIsDone() or checkPartitionPieceIsDone()
/// They are implemented here not to call task_table.tasks_shard[partition_name].second.pieces[current_piece_number] etc.
2020-02-19 15:01:08 +00:00
String getPartitionPath(const String & partition_name) const;
2020-02-20 09:01:06 +00:00
2020-03-13 14:19:20 +00:00
String getPartitionAttachIsActivePath(const String & partition_name) const;
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, size_t piece_number) const;
2020-02-20 09:01:06 +00:00
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, size_t piece_number) const;
2020-02-20 09:01:06 +00:00
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, size_t piece_number) const;
2020-02-20 09:01:06 +00:00
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, size_t piece_number) const;
2020-02-20 09:01:06 +00:00
2022-10-20 11:45:15 +00:00
bool isReplicatedTable() const;
2020-03-03 13:15:23 +00:00
2021-04-29 19:16:51 +00:00
/// These nodes are used for check-status option
String getStatusAllPartitionCount() const;
String getStatusProcessedPartitionsCount() const;
2020-08-08 01:21:04 +00:00
/// Partitions will be split into number-of-splits pieces.
2020-02-20 09:01:06 +00:00
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
2020-02-19 15:01:08 +00:00
2021-04-22 22:32:16 +00:00
bool allow_to_copy_alias_and_materialized_columns{false};
bool allow_to_drop_target_partitions{false};
2021-04-22 22:32:16 +00:00
2020-02-19 15:01:08 +00:00
String name_in_config;
/// Used as task ID
String table_id;
2020-02-20 09:01:06 +00:00
/// Column names in primary key
String primary_key_comma_separated;
2020-02-19 15:01:08 +00:00
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
/// Destination cluster and table
String cluster_push_name;
DatabaseAndTableName table_push;
/// Storage of destination table
2020-02-20 09:01:06 +00:00
/// (tables that are stored on each shard of target cluster)
2020-02-19 15:01:08 +00:00
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
2020-03-03 13:15:23 +00:00
/// First argument of Replicated...MergeTree()
String engine_push_zk_path;
2020-09-21 10:24:10 +00:00
bool is_replicated_table;
2020-03-03 13:15:23 +00:00
ASTPtr rewriteReplicatedCreateQueryToPlain() const;
2020-03-03 13:15:23 +00:00
2020-02-20 09:01:06 +00:00
/*
* A Distributed table definition used to split data
* Distributed table will be created on each shard of default
* cluster to perform data copying and resharding
* */
2020-02-19 15:01:08 +00:00
String sharding_key_str;
ASTPtr sharding_key_ast;
2020-02-20 09:01:06 +00:00
ASTPtr main_engine_split_ast;
/*
2020-08-08 01:21:04 +00:00
* To copy partition piece form one cluster to another we have to use Distributed table.
* In case of usage separate table (engine_push) for each partition piece,
2020-02-20 09:01:06 +00:00
* we have to use many Distributed tables.
* */
ASTs auxiliary_engine_split_asts;
2020-02-19 15:01:08 +00:00
/// Additional WHERE expression to filter input data
String where_condition_str;
ASTPtr where_condition_ast;
/// Resolved clusters
ClusterPtr cluster_pull;
ClusterPtr cluster_push;
/// Filter partitions that should be copied
bool has_enabled_partitions = false;
Strings enabled_partitions;
NameSet enabled_partitions_set;
2020-02-20 09:01:06 +00:00
/**
* Prioritized list of shards
* all_shards contains information about all shards in the table.
2020-08-08 01:21:04 +00:00
* So we have to check whether particular shard have current partition or not while processing.
2020-02-20 09:01:06 +00:00
*/
2020-02-19 15:01:08 +00:00
TasksShard all_shards;
TasksShard local_shards;
2020-02-20 09:01:06 +00:00
/// All partitions of the current table.
2020-02-19 15:01:08 +00:00
ClusterPartitions cluster_partitions;
NameSet finished_cluster_partitions;
2020-08-08 01:21:04 +00:00
/// Partition names to process in user-specified order
2020-02-19 15:01:08 +00:00
Strings ordered_partition_names;
2022-10-20 11:45:15 +00:00
ClusterPartition & getClusterPartition(const String & partition_name);
2020-02-19 15:01:08 +00:00
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
2020-03-17 18:07:54 +00:00
template <typename RandomEngine>
2020-02-20 09:01:06 +00:00
void initShards(RandomEngine &&random_engine);
2020-02-19 15:01:08 +00:00
};
2022-10-20 14:37:27 +00:00
using TasksTable = std::list<TaskTable>;
2020-02-19 15:01:08 +00:00
template<typename RandomEngine>
2020-02-20 09:06:00 +00:00
inline void TaskTable::initShards(RandomEngine && random_engine)
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
const String & fqdn_name = getFQDNOrHostName();
2022-10-20 11:20:18 +00:00
std::uniform_int_distribution<uint8_t> get_urand(0, std::numeric_limits<UInt8>::max());
2020-02-19 15:01:08 +00:00
// Compute the priority
for (const auto & shard_info : cluster_pull->getShardsInfo())
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
2020-02-19 15:45:49 +00:00
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
2020-02-19 15:01:08 +00:00
task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
all_shards.emplace_back(task_shard);
}
// Sort by priority
std::sort(all_shards.begin(), all_shards.end(),
2020-02-20 09:06:00 +00:00
[](const TaskShardPtr & lhs, const TaskShardPtr & rhs)
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
});
// Cut local shards
auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
2020-02-20 09:06:00 +00:00
[](const TaskShardPtr & lhs, UInt8 is_remote)
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
return lhs->priority.is_remote < is_remote;
});
local_shards.assign(all_shards.begin(), it_first_remote);
}
}