/// ClickHouse: dbms/programs/copier/TaskTableAndShard.h
/// Task description structures for clickhouse-copier:
/// TaskTable (one table to copy) and TaskShard (one source shard of that table).
#pragma once
#include "Aliases.h"
#include "Internals.h"
#include "ClusterPartition.h"
namespace DB
{
/// Error codes referenced below; their definitions live in the common error-code registry.
namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int LOGICAL_ERROR;
}
struct TaskShard;
2020-02-20 09:01:06 +00:00
struct TaskTable {
2020-02-19 15:01:08 +00:00
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
const String & table_key);
TaskCluster & task_cluster;
2020-02-20 09:01:06 +00:00
/// These functions used in checkPartitionIsDone() or checkPartitionPieceIsDone()
/// They are implemented here not to call task_table.tasks_shard[partition_name].second.pieces[current_piece_number] etc.
2020-02-19 15:01:08 +00:00
String getPartitionPath(const String & partition_name) const;
2020-02-20 09:01:06 +00:00
2020-02-20 17:26:20 +00:00
String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
2020-02-20 09:01:06 +00:00
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
2020-02-21 16:00:50 +00:00
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const;
2020-02-20 09:01:06 +00:00
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
2020-02-21 16:00:50 +00:00
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const;
2020-02-20 09:01:06 +00:00
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
2020-02-21 16:00:50 +00:00
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const;
2020-02-20 09:01:06 +00:00
[[maybe_unused]] String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const;
2020-03-03 13:15:23 +00:00
bool isReplicatedTable() { return engine_push_zk_path != ""; }
2020-02-20 09:01:06 +00:00
/// Partitions will be splitted into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
2020-02-19 15:01:08 +00:00
String name_in_config;
/// Used as task ID
String table_id;
2020-02-20 09:01:06 +00:00
/// Column names in primary key
String primary_key_comma_separated;
2020-02-19 15:01:08 +00:00
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
/// Destination cluster and table
String cluster_push_name;
DatabaseAndTableName table_push;
/// Storage of destination table
2020-02-20 09:01:06 +00:00
/// (tables that are stored on each shard of target cluster)
2020-02-19 15:01:08 +00:00
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
2020-03-03 13:15:23 +00:00
/// First argument of Replicated...MergeTree()
String engine_push_zk_path;
ASTPtr rewriteReplicatedCreateQueryToPlain();
2020-03-03 13:15:23 +00:00
2020-02-20 09:01:06 +00:00
/*
* A Distributed table definition used to split data
* Distributed table will be created on each shard of default
* cluster to perform data copying and resharding
* */
2020-02-19 15:01:08 +00:00
String sharding_key_str;
ASTPtr sharding_key_ast;
2020-02-20 09:01:06 +00:00
ASTPtr main_engine_split_ast;
/*
* Auxuliary table engines used to perform partition piece copying.
* Each AST represent table engine for certatin piece number.
* After copying partition piece is Ok, this piece will be moved to the main
* target table. All this tables are stored on each shard as the main table.
* We have to use separate tables for partition pieces because of the atomicity of copying.
* Also if we want to move some partition to another table, the partition keys have to be the same.
* */
/*
* To copy partiton piece form one cluster to another we have to use Distributed table.
* In case of usage separate table (engine_push) for each partiton piece,
* we have to use many Distributed tables.
* */
ASTs auxiliary_engine_split_asts;
2020-02-19 15:01:08 +00:00
/// Additional WHERE expression to filter input data
String where_condition_str;
ASTPtr where_condition_ast;
/// Resolved clusters
ClusterPtr cluster_pull;
ClusterPtr cluster_push;
/// Filter partitions that should be copied
bool has_enabled_partitions = false;
Strings enabled_partitions;
NameSet enabled_partitions_set;
2020-02-20 09:01:06 +00:00
/**
* Prioritized list of shards
* all_shards contains information about all shards in the table.
* So we have to check whether particular shard have current partiton or not while processing.
*/
2020-02-19 15:01:08 +00:00
TasksShard all_shards;
TasksShard local_shards;
2020-02-20 09:01:06 +00:00
/// All partitions of the current table.
2020-02-19 15:01:08 +00:00
ClusterPartitions cluster_partitions;
NameSet finished_cluster_partitions;
/// Parition names to process in user-specified order
Strings ordered_partition_names;
2020-02-20 09:01:06 +00:00
ClusterPartition & getClusterPartition(const String &partition_name) {
2020-02-19 15:01:08 +00:00
auto it = cluster_partitions.find(partition_name);
if (it == cluster_partitions.end())
2020-02-20 09:01:06 +00:00
throw Exception("There are no cluster partition " + partition_name + " in " + table_id,
ErrorCodes::LOGICAL_ERROR);
2020-02-19 15:01:08 +00:00
return it->second;
}
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
2020-02-20 09:01:06 +00:00
template<typename RandomEngine>
void initShards(RandomEngine &&random_engine);
2020-02-19 15:01:08 +00:00
};
2020-02-20 09:06:00 +00:00
struct TaskShard
{
2020-02-21 16:00:50 +00:00
TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_)
{
list_of_split_tables_on_shard.assign(task_table.number_of_splits, DatabaseAndTableName());
}
2020-02-19 15:01:08 +00:00
2020-02-19 20:50:27 +00:00
TaskTable & task_table;
2020-02-19 15:01:08 +00:00
ShardInfo info;
UInt32 numberInCluster() const { return info.shard_num; }
UInt32 indexInCluster() const { return info.shard_num - 1; }
String getDescription() const;
String getHostNameExample() const;
/// Used to sort clusters by their proximity
ShardPriority priority;
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
ColumnWithTypeAndName partition_key_column;
/// There is a task for each destination partition
TasksPartition partition_tasks;
/// Which partitions have been checked for existence
/// If some partition from this lists is exists, it is in partition_tasks
std::set<String> checked_partitions;
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
2020-02-20 17:26:20 +00:00
2020-02-21 16:00:50 +00:00
DatabaseAndTableName main_table_split_shard;
2020-02-20 17:26:20 +00:00
2020-02-21 16:00:50 +00:00
ListOfDatabasesAndTableNames list_of_split_tables_on_shard;
2020-02-19 15:01:08 +00:00
};
2020-02-20 10:01:02 +00:00
/// ZooKeeper node under which all state for the given partition is kept:
/// <task_zookeeper_path>/tables/<table_id>/<escaped partition name>.
inline String TaskTable::getPartitionPath(const String & partition_name) const
{
    String path = task_cluster.task_zookeeper_path;   /// root
    path += "/tables/";
    path += table_id;                                 /// tables/dst_cluster.merge.hits
    path += "/";
    path += escapeForFileName(partition_name);        /// 201701
    return path;
}
/// ZooKeeper node for one piece of a partition: <partition path>/piece_<N>.
inline String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const
{
    assert(piece_number < number_of_splits);
    return getPartitionPath(partition_name) + "/piece_" + toString(piece_number);  /// 0...number_of_splits-1
}
/// ZooKeeper node that marks the whole partition as dirty (needs re-copying).
inline String TaskTable::getCertainPartitionIsDirtyPath(const String & partition_name) const
{
    return getPartitionPath(partition_name) + "/is_dirty";
}
/// ZooKeeper node that marks one partition piece as dirty (needs re-copying).
inline String TaskTable::getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const
{
    const String piece_path = getPartitionPiecePath(partition_name, piece_number);
    return piece_path + "/is_dirty";
}
/// ZooKeeper node set when the dirty partition has been cleaned up
/// (child of the partition's is_dirty node).
inline String TaskTable::getCertainPartitionIsCleanedPath(const String & partition_name) const
{
    return getCertainPartitionIsDirtyPath(partition_name) + "/cleaned";
}
/// ZooKeeper node set when a dirty partition piece has been cleaned up
/// (child of the piece's is_dirty node).
inline String TaskTable::getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const
{
    const String dirty_path = getCertainPartitionPieceIsDirtyPath(partition_name, piece_number);
    return dirty_path + "/cleaned";
}
/// ZooKeeper node under which per-shard task status for the partition is kept.
inline String TaskTable::getCertainPartitionTaskStatusPath(const String & partition_name) const
{
    return getPartitionPath(partition_name) + "/shards";
}
/// ZooKeeper node under which per-shard task status for one partition piece is kept.
inline String TaskTable::getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const
{
    const String piece_path = getPartitionPiecePath(partition_name, piece_number);
    return piece_path + "/shards";
}
/// Reads the description of one table from the <prefix>.<table_key> subtree of the
/// task configuration and pre-builds the ASTs used during copying.
/// Throws UNKNOWN_ELEMENT_IN_CONFIG on an unrecognized <enabled_partitions> child.
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
                            const String & prefix_, const String & table_key)
    : task_cluster(parent)
{
    String table_prefix = prefix_ + "." + table_key + ".";

    name_in_config = table_key;

    number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 2);

    cluster_pull_name = config.getString(table_prefix + "cluster_pull");
    cluster_push_name = config.getString(table_prefix + "cluster_push");

    table_pull.first = config.getString(table_prefix + "database_pull");
    table_pull.second = config.getString(table_prefix + "table_pull");

    table_push.first = config.getString(table_prefix + "database_push");
    table_push.second = config.getString(table_prefix + "table_push");

    /// Used as node name in ZooKeeper
    table_id = escapeForFileName(cluster_push_name)
               + "." + escapeForFileName(table_push.first)
               + "." + escapeForFileName(table_push.second);

    engine_push_str = config.getString(table_prefix + "engine");
    {
        ParserStorage parser_storage;
        engine_push_ast = parseQuery(parser_storage, engine_push_str, 0);
        engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
        primary_key_comma_separated = createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast));
        /// Empty for non-replicated destination engines.
        engine_push_zk_path = extractReplicatedTableZookeeperPath(engine_push_ast);
    }

    sharding_key_str = config.getString(table_prefix + "sharding_key");

    auxiliary_engine_split_asts.reserve(number_of_splits);
    {
        ParserExpressionWithOptionalAlias parser_expression(false);
        sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0);
        main_engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
                                                            sharding_key_ast);

        /// One auxiliary Distributed table per partition piece, named <table>_piece_<N>.
        for (const auto piece_number : ext::range(0, number_of_splits))
        {
            auxiliary_engine_split_asts.emplace_back
            (
                createASTStorageDistributed(cluster_push_name, table_push.first,
                                            table_push.second + "_piece_" + toString(piece_number), sharding_key_ast)
            );
        }
    }

    where_condition_str = config.getString(table_prefix + "where_condition", "");
    if (!where_condition_str.empty())
    {
        ParserExpressionWithOptionalAlias parser_expression(false);
        where_condition_ast = parseQuery(parser_expression, where_condition_str, 0);

        /// Will use canonical expression form
        where_condition_str = queryToString(where_condition_ast);
    }

    String enabled_partitions_prefix = table_prefix + "enabled_partitions";
    has_enabled_partitions = config.has(enabled_partitions_prefix);

    if (has_enabled_partitions)
    {
        Strings keys;
        config.keys(enabled_partitions_prefix, keys);

        if (keys.empty())
        {
            /// Parse list of partitions from a space-separated string
            String partitions_str = config.getString(table_prefix + "enabled_partitions");
            boost::trim_if(partitions_str, isWhitespaceASCII);
            boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
        }
        else
        {
            /// Parse sequence of <partition>...</partition>
            for (const String & key : keys)
            {
                if (!startsWith(key, "partition"))
                    throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix,
                                    ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

                enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
            }
        }

        std::copy(enabled_partitions.begin(), enabled_partitions.end(),
                  std::inserter(enabled_partitions_set, enabled_partitions_set.begin()));
    }
}
template<typename RandomEngine>
2020-02-20 09:06:00 +00:00
inline void TaskTable::initShards(RandomEngine && random_engine)
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
const String & fqdn_name = getFQDNOrHostName();
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
2020-02-20 09:06:00 +00:00
for (auto & shard_info : cluster_pull->getShardsInfo())
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
2020-02-19 15:45:49 +00:00
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
2020-02-19 15:01:08 +00:00
task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
all_shards.emplace_back(task_shard);
}
// Sort by priority
std::sort(all_shards.begin(), all_shards.end(),
2020-02-20 09:06:00 +00:00
[](const TaskShardPtr & lhs, const TaskShardPtr & rhs)
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
});
// Cut local shards
auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
2020-02-20 09:06:00 +00:00
[](const TaskShardPtr & lhs, UInt8 is_remote)
2020-02-19 20:50:27 +00:00
{
2020-02-19 15:01:08 +00:00
return lhs->priority.is_remote < is_remote;
});
local_shards.assign(all_shards.begin(), it_first_remote);
}
2020-03-03 13:15:23 +00:00
/// ZooKeeper path to use as the first Replicated...MergeTree() argument for the
/// auxiliary table of the given piece. Valid only for replicated destination tables.
inline String TaskTable::getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const
{
    assert(!engine_push_zk_path.empty());
    return engine_push_zk_path + "/piece_" + toString(piece_number);
}
/// Rewrites a copy of the destination storage AST from Replicated...MergeTree to the
/// plain ...MergeTree engine. The stored engine_push_ast itself is left untouched.
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
{
    ASTPtr prev_engine_push_ast = engine_push_ast->clone();

    auto & new_storage_ast = prev_engine_push_ast->as<ASTStorage &>();
    auto & new_engine_ast = new_storage_ast.engine->as<ASTFunction &>();

    auto & replicated_table_arguments = new_engine_ast.arguments->children;

    /// Delete the first two arguments of the Replicated...MergeTree() table
    /// (ZooKeeper path and replica name).
    replicated_table_arguments.erase(replicated_table_arguments.begin());
    replicated_table_arguments.erase(replicated_table_arguments.begin());

    /// Strip the "Replicated" prefix (10 characters) from the engine name.
    new_engine_ast.name = new_engine_ast.name.substr(10);

    return new_storage_ast.clone();
}
/// Human-readable shard description used in log messages, e.g.
/// "N2 (having a replica host:9000, pull table `db`.`t` of cluster source)".
inline String DB::TaskShard::getDescription() const
{
    String description = "N" + std::to_string(numberInCluster());
    description += " (having a replica " + getHostNameExample();
    description += ", pull table " + getQuotedTable(task_table.table_pull);
    description += " of cluster " + task_table.cluster_pull_name + ")";
    return description;
}
/// Returns a readable address of the first replica of this shard of the pull cluster.
inline String DB::TaskShard::getHostNameExample() const
{
    const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
    return replicas.at(0).readableString();
}
}