From b481682a1e067d44219bd47e752f685f1e704513 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Tue, 11 Feb 2020 14:19:19 +0300 Subject: [PATCH] just comments --- dbms/programs/copier/ClusterCopier.cpp | 872 ++++--------------------- dbms/programs/copier/ClusterCopier.h | 4 +- dbms/programs/copier/Internals.h | 750 ++++++++++++++++++++- 3 files changed, 854 insertions(+), 772 deletions(-) diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp index 2c6b16a7ae4..e8b3018a861 100644 --- a/dbms/programs/copier/ClusterCopier.cpp +++ b/dbms/programs/copier/ClusterCopier.cpp @@ -1,87 +1,10 @@ #include "ClusterCopier.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - +#include "Internals.h" namespace DB { -namespace ErrorCodes -{ - extern const int NO_ZOOKEEPER; - extern const int BAD_ARGUMENTS; - extern const int UNKNOWN_TABLE; - extern const int UNFINISHED; - extern const int UNKNOWN_ELEMENT_IN_CONFIG; -} - using ConfigurationPtr = Poco::AutoPtr; @@ -92,661 +15,19 @@ static ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_da return {new Poco::Util::XMLConfiguration{&input_source}}; } -namespace -{ - - -using DatabaseAndTableName = std::pair; - -String getQuotedTable(const String & database, const String & table) -{ - if (database.empty()) - { - return backQuoteIfNeed(table); - } - - return backQuoteIfNeed(database) + "." 
+ backQuoteIfNeed(table); -} - -String getQuotedTable(const DatabaseAndTableName & db_and_table) -{ - return getQuotedTable(db_and_table.first, db_and_table.second); -} - - -enum class TaskState -{ - Started = 0, - Finished, - Unknown -}; - -/// Used to mark status of shard partition tasks -struct TaskStateWithOwner -{ - TaskStateWithOwner() = default; - TaskStateWithOwner(TaskState state_, const String & owner_) : state(state_), owner(owner_) {} - - TaskState state{TaskState::Unknown}; - String owner; - - static String getData(TaskState state, const String & owner) - { - return TaskStateWithOwner(state, owner).toString(); - } - - String toString() - { - WriteBufferFromOwnString wb; - wb << static_cast(state) << "\n" << escape << owner; - return wb.str(); - } - - static TaskStateWithOwner fromString(const String & data) - { - ReadBufferFromString rb(data); - TaskStateWithOwner res; - UInt32 state; - - rb >> state >> "\n" >> escape >> res.owner; - - if (state >= static_cast(TaskState::Unknown)) - throw Exception("Unknown state " + data, ErrorCodes::LOGICAL_ERROR); - - res.state = static_cast(state); - return res; - } -}; - - -/// Hierarchical description of the tasks -struct ShardPartition; -struct TaskShard; -struct TaskTable; -struct TaskCluster; -struct ClusterPartition; - -using TasksPartition = std::map>; -using ShardInfo = Cluster::ShardInfo; -using TaskShardPtr = std::shared_ptr; -using TasksShard = std::vector; -using TasksTable = std::list; -using ClusterPartitions = std::map>; - - -/// Just destination partition of a shard -struct ShardPartition -{ - ShardPartition(TaskShard & parent, const String & name_quoted_) : task_shard(parent), name(name_quoted_) {} - - String getPartitionPath() const; - String getPartitionCleanStartPath() const; - String getCommonPartitionIsDirtyPath() const; - String getCommonPartitionIsCleanedPath() const; - String getPartitionActiveWorkersPath() const; - String getActiveWorkerPath() const; - String getPartitionShardsPath() const; - String getShardStatusPath() const; - - TaskShard & task_shard; - String name; -}; - - -struct ShardPriority -{ - UInt8 is_remote = 1; - size_t hostname_difference = 0; - UInt8 random = 0; - - static bool greaterPriority(const ShardPriority & current, const ShardPriority & other) - { - return std::forward_as_tuple(current.is_remote, current.hostname_difference, current.random) - < std::forward_as_tuple(other.is_remote, other.hostname_difference, other.random); - } -}; - - -struct TaskShard -{ - TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_) {} - - TaskTable & task_table; - - ShardInfo info; - UInt32 numberInCluster() const { return info.shard_num; } - UInt32 indexInCluster() const { return info.shard_num - 1; } - - String getDescription() const; - String getHostNameExample() const; - - /// Used to sort clusters by their proximity - ShardPriority priority; - - /// Column with unique destination partitions (computed from engine_push_partition_key expr.) 
in the shard - ColumnWithTypeAndName partition_key_column; - - /// There is a task for each destination partition - TasksPartition partition_tasks; - - /// Which partitions have been checked for existence - /// If some partition from this lists is exists, it is in partition_tasks - std::set checked_partitions; - - /// Last CREATE TABLE query of the table of the shard - ASTPtr current_pull_table_create_query; - - /// Internal distributed tables - DatabaseAndTableName table_read_shard; - DatabaseAndTableName table_split_shard; -}; - - -/// Contains info about all shards that contain a partition -struct ClusterPartition -{ - double elapsed_time_seconds = 0; - UInt64 bytes_copied = 0; - UInt64 rows_copied = 0; - UInt64 blocks_copied = 0; - - UInt64 total_tries = 0; -}; - - -struct TaskTable -{ - TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix, - const String & table_key); - - TaskCluster & task_cluster; - - String getPartitionPath(const String & partition_name) const; - String getPartitionIsDirtyPath(const String & partition_name) const; - String getPartitionIsCleanedPath(const String & partition_name) const; - String getPartitionTaskStatusPath(const String & partition_name) const; - - String name_in_config; - - /// Used as task ID - String table_id; - - /// Source cluster and table - String cluster_pull_name; - DatabaseAndTableName table_pull; - - /// Destination cluster and table - String cluster_push_name; - DatabaseAndTableName table_push; - - /// Storage of destination table - String engine_push_str; - ASTPtr engine_push_ast; - ASTPtr engine_push_partition_key_ast; - - /// A Distributed table definition used to split data - String sharding_key_str; - ASTPtr sharding_key_ast; - ASTPtr engine_split_ast; - - /// Additional WHERE expression to filter input data - String where_condition_str; - ASTPtr where_condition_ast; - - /// Resolved clusters - ClusterPtr cluster_pull; - ClusterPtr cluster_push; - - /// Filter partitions that should be copied - bool has_enabled_partitions = false; - Strings enabled_partitions; - NameSet enabled_partitions_set; - - /// Prioritized list of shards - TasksShard all_shards; - TasksShard local_shards; - - ClusterPartitions cluster_partitions; - NameSet finished_cluster_partitions; - - /// Parition names to process in user-specified order - Strings ordered_partition_names; - - ClusterPartition & getClusterPartition(const String & partition_name) - { - auto it = cluster_partitions.find(partition_name); - if (it == cluster_partitions.end()) - throw Exception("There are no cluster partition " + partition_name + " in " + table_id, ErrorCodes::LOGICAL_ERROR); - return it->second; - } - - Stopwatch watch; - UInt64 bytes_copied = 0; - UInt64 rows_copied = 0; - - template - void initShards(RandomEngine && random_engine); -}; - - -struct TaskCluster -{ - TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_) - : task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {} - - void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = ""); - - /// Set (or update) settings and max_workers param - void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = ""); - - /// Base node for all tasks. 
Its structure: - /// workers/ - directory with active workers (amount of them is less or equal max_workers) - /// description - node with task configuration - /// table_table1/ - directories with per-partition copying status - String task_zookeeper_path; - - /// Database used to create temporary Distributed tables - String default_local_database; - - /// Limits number of simultaneous workers - UInt64 max_workers = 0; - - /// Base settings for pull and push - Settings settings_common; - /// Settings used to fetch data - Settings settings_pull; - /// Settings used to insert data - Settings settings_push; - - String clusters_prefix; - - /// Subtasks - TasksTable table_tasks; - - std::random_device random_device; - pcg64 random_engine; -}; - - -struct MultiTransactionInfo -{ - int32_t code; - Coordination::Requests requests; - Coordination::Responses responses; -}; - -// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key]) -std::shared_ptr createASTStorageDistributed( - const String & cluster_name, const String & database, const String & table, const ASTPtr & sharding_key_ast = nullptr) -{ - auto args = std::make_shared(); - args->children.emplace_back(std::make_shared(cluster_name)); - args->children.emplace_back(std::make_shared(database)); - args->children.emplace_back(std::make_shared(table)); - if (sharding_key_ast) - args->children.emplace_back(sharding_key_ast); - - auto engine = std::make_shared(); - engine->name = "Distributed"; - engine->arguments = args; - - auto storage = std::make_shared(); - storage->set(storage->engine, engine); - - return storage; -} - - -BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream) -{ - return std::make_shared( - stream, - std::numeric_limits::max(), - std::numeric_limits::max()); -} - -Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream) -{ - return squashStreamIntoOneBlock(stream)->read(); -} - - -/// Path getters - -String TaskTable::getPartitionPath(const String & partition_name) const -{ - return task_cluster.task_zookeeper_path // root - + "/tables/" + table_id // tables/dst_cluster.merge.hits - + "/" + escapeForFileName(partition_name); // 201701 -} - -String ShardPartition::getPartitionCleanStartPath() const -{ - return getPartitionPath() + "/clean_start"; -} - -String ShardPartition::getPartitionPath() const -{ - return task_shard.task_table.getPartitionPath(name); -} - -String ShardPartition::getShardStatusPath() const -{ - // schema: //tables///shards/ - // e.g. 
/root/table_test.hits/201701/shards/1 - return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster()); -} - -String ShardPartition::getPartitionShardsPath() const -{ - return getPartitionPath() + "/shards"; -} - -String ShardPartition::getPartitionActiveWorkersPath() const -{ - return getPartitionPath() + "/partition_active_workers"; -} - -String ShardPartition::getActiveWorkerPath() const -{ - return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster()); -} - -String ShardPartition::getCommonPartitionIsDirtyPath() const -{ - return getPartitionPath() + "/is_dirty"; -} - -String ShardPartition::getCommonPartitionIsCleanedPath() const -{ - return getCommonPartitionIsDirtyPath() + "/cleaned"; -} - -String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const -{ - return getPartitionPath(partition_name) + "/is_dirty"; -} - -String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const -{ - return getPartitionIsDirtyPath(partition_name) + "/cleaned"; -} - -String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const -{ - return getPartitionPath(partition_name) + "/shards"; -} - -String DB::TaskShard::getDescription() const -{ - std::stringstream ss; - ss << "N" << numberInCluster() - << " (having a replica " << getHostNameExample() - << ", pull table " + getQuotedTable(task_table.table_pull) - << " of cluster " + task_table.cluster_pull_name << ")"; - return ss.str(); -} - -String DB::TaskShard::getHostNameExample() const -{ - auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster()); - return replicas.at(0).readableString(); -} - - -static bool isExtendedDefinitionStorage(const ASTPtr & storage_ast) -{ - const auto & storage = storage_ast->as(); - return storage.partition_by || storage.order_by || storage.sample_by; -} - -static ASTPtr extractPartitionKey(const ASTPtr & storage_ast) -{ - String storage_str = queryToString(storage_ast); - - const auto & storage = storage_ast->as(); - const auto & engine = storage.engine->as(); - - if (!endsWith(engine.name, "MergeTree")) - { - throw Exception("Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported", - ErrorCodes::BAD_ARGUMENTS); - } - - if (isExtendedDefinitionStorage(storage_ast)) - { - if (storage.partition_by) - return storage.partition_by->clone(); - - static const char * all = "all"; - return std::make_shared(Field(all, strlen(all))); - } - else - { - bool is_replicated = startsWith(engine.name, "Replicated"); - size_t min_args = is_replicated ? 3 : 1; - - if (!engine.arguments) - throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS); - - ASTPtr arguments_ast = engine.arguments->clone(); - ASTs & arguments = arguments_ast->children; - - if (arguments.size() < min_args) - throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS); - - ASTPtr & month_arg = is_replicated ? arguments[2] : arguments[1]; - return makeASTFunction("toYYYYMM", month_arg->clone()); - } -} - - -TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_, - const String & table_key) -: task_cluster(parent) -{ - String table_prefix = prefix_ + "." 
+ table_key + "."; - - name_in_config = table_key; - - cluster_pull_name = config.getString(table_prefix + "cluster_pull"); - cluster_push_name = config.getString(table_prefix + "cluster_push"); - - table_pull.first = config.getString(table_prefix + "database_pull"); - table_pull.second = config.getString(table_prefix + "table_pull"); - - table_push.first = config.getString(table_prefix + "database_push"); - table_push.second = config.getString(table_prefix + "table_push"); - - /// Used as node name in ZooKeeper - table_id = escapeForFileName(cluster_push_name) - + "." + escapeForFileName(table_push.first) - + "." + escapeForFileName(table_push.second); - - engine_push_str = config.getString(table_prefix + "engine"); - { - ParserStorage parser_storage; - engine_push_ast = parseQuery(parser_storage, engine_push_str, 0); - engine_push_partition_key_ast = extractPartitionKey(engine_push_ast); - } - - sharding_key_str = config.getString(table_prefix + "sharding_key"); - { - ParserExpressionWithOptionalAlias parser_expression(false); - sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0); - engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast); - } - - where_condition_str = config.getString(table_prefix + "where_condition", ""); - if (!where_condition_str.empty()) - { - ParserExpressionWithOptionalAlias parser_expression(false); - where_condition_ast = parseQuery(parser_expression, where_condition_str, 0); - - // Will use canonical expression form - where_condition_str = queryToString(where_condition_ast); - } - - String enabled_partitions_prefix = table_prefix + "enabled_partitions"; - has_enabled_partitions = config.has(enabled_partitions_prefix); - - if (has_enabled_partitions) - { - Strings keys; - config.keys(enabled_partitions_prefix, keys); - - if (keys.empty()) - { - /// Parse list of partition from space-separated string - String partitions_str = config.getString(table_prefix + "enabled_partitions"); - boost::trim_if(partitions_str, isWhitespaceASCII); - boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on); - } - else - { - /// Parse sequence of ... - for (const String & key : keys) - { - if (!startsWith(key, "partition")) - throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); - - enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." 
+ key)); - } - } - - std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin())); - } -} - - -static ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random) -{ - ShardPriority res; - - if (replicas.empty()) - return res; - - res.is_remote = 1; - for (auto & replica : replicas) - { - if (isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name))) - { - res.is_remote = 0; - break; - } - } - - res.hostname_difference = std::numeric_limits::max(); - for (auto & replica : replicas) - { - size_t difference = getHostNameDifference(local_hostname, replica.host_name); - res.hostname_difference = std::min(difference, res.hostname_difference); - } - - res.random = random; - return res; -} - -template -void TaskTable::initShards(RandomEngine && random_engine) -{ - const String & fqdn_name = getFQDNOrHostName(); - std::uniform_int_distribution get_urand(0, std::numeric_limits::max()); - - // Compute the priority - for (auto & shard_info : cluster_pull->getShardsInfo()) - { - TaskShardPtr task_shard = std::make_shared(*this, shard_info); - const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster()); - task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine)); - - all_shards.emplace_back(task_shard); - } - - // Sort by priority - std::sort(all_shards.begin(), all_shards.end(), - [] (const TaskShardPtr & lhs, const TaskShardPtr & rhs) - { - return ShardPriority::greaterPriority(lhs->priority, rhs->priority); - }); - - // Cut local shards - auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1, - [] (const TaskShardPtr & lhs, UInt8 is_remote) - { - return lhs->priority.is_remote < is_remote; - }); - - local_shards.assign(all_shards.begin(), it_first_remote); -} - - -void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key) -{ - String prefix = base_key.empty() ? "" : base_key + "."; - - clusters_prefix = prefix + "remote_servers"; - if (!config.has(clusters_prefix)) - throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS); - - Poco::Util::AbstractConfiguration::Keys tables_keys; - config.keys(prefix + "tables", tables_keys); - - for (const auto & table_key : tables_keys) - { - table_tasks.emplace_back(*this, config, prefix + "tables", table_key); - } -} - -void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key) -{ - String prefix = base_key.empty() ? "" : base_key + "."; - - max_workers = config.getUInt64(prefix + "max_workers"); - - settings_common = Settings(); - if (config.has(prefix + "settings")) - settings_common.loadSettingsFromConfig(prefix + "settings", config); - - settings_pull = settings_common; - if (config.has(prefix + "settings_pull")) - settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config); - - settings_push = settings_common; - if (config.has(prefix + "settings_push")) - settings_push.loadSettingsFromConfig(prefix + "settings_push", config); - - auto set_default_value = [] (auto && setting, auto && default_value) - { - setting = setting.changed ? 
setting.value : default_value; - }; - - /// Override important settings - settings_pull.readonly = 1; - settings_push.insert_distributed_sync = 1; - set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME); - set_default_value(settings_pull.max_threads, 1); - set_default_value(settings_pull.max_block_size, 8192UL); - set_default_value(settings_pull.preferred_block_size_bytes, 0); - set_default_value(settings_push.insert_distributed_timeout, 0); -} - - -} // end of an anonymous namespace - class ClusterCopier { public: - ClusterCopier(const String & task_path_, - const String & host_id_, - const String & proxy_database_name_, + ClusterCopier(String task_path_, + String host_id_, + String proxy_database_name_, Context & context_) : - task_zookeeper_path(task_path_), - host_id(host_id_), - working_database_name(proxy_database_name_), + task_zookeeper_path(std::move(task_path_)), + host_id(std::move(host_id_)), + working_database_name(std::move(proxy_database_name_)), context(context_), log(&Poco::Logger::get("ClusterCopier")) { @@ -940,14 +221,14 @@ public: task_description_watch_zookeeper = zookeeper; String task_config_str; - Coordination::Stat stat; + Coordination::Stat stat{}; int code; zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code); if (code) throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS); - LOG_DEBUG(log, "Loading description, zxid=" << task_descprtion_current_stat.czxid); + LOG_DEBUG(log, "Loading description, zxid=" << task_description_current_stat.czxid); auto config = getConfigurationFromXMLString(task_config_str); /// Setup settings @@ -955,7 +236,7 @@ public: context.getSettingsRef() = task_cluster->settings_common; task_cluster_current_config = config; - task_descprtion_current_stat = stat; + task_description_current_stat = stat; } void updateConfigIfNeeded() @@ -1092,7 +373,7 @@ protected: { updateConfigIfNeeded(); - Coordination::Stat stat; + Coordination::Stat stat{}; zookeeper->get(workers_version_path, &stat); auto version = stat.version; zookeeper->get(workers_path, &stat); @@ -1186,7 +467,7 @@ protected: task_table.getPartitionIsDirtyPath(partition_name), task_table.getPartitionIsCleanedPath(partition_name) ); - Coordination::Stat stat; + Coordination::Stat stat{}; LogicalClock task_start_clock; if (zookeeper->exists(task_table.getPartitionTaskStatusPath(partition_name), &stat)) task_start_clock = LogicalClock(stat.mzxid); @@ -1264,7 +545,7 @@ protected: } /// Replaces ENGINE and table name in a create query - std::shared_ptr rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table, const ASTPtr & new_storage_ast) + static std::shared_ptr rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table, const ASTPtr & new_storage_ast) { const auto & create = create_query_ast->as(); auto res = std::make_shared(create); @@ -1291,7 +572,7 @@ protected: public: UInt32 value; - WrappingUInt32(UInt32 _value) + explicit WrappingUInt32(UInt32 _value) : value(_value) {} @@ -1315,13 +596,20 @@ protected: /** Conforming Zxid definition. * cf. https://github.com/apache/zookeeper/blob/631d1b284f0edb1c4f6b0fb221bf2428aec71aaa/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md#guarantees-properties-and-definitions + * + * But it is better to read this: https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html + * + * Actually here is the definition of Zxid. 
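+     * (The definition below is quoted from the programmer's guide linked above.)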
+ * Every change to the ZooKeeper state receives a stamp in the form of a zxid (ZooKeeper Transaction Id). + * This exposes the total ordering of all changes to ZooKeeper. Each change will have a unique zxid + * and if zxid1 is smaller than zxid2 then zxid1 happened before zxid2. */ class Zxid { public: WrappingUInt32 epoch; WrappingUInt32 counter; - Zxid(UInt64 _zxid) + explicit Zxid(UInt64 _zxid) : epoch(_zxid >> 32) , counter(_zxid) {} @@ -1338,6 +626,51 @@ protected: } }; + /* When multiple ClusterCopiers discover that the target partition is not empty, + * they will attempt to clean up this partition before proceeding to copying. + * + * Instead of purging is_dirty, the history of cleaning work is preserved and partition hygiene is established + * based on a happens-before relation between the events. + * This relation is encoded by LogicalClock based on the mzxid of the is_dirty ZNode and is_dirty/cleaned. + * The fact of the partition hygiene is encoded by CleanStateClock. + * + * For you to know what mzxid means: + * + * ZooKeeper Stat Structure: + * The Stat structure for each znode in ZooKeeper is made up of the following fields: + * + * -- czxid + * The zxid of the change that caused this znode to be created. + * + * -- mzxid + * The zxid of the change that last modified this znode. + * + * -- ctime + * The time in milliseconds from epoch when this znode was created. + * + * -- mtime + * The time in milliseconds from epoch when this znode was last modified. + * + * -- version + * The number of changes to the data of this znode. + * + * -- cversion + * The number of changes to the children of this znode. + * + * -- aversion + * The number of changes to the ACL of this znode. + * + * -- ephemeralOwner + * The session id of the owner of this znode if the znode is an ephemeral node. + * If it is not an ephemeral node, it will be zero. + * + * -- dataLength + * The length of the data field of this znode. + * + * -- numChildren + * The number of children of this znode. 
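+     *
+     * A hedged illustration (mzxid values invented for the example): if is_dirty
+     * has mzxid = 100 and is_dirty/cleaned has mzxid = 120, the last cleaning
+     * happened after the last dirtying, so CleanStateClock may treat the partition
+     * as clean; with the values swapped, the partition must still be considered dirty.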
+ * */ + class LogicalClock { public: @@ -1345,7 +678,7 @@ protected: LogicalClock() = default; - LogicalClock(UInt64 _zxid) + explicit LogicalClock(UInt64 _zxid) : zxid(_zxid) {} @@ -1354,7 +687,7 @@ protected: return bool(zxid); } - // happens-before relation with a reasonable time bound + /// happens-before relation with a reasonable time bound bool happensBefore(const LogicalClock & other) const { return !zxid @@ -1366,13 +699,14 @@ protected: return happensBefore(other); } - // strict equality check + /// strict equality check bool operator==(const LogicalClock & other) const { return zxid == other.zxid; } }; + class CleanStateClock { public: @@ -1404,7 +738,7 @@ protected: const String & clean_state_path) : stale(std::make_shared(false)) { - Coordination::Stat stat; + Coordination::Stat stat{}; String _some_data; auto watch_callback = [stale = stale] (const Coordination::WatchResponse & rsp) @@ -1462,13 +796,13 @@ protected: const String current_shards_path = task_partition.getPartitionShardsPath(); const String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath(); const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath(); - const String dirt_cleaner_path = is_dirty_flag_path + "/cleaner"; - const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath(); + const String dirty_cleaner_path = is_dirty_flag_path + "/cleaner"; + const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath(); zkutil::EphemeralNodeHolder::Ptr cleaner_holder; try { - cleaner_holder = zkutil::EphemeralNodeHolder::create(dirt_cleaner_path, *zookeeper, host_id); + cleaner_holder = zkutil::EphemeralNodeHolder::create(dirty_cleaner_path, *zookeeper, host_id); } catch (const Coordination::Exception & e) { @@ -1482,7 +816,7 @@ protected: throw; } - Coordination::Stat stat; + Coordination::Stat stat{}; if (zookeeper->exists(current_partition_active_workers_dir, &stat)) { if (stat.numChildren != 0) @@ -1517,7 +851,7 @@ protected: // Lock the dirty flag zookeeper->set(is_dirty_flag_path, host_id, clean_state_clock.discovery_version.value()); zookeeper->tryRemove(task_partition.getPartitionCleanStartPath()); - CleanStateClock my_clock(zookeeper, is_dirty_flag_path, is_dirt_cleaned_path); + CleanStateClock my_clock(zookeeper, is_dirty_flag_path, is_dirty_cleaned_path); /// Remove all status nodes { @@ -1556,9 +890,9 @@ protected: { zookeeper->set(is_dirty_flag_path, host_id, my_clock.discovery_version.value()); if (my_clock.clean_state_version) - zookeeper->set(is_dirt_cleaned_path, host_id, my_clock.clean_state_version.value()); + zookeeper->set(is_dirty_cleaned_path, host_id, my_clock.clean_state_version.value()); else - zookeeper->create(is_dirt_cleaned_path, host_id, zkutil::CreateMode::Persistent); + zookeeper->create(is_dirty_cleaned_path, host_id, zkutil::CreateMode::Persistent); } else { @@ -1582,7 +916,7 @@ protected: bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table) { - /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint + /// A heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint bool previous_shard_is_instantly_finished = false; /// Process each partition that is present in cluster @@ -1594,6 +928,7 @@ protected: ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name]; Stopwatch watch; + /// We will check all the shards of the 
table and check if they contain current partition. TasksShard expected_shards; UInt64 num_failed_shards = 0; @@ -1637,6 +972,8 @@ protected: } auto it_shard_partition = shard->partition_tasks.find(partition_name); + /// Previously when we discovered that shard does not contain current partition, we skipped it. + /// At this moment partition have to be present. if (it_shard_partition == shard->partition_tasks.end()) throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR); auto & partition = it_shard_partition->second; @@ -1738,6 +1075,7 @@ protected: Error, }; + /// Job for copying partition from particular shard. PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task) { PartitionTaskStatus res; @@ -1770,6 +1108,8 @@ protected: TaskShard & task_shard = task_partition.task_shard; TaskTable & task_table = task_shard.task_table; ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name); + const size_t number_of_splits = task_table.number_of_splits; + UNUSED(number_of_splits); /// We need to update table definitions for each partition, it could be changed after ALTER createShardInternalTables(timeouts, task_shard); @@ -1777,7 +1117,7 @@ protected: auto zookeeper = context.getZooKeeper(); const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath(); - const String is_dirt_cleaned_path = task_partition.getCommonPartitionIsCleanedPath(); + const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath(); const String current_task_is_active_path = task_partition.getActiveWorkerPath(); const String current_task_status_path = task_partition.getShardStatusPath(); @@ -1803,12 +1143,15 @@ protected: }; /// Returns SELECT query filtering current partition and applying user filter - auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "") + auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "", + bool enable_splitting = false, size_t current_piece_number = 0) { String query; query += "SELECT " + fields + " FROM " + getQuotedTable(from_table); /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field) query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))"; + if (enable_splitting) + query += " AND ( cityHash64(*) = " + std::to_string(current_piece_number) + " )"; if (!task_table.where_condition_str.empty()) query += " AND (" + task_table.where_condition_str + ")"; if (!limit.empty()) @@ -1823,11 +1166,11 @@ protected: LOG_DEBUG(log, "Processing " << current_task_status_path); - CleanStateClock clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path); + CleanStateClock clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path); LogicalClock task_start_clock; { - Coordination::Stat stat; + Coordination::Stat stat{}; if (zookeeper->exists(task_partition.getPartitionShardsPath(), &stat)) task_start_clock = LogicalClock(stat.mzxid); } @@ -1887,7 +1230,8 @@ protected: } // Task is abandoned, initialize DROP PARTITION - LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner << ". 
Partition will be dropped and refilled."); + LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << + status.owner << ". Partition will be dropped and refilled."); create_is_dirty_node(clean_state_clock); return PartitionTaskStatus::Error; @@ -1900,7 +1244,8 @@ protected: if (!zookeeper->tryGet(task_partition.getPartitionCleanStartPath(), clean_start_status) || clean_start_status != "ok") { zookeeper->createIfNotExists(task_partition.getPartitionCleanStartPath(), ""); - auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker", *zookeeper, host_id); + auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker", + *zookeeper, host_id); // Maybe we are the first worker ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()"); UInt64 count; @@ -1916,7 +1261,7 @@ protected: if (count != 0) { - Coordination::Stat stat_shards; + Coordination::Stat stat_shards{}; zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards); /// NOTE: partition is still fresh if dirt discovery happens before cleaning @@ -1938,7 +1283,7 @@ protected: /// Try start processing, create node about it { String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id); - CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path); + CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path); if (clean_state_clock != new_clean_state_clock) { LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing"); @@ -1955,15 +1300,17 @@ protected: /// Try create table (if not exists) on each shard { - auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast); + auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query, + task_table.table_push, task_table.engine_push_ast); create_query_push_ast->as().if_not_exists = true; String query = queryToString(create_query_push_ast); LOG_DEBUG(log, "Create destination tables. 
Query: " << query); - UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push, + UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, + create_query_push_ast, &task_cluster->settings_push, PoolMode::GET_MANY); - LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << " have been created on " << shards - << " shards of " << task_table.cluster_push->getShardCount()); + LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << + " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount()); } /// Do the copying @@ -2073,7 +1420,7 @@ protected: /// Finalize the processing, change state of current partition task (and also check is_dirty flag) { String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id); - CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirt_cleaned_path); + CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path); if (clean_state_clock != new_clean_state_clock) { LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing"); @@ -2265,7 +1612,7 @@ protected: UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index); num_successful_executions = 0; - auto increment_and_check_exit = [&] () + auto increment_and_check_exit = [&] () -> bool { ++num_successful_executions; return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard; @@ -2348,7 +1695,7 @@ private: ConfigurationPtr task_cluster_initial_config; ConfigurationPtr task_cluster_current_config; - Coordination::Stat task_descprtion_current_stat{}; + Coordination::Stat task_description_current_stat{}; std::unique_ptr task_cluster; @@ -2373,12 +1720,11 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self) config_xml_path = config().getString("config-file"); task_path = config().getString("task-path"); - log_level = config().getString("log-level", "debug"); + log_level = config().getString("log-level", "trace"); is_safe_mode = config().has("safe-mode"); if (config().has("copy-fault-probability")) copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0); base_dir = (config().has("base-dir")) ? config().getString("base-dir") : Poco::Path::current(); - // process_id is '#_' time_t timestamp = Poco::Timestamp().epochTime(); auto curr_pid = Poco::Process::id(); diff --git a/dbms/programs/copier/ClusterCopier.h b/dbms/programs/copier/ClusterCopier.h index 89f45df8686..fe228fd6194 100644 --- a/dbms/programs/copier/ClusterCopier.h +++ b/dbms/programs/copier/ClusterCopier.h @@ -71,11 +71,9 @@ private: void mainImpl(); - void setupLogging(); - std::string config_xml_path; std::string task_path; - std::string log_level = "debug"; + std::string log_level = "trace"; bool is_safe_mode = false; double copy_fault_probability = 0; bool is_help = false; diff --git a/dbms/programs/copier/Internals.h b/dbms/programs/copier/Internals.h index a25ae7b973c..5f14604fbf9 100644 --- a/dbms/programs/copier/Internals.h +++ b/dbms/programs/copier/Internals.h @@ -1,8 +1,746 @@ -// -// Created by jakalletti on 2/7/20. 
-// +#pragma once -#ifndef CLICKHOUSE_INTERNALS_H -#define CLICKHOUSE_INTERNALS_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NO_ZOOKEEPER; + extern const int BAD_ARGUMENTS; + extern const int UNKNOWN_TABLE; + extern const int UNFINISHED; + extern const int UNKNOWN_ELEMENT_IN_CONFIG; +} + +namespace +{ + +using DatabaseAndTableName = std::pair; + +String getQuotedTable(const String & database, const String & table) +{ + if (database.empty()) + { + return backQuoteIfNeed(table); + } + + return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table); +} + +String getQuotedTable(const DatabaseAndTableName & db_and_table) +{ + return getQuotedTable(db_and_table.first, db_and_table.second); +} + + +enum class TaskState +{ + Started = 0, + Finished, + Unknown +}; + +/// Used to mark status of shard partition tasks +struct TaskStateWithOwner +{ + TaskStateWithOwner() = default; + TaskStateWithOwner(TaskState state_, const String & owner_) : state(state_), owner(owner_) {} + + TaskState state{TaskState::Unknown}; + String owner; + + static String getData(TaskState state, const String & owner) + { + return TaskStateWithOwner(state, owner).toString(); + } + + String toString() + { + WriteBufferFromOwnString wb; + wb << static_cast(state) << "\n" << escape << owner; + return wb.str(); + } + + static TaskStateWithOwner fromString(const String & data) + { + ReadBufferFromString rb(data); + TaskStateWithOwner res; + UInt32 state; + + rb >> state >> "\n" >> escape >> res.owner; + + if (state >= static_cast(TaskState::Unknown)) + throw Exception("Unknown state " + data, ErrorCodes::LOGICAL_ERROR); + + res.state = static_cast(state); + return res; + } +}; + + +/// Hierarchical description of the tasks +struct ShardPartition; +struct TaskShard; +struct TaskTable; +struct TaskCluster; +struct ClusterPartition; + +using TasksPartition = std::map>; +using ShardInfo = Cluster::ShardInfo; +using TaskShardPtr = std::shared_ptr; +using TasksShard = std::vector; +using TasksTable = std::list; +using ClusterPartitions = std::map>; + + +/// Just destination partition of a shard +/// I don't know what this comment means. +/// In short, when we discovered what shards contain currently processing partition, +/// This class describes a partition (name) that is stored on the shard (parent). 
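+/// One ShardPartition task is created for every such (shard, partition) pair.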
+struct ShardPartition
+{
+    ShardPartition(TaskShard & parent, String name_quoted_) : task_shard(parent), name(std::move(name_quoted_)) {}
+
+    String getPartitionPath() const;
+    String getPartitionCleanStartPath() const;
+    String getCommonPartitionIsDirtyPath() const;
+    String getCommonPartitionIsCleanedPath() const;
+    String getPartitionActiveWorkersPath() const;
+    String getActiveWorkerPath() const;
+    String getPartitionShardsPath() const;
+    String getShardStatusPath() const;
+
+    TaskShard & task_shard;
+    String name;
+};
+
+
+struct ShardPriority
+{
+    UInt8 is_remote = 1;
+    size_t hostname_difference = 0;
+    UInt8 random = 0;
+
+    static bool greaterPriority(const ShardPriority & current, const ShardPriority & other)
+    {
+        return std::forward_as_tuple(current.is_remote, current.hostname_difference, current.random)
+               < std::forward_as_tuple(other.is_remote, other.hostname_difference, other.random);
+    }
+};
+
+/// A table has many shards, and a partition of the table can be stored on several of them.
+/// When we copy a partition, we have to discover the shards that actually store it.
+/// TaskShard makes it simple to find out which partitions are stored on a particular shard.
+struct TaskShard
+{
+    TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_) {}
+
+    TaskTable & task_table;
+
+    ShardInfo info;
+    UInt32 numberInCluster() const { return info.shard_num; }
+    UInt32 indexInCluster() const { return info.shard_num - 1; }
+
+    String getDescription() const;
+    String getHostNameExample() const;
+
+    /// Used to sort clusters by their proximity
+    ShardPriority priority;
+
+    /// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
+    ColumnWithTypeAndName partition_key_column;
+
+    /// There is a task for each destination partition
+    TasksPartition partition_tasks;
+
+    /// Which partitions have been checked for existence
+    /// If a partition from this list exists, it is present in partition_tasks
+    std::set<String> checked_partitions;
+
+    /// Last CREATE TABLE query of the table of the shard
+    ASTPtr current_pull_table_create_query;
+
+    /// Internal distributed tables
+    DatabaseAndTableName table_read_shard;
+    DatabaseAndTableName table_split_shard;
+};
+
+
+/// Contains info about all shards that contain a partition
+struct ClusterPartition
+{
+    double elapsed_time_seconds = 0;
+    UInt64 bytes_copied = 0;
+    UInt64 rows_copied = 0;
+    UInt64 blocks_copied = 0;
+
+    UInt64 total_tries = 0;
+};
+
+
+struct TaskTable
+{
+    TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
+              const String & table_key);
+
+    TaskCluster & task_cluster;
+
+    String getPartitionPath(const String & partition_name) const;
+    [[maybe_unused]] String getPartitionPathWithPieceNumber(const String & partition_name, size_t current_piece_number) const;
+    String getPartitionIsDirtyPath(const String & partition_name) const;
+    String getPartitionIsCleanedPath(const String & partition_name) const;
+    String getPartitionTaskStatusPath(const String & partition_name) const;
+
+    /// Partitions will be split into number-of-splits pieces.
+    /// Each piece will be copied independently.
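+    /// The number of pieces is taken from the number_of_splits key of the task config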
(10 by default).
+    size_t number_of_splits;
+
+    String name_in_config;
+
+    /// Used as task ID
+    String table_id;
+
+    /// Source cluster and table
+    String cluster_pull_name;
+    DatabaseAndTableName table_pull;
+
+    /// Destination cluster and table
+    String cluster_push_name;
+    DatabaseAndTableName table_push;
+
+    /// Storage of destination table
+    String engine_push_str;
+    ASTPtr engine_push_ast;
+    ASTPtr engine_push_partition_key_ast;
+
+    /// A Distributed table definition used to split data
+    String sharding_key_str;
+    ASTPtr sharding_key_ast;
+    ASTPtr engine_split_ast;
+
+    /// Additional WHERE expression to filter input data
+    String where_condition_str;
+    ASTPtr where_condition_ast;
+
+    /// Resolved clusters
+    ClusterPtr cluster_pull;
+    ClusterPtr cluster_push;
+
+    /// Filter partitions that should be copied
+    bool has_enabled_partitions = false;
+    Strings enabled_partitions;
+    NameSet enabled_partitions_set;
+
+    /// Prioritized list of shards.
+    /// all_shards contains information about all shards of the table,
+    /// so while processing we have to check whether a particular shard contains the current partition.
+    TasksShard all_shards;
+    TasksShard local_shards;
+
+    /// All partitions of the current table.
+    ClusterPartitions cluster_partitions;
+    NameSet finished_cluster_partitions;
+
+    /// Partition names to process in user-specified order
+    Strings ordered_partition_names;
+
+    ClusterPartition & getClusterPartition(const String & partition_name)
+    {
+        auto it = cluster_partitions.find(partition_name);
+        if (it == cluster_partitions.end())
+            throw Exception("There is no cluster partition " + partition_name + " in " + table_id, ErrorCodes::LOGICAL_ERROR);
+        return it->second;
+    }
+
+    Stopwatch watch;
+    UInt64 bytes_copied = 0;
+    UInt64 rows_copied = 0;
+
+    template <typename RandomEngine>
+    void initShards(RandomEngine && random_engine);
+};
+
+
+struct TaskCluster
+{
+    TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
+        : task_zookeeper_path(task_zookeeper_path_), default_local_database(default_local_database_) {}
+
+    void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
+
+    /// Set (or update) settings and max_workers param
+    void reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
+
+    /// Base node for all tasks.
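(presumably the path passed via --task-path).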
Its structure: + /// workers/ - directory with active workers (amount of them is less or equal max_workers) + /// description - node with task configuration + /// table_table1/ - directories with per-partition copying status + String task_zookeeper_path; + + /// Database used to create temporary Distributed tables + String default_local_database; + + /// Limits number of simultaneous workers + UInt64 max_workers = 0; + + /// Base settings for pull and push + Settings settings_common; + /// Settings used to fetch data + Settings settings_pull; + /// Settings used to insert data + Settings settings_push; + + String clusters_prefix; + + /// Subtasks + TasksTable table_tasks; + + std::random_device random_device; + pcg64 random_engine; +}; + + + +struct MultiTransactionInfo +{ + int32_t code; + Coordination::Requests requests; + Coordination::Responses responses; +}; + +// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key]) +std::shared_ptr createASTStorageDistributed( + const String & cluster_name, const String & database, const String & table, const ASTPtr & sharding_key_ast = nullptr) +{ + auto args = std::make_shared(); + args->children.emplace_back(std::make_shared(cluster_name)); + args->children.emplace_back(std::make_shared(database)); + args->children.emplace_back(std::make_shared(table)); + if (sharding_key_ast) + args->children.emplace_back(sharding_key_ast); + + auto engine = std::make_shared(); + engine->name = "Distributed"; + engine->arguments = args; + + auto storage = std::make_shared(); + storage->set(storage->engine, engine); + + return storage; +} + + +BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream) +{ + return std::make_shared( + stream, + std::numeric_limits::max(), + std::numeric_limits::max()); +} + +Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream) +{ + return squashStreamIntoOneBlock(stream)->read(); +} + + +/// Path getters + +String TaskTable::getPartitionPath(const String & partition_name) const +{ + return task_cluster.task_zookeeper_path // root + + "/tables/" + table_id // tables/dst_cluster.merge.hits + + "/" + escapeForFileName(partition_name); // 201701 +} + +String TaskTable::getPartitionPathWithPieceNumber(const String & partition_name, size_t current_piece_number) const +{ + return getPartitionPath(partition_name) + "/" + std::to_string(current_piece_number); // 1...number_of_splits +} + +String ShardPartition::getPartitionCleanStartPath() const +{ + return getPartitionPath() + "/clean_start"; +} + +String ShardPartition::getPartitionPath() const +{ + return task_shard.task_table.getPartitionPath(name); +} + +String ShardPartition::getShardStatusPath() const +{ + // schema: //tables/
//shards/ + // e.g. /root/table_test.hits/201701/shards/1 + return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster()); +} + +String ShardPartition::getPartitionShardsPath() const +{ + return getPartitionPath() + "/shards"; +} + +String ShardPartition::getPartitionActiveWorkersPath() const +{ + return getPartitionPath() + "/partition_active_workers"; +} + +String ShardPartition::getActiveWorkerPath() const +{ + return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster()); +} + +String ShardPartition::getCommonPartitionIsDirtyPath() const +{ + return getPartitionPath() + "/is_dirty"; +} + +String ShardPartition::getCommonPartitionIsCleanedPath() const +{ + return getCommonPartitionIsDirtyPath() + "/cleaned"; +} + +String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const +{ + return getPartitionPath(partition_name) + "/is_dirty"; +} + +String TaskTable::getPartitionIsCleanedPath(const String & partition_name) const +{ + return getPartitionIsDirtyPath(partition_name) + "/cleaned"; +} + +String TaskTable::getPartitionTaskStatusPath(const String & partition_name) const +{ + return getPartitionPath(partition_name) + "/shards"; +} + +String TaskShard::getDescription() const +{ + std::stringstream ss; + ss << "N" << numberInCluster() + << " (having a replica " << getHostNameExample() + << ", pull table " + getQuotedTable(task_table.table_pull) + << " of cluster " + task_table.cluster_pull_name << ")"; + return ss.str(); +} + +String TaskShard::getHostNameExample() const +{ + auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster()); + return replicas.at(0).readableString(); +} + + +static bool isExtendedDefinitionStorage(const ASTPtr & storage_ast) +{ + const auto & storage = storage_ast->as(); + return storage.partition_by || storage.order_by || storage.sample_by; +} + +static ASTPtr extractPartitionKey(const ASTPtr & storage_ast) +{ + String storage_str = queryToString(storage_ast); + + const auto & storage = storage_ast->as(); + const auto & engine = storage.engine->as(); + + if (!endsWith(engine.name, "MergeTree")) + { + throw Exception("Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported", + ErrorCodes::BAD_ARGUMENTS); + } + + if (isExtendedDefinitionStorage(storage_ast)) + { + if (storage.partition_by) + return storage.partition_by->clone(); + + static const char * all = "all"; + return std::make_shared(Field(all, strlen(all))); + } + else + { + bool is_replicated = startsWith(engine.name, "Replicated"); + size_t min_args = is_replicated ? 3 : 1; + + if (!engine.arguments) + throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS); + + ASTPtr arguments_ast = engine.arguments->clone(); + ASTs & arguments = arguments_ast->children; + + if (arguments.size() < min_args) + throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS); + + ASTPtr & month_arg = is_replicated ? arguments[2] : arguments[1]; + return makeASTFunction("toYYYYMM", month_arg->clone()); + } +} + + +TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_, + const String & table_key) + : task_cluster(parent) +{ + String table_prefix = prefix_ + "." 
+ table_key + "."; + + name_in_config = table_key; + + number_of_splits = config.getUInt64("number_of_splits", 10); + + cluster_pull_name = config.getString(table_prefix + "cluster_pull"); + cluster_push_name = config.getString(table_prefix + "cluster_push"); + + table_pull.first = config.getString(table_prefix + "database_pull"); + table_pull.second = config.getString(table_prefix + "table_pull"); + + table_push.first = config.getString(table_prefix + "database_push"); + table_push.second = config.getString(table_prefix + "table_push"); + + /// Used as node name in ZooKeeper + table_id = escapeForFileName(cluster_push_name) + + "." + escapeForFileName(table_push.first) + + "." + escapeForFileName(table_push.second); + + engine_push_str = config.getString(table_prefix + "engine"); + { + ParserStorage parser_storage; + engine_push_ast = parseQuery(parser_storage, engine_push_str, 0); + engine_push_partition_key_ast = extractPartitionKey(engine_push_ast); + } + + sharding_key_str = config.getString(table_prefix + "sharding_key"); + { + ParserExpressionWithOptionalAlias parser_expression(false); + sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0); + engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast); + } + + where_condition_str = config.getString(table_prefix + "where_condition", ""); + if (!where_condition_str.empty()) + { + ParserExpressionWithOptionalAlias parser_expression(false); + where_condition_ast = parseQuery(parser_expression, where_condition_str, 0); + + // Will use canonical expression form + where_condition_str = queryToString(where_condition_ast); + } + + String enabled_partitions_prefix = table_prefix + "enabled_partitions"; + has_enabled_partitions = config.has(enabled_partitions_prefix); + + if (has_enabled_partitions) + { + Strings keys; + config.keys(enabled_partitions_prefix, keys); + + if (keys.empty()) + { + /// Parse list of partition from space-separated string + String partitions_str = config.getString(table_prefix + "enabled_partitions"); + boost::trim_if(partitions_str, isWhitespaceASCII); + boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on); + } + else + { + /// Parse sequence of ... + for (const String & key : keys) + { + if (!startsWith(key, "partition")) + throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + + enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." 
+ key)); + } + } + + std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin())); + } +} + + +static ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random) +{ + ShardPriority res; + + if (replicas.empty()) + return res; + + res.is_remote = 1; + for (auto & replica : replicas) + { + if (isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name))) + { + res.is_remote = 0; + break; + } + } + + res.hostname_difference = std::numeric_limits::max(); + for (auto & replica : replicas) + { + size_t difference = getHostNameDifference(local_hostname, replica.host_name); + res.hostname_difference = std::min(difference, res.hostname_difference); + } + + res.random = random; + return res; +} + +template +void TaskTable::initShards(RandomEngine && random_engine) +{ + const String & fqdn_name = getFQDNOrHostName(); + std::uniform_int_distribution get_urand(0, std::numeric_limits::max()); + + // Compute the priority + for (auto & shard_info : cluster_pull->getShardsInfo()) + { + TaskShardPtr task_shard = std::make_shared(*this, shard_info); + const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster()); + task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine)); + + all_shards.emplace_back(task_shard); + } + + // Sort by priority + std::sort(all_shards.begin(), all_shards.end(), + [] (const TaskShardPtr & lhs, const TaskShardPtr & rhs) + { + return ShardPriority::greaterPriority(lhs->priority, rhs->priority); + }); + + // Cut local shards + auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1, + [] (const TaskShardPtr & lhs, UInt8 is_remote) + { + return lhs->priority.is_remote < is_remote; + }); + + local_shards.assign(all_shards.begin(), it_first_remote); +} + + +void TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key) +{ + String prefix = base_key.empty() ? "" : base_key + "."; + + clusters_prefix = prefix + "remote_servers"; + if (!config.has(clusters_prefix)) + throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS); + + Poco::Util::AbstractConfiguration::Keys tables_keys; + config.keys(prefix + "tables", tables_keys); + + for (const auto & table_key : tables_keys) + { + table_tasks.emplace_back(*this, config, prefix + "tables", table_key); + } +} + +void TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key) +{ + String prefix = base_key.empty() ? "" : base_key + "."; + + max_workers = config.getUInt64(prefix + "max_workers"); + + settings_common = Settings(); + if (config.has(prefix + "settings")) + settings_common.loadSettingsFromConfig(prefix + "settings", config); + + settings_pull = settings_common; + if (config.has(prefix + "settings_pull")) + settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config); + + settings_push = settings_common; + if (config.has(prefix + "settings_push")) + settings_push.loadSettingsFromConfig(prefix + "settings_push", config); + + auto set_default_value = [] (auto && setting, auto && default_value) + { + setting = setting.changed ? 
setting.value : default_value; + }; + + /// Override important settings + settings_pull.readonly = 1; + settings_push.insert_distributed_sync = 1; + set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME); + set_default_value(settings_pull.max_threads, 1); + set_default_value(settings_pull.max_block_size, 8192UL); + set_default_value(settings_pull.preferred_block_size_bytes, 0); + set_default_value(settings_push.insert_distributed_timeout, 0); +} + + +} // end of an anonymous namespace +} -#endif //CLICKHOUSE_INTERNALS_H