diff --git a/dbms/src/Common/ZooKeeper/Types.h b/dbms/src/Common/ZooKeeper/Types.h index 1e010008f25..ac96c10e58a 100644 --- a/dbms/src/Common/ZooKeeper/Types.h +++ b/dbms/src/Common/ZooKeeper/Types.h @@ -131,11 +131,12 @@ private: struct OpResult : public zoo_op_result_t { - /// Указатели в этой структуре указывают на поля в классе Op. - /// Поэтому деструктор не нужен + /// Pointers in this struct point to fields of class Op. + /// Op instances have the same (or a longer) lifetime, therefore a destructor is not required. }; -using Ops = std::vector>; +using OpPtr = std::unique_ptr; +using Ops = std::vector; using OpResults = std::vector; using OpResultsPtr = std::shared_ptr; using Strings = std::vector; diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 52d19387e62..ab0abc8713c 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -28,6 +28,15 @@ namespace CurrentMetrics } +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + + namespace zkutil { @@ -1001,4 +1010,19 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops) return asyncMultiImpl(ops, true); } + +size_t getFailedOpIndex(const OpResultsPtr & op_results) +{ + if (!op_results) + throw DB::Exception("OpResults is nullptr", DB::ErrorCodes::LOGICAL_ERROR); + + for (size_t index = 0; index < op_results->size(); ++index) + { + if ((*op_results)[index].err != ZOK) + return index; + } + + throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR); +} + } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h index 40f84bce4a1..382b0d21a17 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h @@ -420,6 +420,11 @@ private: bool is_dirty = false; }; + +/// Returns the index of the first op whose result is not ZOK; throws if op_results is null or no op has failed +size_t getFailedOpIndex(const OpResultsPtr & op_results); + 
+ using ZooKeeperPtr = ZooKeeper::Ptr; diff --git a/dbms/src/DataStreams/IBlockOutputStream.h b/dbms/src/DataStreams/IBlockOutputStream.h index 83f66202e85..fe56c32b285 100644 --- a/dbms/src/DataStreams/IBlockOutputStream.h +++ b/dbms/src/DataStreams/IBlockOutputStream.h @@ -4,6 +4,7 @@ #include #include #include +#include "IBlockInputStream.h" namespace DB diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp index dcf908b1f9a..cf015c38b79 100644 --- a/dbms/src/DataStreams/copyData.cpp +++ b/dbms/src/DataStreams/copyData.cpp @@ -16,20 +16,21 @@ bool isAtomicSet(std::atomic * val) } -void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled) +template +void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, Pred && is_cancelled) { from.readPrefix(); to.writePrefix(); while (Block block = from.read()) { - if (isAtomicSet(is_cancelled)) + if (is_cancelled()) break; to.write(block); } - if (isAtomicSet(is_cancelled)) + if (is_cancelled()) return; /// For outputting additional information in some formats. 
@@ -42,11 +43,28 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomicgetExtremes()); } - if (isAtomicSet(is_cancelled)) + if (is_cancelled()) return; from.readSuffix(); to.writeSuffix(); } + +void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled) +{ + auto is_cancelled_pred = [is_cancelled] () + { + return isAtomicSet(is_cancelled); + }; + + copyDataImpl(from, to, is_cancelled_pred); +} + + +void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled) +{ + copyDataImpl(from, to, is_cancelled); +} + } diff --git a/dbms/src/DataStreams/copyData.h b/dbms/src/DataStreams/copyData.h index 2a42ef191cb..6e8d54806c4 100644 --- a/dbms/src/DataStreams/copyData.h +++ b/dbms/src/DataStreams/copyData.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -14,4 +15,6 @@ class IBlockOutputStream; */ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled = nullptr); +void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled); + } diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp index 318e5dd6f65..52be9afc1e5 100644 --- a/dbms/src/Server/ClusterCopier.cpp +++ b/dbms/src/Server/ClusterCopier.cpp @@ -52,6 +52,10 @@ #include #include #include +#include +#include +#include +#include namespace DB @@ -62,6 +66,7 @@ namespace ErrorCodes extern const int NO_ZOOKEEPER; extern const int BAD_ARGUMENTS; extern const int UNKNOWN_TABLE; + extern const int UNFINISHED; } @@ -86,12 +91,6 @@ namespace using DatabaseAndTableName = std::pair; -struct TaskCluster; -struct TaskTable; -struct TaskShard; -struct TaskPartition; - - enum class TaskState { Started = 0, @@ -108,6 +107,11 @@ struct TaskStateWithOwner TaskState state{TaskState::Unknown}; String owner; + static String getData(TaskState state, const String & owner) + { + return TaskStateWithOwner(state, owner).toString(); 
+ } + String toString() { WriteBufferFromOwnString wb; @@ -132,26 +136,35 @@ struct TaskStateWithOwner }; +/// Hierarchical task description +struct TaskPartition; +struct TaskShard; +struct TaskTable; +struct TaskCluster; + +using TasksPartition = std::vector; +using ShardInfo = Cluster::ShardInfo; +using TaskShardPtr = TaskShard *; +using TasksShard = std::vector; +using TasksShardPtrs = std::vector; +using TasksTable = std::list; +using PartitionToShards = std::map; + struct TaskPartition { TaskPartition(TaskShard & parent, const String & name_) : task_shard(parent), name(name_) {} - String getCommonPartitionZooKeeperPath() const; - String getZooKeeperPath() const; + String getCommonPartitionPath() const; + String getCommonPartitionIsDirtyPath() const; + String getCommonPartitionActiveWorkersPath() const; + String getActiveWorkerPath() const; + String getCommonPartitionShardsPath() const; + String getShardStatusPath() const; TaskShard & task_shard; String name; - - String create_query_pull; - ASTPtr create_query_pull_ast; - ASTPtr create_query_push; }; -using TasksPartition = std::vector; - - -using ShardInfo = Cluster::ShardInfo; - struct TaskShard { TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_) {} @@ -164,11 +177,6 @@ struct TaskShard TasksPartition partitions; }; -using TaskShardPtr = TaskShard *; -using TasksShard = std::vector; -using TasksShardPtrs = std::vector; - - struct TaskTable { TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix, @@ -182,12 +190,10 @@ struct TaskTable /// Source cluster and table String cluster_pull_name; DatabaseAndTableName table_pull; - String db_table_pull; /// Destination cluster and table String cluster_push_name; DatabaseAndTableName table_push; - String db_table_push; /// Storage of destination table String engine_push_str; @@ -197,8 +203,8 @@ struct TaskTable DatabaseAndTableName table_split; String sharding_key_str; ASTPtr 
sharding_key_ast; - String engine_proxy_str; - ASTPtr engine_proxy_ast; + ASTPtr engine_split_ast; + String engine_split_str; /// Additional WHERE expression to filter input data String where_condition_str; @@ -220,13 +226,12 @@ struct TaskTable /// Prioritized list of shards Shards shards_pull; + PartitionToShards partition_to_shards; + template void initShards(URNG && urng); }; -using TasksTable = std::list; - - struct TaskCluster { TaskCluster(const String & task_zookeeper_path_, const Poco::Util::AbstractConfiguration & config, const String & base_key, const String & default_local_database_); @@ -269,6 +274,43 @@ String getDatabaseDotTable(const DatabaseAndTableName & db_and_table) return getDatabaseDotTable(db_and_table.first, db_and_table.second); } + +/// Detailed status of ZooKeeper multi operation +struct MultiOpStatus +{ + int32_t code = ZOK; + int failed_op_index = 0; + zkutil::OpPtr failed_op; +}; + +/// Atomically checks that the node checking_node_path does not exist and, only in that case, commits the passed op +/// On failure, failed_op_index is relative to the passed op (which has index 0), so a negative value means the node check failed +static MultiOpStatus checkNoNodeAndCommit( + const zkutil::ZooKeeperPtr & zookeeper, + const String & checking_node_path, + zkutil::OpPtr && op) +{ + zkutil::Ops ops; + ops.emplace_back(std::make_unique(checking_node_path, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); + ops.emplace_back(std::make_unique(checking_node_path, -1)); + ops.emplace_back(std::move(op)); + + MultiOpStatus res; + zkutil::OpResultsPtr results; + + res.code = zookeeper->tryMulti(ops, &results); + if (res.code != ZOK) + { + auto index = zkutil::getFailedOpIndex(results); + res.failed_op_index = static_cast(index) - 2; + res.failed_op = ops.at(index)->clone(); + } + + return res; +} + + +// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])' std::shared_ptr createASTStorageDistributed( const String & cluster_name, const String & database, const String & table, 
const ASTPtr & sharding_key_ast = nullptr) { @@ -305,19 +347,38 @@ Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream) } -String TaskPartition::getCommonPartitionZooKeeperPath() const +String TaskPartition::getCommonPartitionPath() const { return task_shard.task_table.task_cluster.task_zookeeper_path // root + "/table_" + escapeForFileName(task_shard.task_table.name_in_config) // table_test.hits + "/" + name; // 201701 } -String TaskPartition::getZooKeeperPath() const +String TaskPartition::getShardStatusPath() const { - return getCommonPartitionZooKeeperPath() // /root/table_test.hits/201701 - + "/" + toString(task_shard.info.shard_num); // 1 (the first shard) + return getCommonPartitionPath() // /root/table_test.hits/201701 + + "/shards/" + toString(task_shard.info.shard_num); // 1 (the first shard) } +String TaskPartition::getCommonPartitionShardsPath() const +{ + return getCommonPartitionPath() + "/shards"; +} + +String TaskPartition::getCommonPartitionActiveWorkersPath() const +{ + return getCommonPartitionPath() + "/active_workers"; +} + +String TaskPartition::getActiveWorkerPath() const +{ + return getCommonPartitionActiveWorkersPath() + "/partition_workers/" + toString(task_shard.info.shard_num); +} + +String TaskPartition::getCommonPartitionIsDirtyPath() const +{ + return getCommonPartitionPath() + "/is_dirty"; +} TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_, const String & table_key) @@ -332,11 +393,9 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati table_pull.first = config.getString(table_prefix + "database_pull"); table_pull.second = config.getString(table_prefix + "table_pull"); - db_table_pull = getDatabaseDotTable(table_pull); table_push.first = config.getString(table_prefix + "database_push"); table_push.second = config.getString(table_prefix + "table_push"); - db_table_push = getDatabaseDotTable(table_push); engine_push_str = 
config.getString(table_prefix + "engine"); { @@ -348,8 +407,8 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati { ParserExpressionWithOptionalAlias parser_expression(false); sharding_key_ast = parseQuery(parser_expression, sharding_key_str); - engine_proxy_ast = createASTStorageDistributed(cluster_pull_name, table_pull.first, table_pull.second, sharding_key_ast); - engine_proxy_str = queryToString(engine_proxy_ast); + engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast); + engine_split_str = queryToString(engine_split_ast); table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + name_in_config); } @@ -481,6 +540,10 @@ public: settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value); settings_pull.preferred_block_size_bytes = 0; + Settings & settings_push = task_cluster->settings_push; + settings_push.insert_distributed_timeout = 0; + settings_push.insert_distributed_sync = 1; + /// Set up clusters context.setClustersConfig(task_cluster_config, task_cluster->clusters_prefix); @@ -511,7 +574,7 @@ public: LOG_DEBUG(log, "Set up table task " << task_table.name_in_config << " (" << "cluster " << task_table.cluster_pull_name - << ", table " << task_table.db_table_pull + << ", table " << getDatabaseDotTable(task_table.table_pull) << ", shard " << task_shard->info.shard_num << ")"); LOG_DEBUG(log, "There are " @@ -523,9 +586,12 @@ public: Strings partitions = getRemotePartitions(task_table.table_pull, *task_shard->connection_entry, &task_cluster->settings_pull); for (const String & partition_name : partitions) + { task_shard->partitions.emplace_back(*task_shard, partition_name); + task_table.partition_to_shards[partition_name].emplace_back(task_shard); + } - LOG_DEBUG(log, "Will fetch " << task_shard->partitions.size() << " parts"); + LOG_DEBUG(log, "Will fetch " << task_shard->partitions.size() << " partitions"); } } 
@@ -537,7 +603,18 @@ public: { for (TaskTable & task_table : task_cluster->table_tasks) { - for (TaskShardPtr task_shard : task_table.shards_pull.all_shards_prioritized) + auto & shards = task_table.shards_pull.all_shards_prioritized; + + if (shards.empty()) + continue; + + /// Try process all partitions of the first shard + for (TaskPartition & task_partition : shards[0]->partitions) + { + processPartitionTask(task_partition); + } + + for (TaskShardPtr task_shard : shards) { for (TaskPartition & task_partition : task_shard->partitions) { @@ -563,8 +640,6 @@ protected: { while (true) { - auto zookeeper = getZooKeeper(); - zkutil::Stat stat; zookeeper->get(getWorkersPath(), &stat); @@ -572,9 +647,7 @@ protected: { LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")" << ". Postpone processing " << task_description); - - using namespace std::literals::chrono_literals; - std::this_thread::sleep_for(1s); + std::this_thread::sleep_for(default_sleep_time); } else { @@ -602,145 +675,333 @@ protected: return res; } - bool processPartitionTask(TaskPartition & task_partition) + bool tryDropPartition(TaskPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper) { - auto zookeeper = getZooKeeper(); + TaskTable & task_table = task_partition.task_shard.task_table; - String partition_task_node = task_partition.getZooKeeperPath(); - String partition_task_active_node = partition_task_node + "/active_worker"; - String partition_task_status_node = partition_task_node + "/state"; + String current_shards_path = task_partition.getCommonPartitionShardsPath(); + String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath(); + String dirt_cleaner_path = is_dirty_flag_path + "/cleaner"; - /// Load balancing - auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, partition_task_node); - - /// Create ephemeral node to mark that we are active - zookeeper->createAncestors(partition_task_active_node); - 
zkutil::EphemeralNodeHolderPtr partition_task_node_holder; + zkutil::EphemeralNodeHolder::Ptr cleaner_holder; try { - partition_task_node_holder = zkutil::EphemeralNodeHolder::create(partition_task_active_node, *zookeeper, host_id); + cleaner_holder = zkutil::EphemeralNodeHolder::create(dirt_cleaner_path, *zookeeper, host_id); } - catch (const zkutil::KeeperException & e) + catch (zkutil::KeeperException & e) { if (e.code == ZNODEEXISTS) { - LOG_DEBUG(log, "Someone is already processing " << partition_task_node); + LOG_DEBUG(log, "Partition " << task_partition.name << " is cleaning now by somebody, sleep"); + std::this_thread::sleep_for(default_sleep_time); return false; } throw; } - TaskShard & task_shard = task_partition.task_shard; - TaskTable & task_table = task_shard.task_table; - - /// We need to update table definitions for each part, it could be changed after ALTER - - String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *task_shard.connection_entry, - &task_cluster->settings_pull); - - ParserCreateQuery parser_create_query; - ASTPtr create_query_pull_ast = parseQuery(parser_create_query, create_query_pull_str); - - DatabaseAndTableName table_pull(working_database_name, ".pull." + task_table.name_in_config); - DatabaseAndTableName table_split(working_database_name, ".split." + task_table.name_in_config); - - String table_pull_cluster_name = ".pull." 
+ task_table.cluster_pull_name; - size_t current_shard_index = task_shard.info.shard_num - 1; - ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(current_shard_index); - context.setCluster(table_pull_cluster_name, cluster_pull_current_shard); - - auto storage_pull_ast = createASTStorageDistributed(table_pull_cluster_name, task_table.table_pull.first, task_table.table_pull.second); - const auto & storage_split_ast = task_table.engine_proxy_ast; - - auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_pull, storage_pull_ast); - auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast); - - LOG_DEBUG(log, "Create current pull table. Query: " << queryToString(create_table_pull_ast)); - dropAndCreateLocalTable(create_table_pull_ast); - - LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast)); - dropAndCreateLocalTable(create_table_split_ast); - - auto create_query_push_ast = rewriteCreateQueryStorage(create_query_pull_ast, task_table.table_push, task_table.engine_push_ast); - LOG_DEBUG(log, "Push table create query: " << queryToString(create_query_push_ast)); - - TaskStateWithOwner start_state(TaskState::Started, host_id); - auto code = zookeeper->tryCreate(partition_task_status_node, start_state.toString(), zkutil::CreateMode::Persistent); - - if (code == ZNODEEXISTS) + zkutil::Stat stat; + if (zookeeper->exists(current_shards_path, &stat)) { - auto status = TaskStateWithOwner::fromString(zookeeper->get(partition_task_status_node)); - - if (status.state == TaskState::Finished) + if (stat.numChildren != 0) { - LOG_DEBUG(log, "Task " << partition_task_node << " has been executed by " << status.owner); - return true; - } - else - { - LOG_DEBUG(log, "Found abandoned task " << partition_task_node); - /// Restart shard + LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active 
workers, sleep"); + std::this_thread::sleep_for(default_sleep_time); return false; } } - else if (code != ZOK) - throw zkutil::KeeperException(code, partition_task_status_node); - /// Try create table (if not exists) on each shard + /// Remove all status nodes + zookeeper->tryRemoveRecursive(current_shards_path); + + String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push) + + " DROP PARTITION ID " + backQuoteIfNeed(task_partition.name); + + ClusterPtr & cluster_push = task_table.cluster_push; + Settings & settings_push = task_cluster->settings_push; + + LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query); + /// Limit number of max executing replicas to 1 + size_t num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ALL, 1); + + if (num_shards < cluster_push->getShardCount()) { - auto query_ast = create_query_push_ast->clone(); - typeid_cast(*query_ast).if_not_exists = true; - String query = queryToString(query_ast); - - LOG_DEBUG(log, "Create remote tables " << query); - executeQueryOnOneReplicaAtLeast(task_table.cluster_push, query_ast, query, &task_cluster->settings_push); + LOG_INFO(log, "DROP PARTITION wasn't successfully executed on " << cluster_push->getShardCount() - num_shards << " shards"); + return false; } - /// Do the main action + /// Remove the locking node + zookeeper->remove(is_dirty_flag_path); + return true; + } + + bool processPartitionTask(TaskPartition & task_partition) + { + try { - String query; - { - std::stringstream ss; - ss << "INSERT INTO " << getDatabaseDotTable(table_split) - << " SELECT * FROM " << getDatabaseDotTable(table_pull) - << " WHERE (_part LIKE '" << task_partition.name << "%')"; + return processPartitionTaskImpl(task_partition); + } + catch (...) 
+ { + tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name); + return false; + } + } - if (!task_table.where_condition_str.empty()) - ss << " AND (" + task_table.where_condition_str + ")"; + bool processPartitionTaskImpl(TaskPartition & task_partition) + { + TaskShard & task_shard = task_partition.task_shard; + TaskTable & task_table = task_shard.task_table; - query = ss.str(); - } + auto zookeeper = getZooKeeper(); + auto acl = zookeeper->getDefaultACL(); - LOG_DEBUG(log, "Executing query: " << query); + String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath(); + String current_task_is_active_path = task_partition.getActiveWorkerPath(); + String current_task_status_path = task_partition.getShardStatusPath(); + + /// Load balancing + auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path); + + LOG_DEBUG(log, "Processing " << current_task_status_path); + + /// Do not start if partition is dirty, try to clean it + if (zookeeper->exists(is_dirty_flag_path)) + { + LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it"); try { - Context local_context = context; - local_context.getSettingsRef() = task_cluster->settings_push; - - ReadBufferFromString istr(query); - WriteBufferNull ostr; - executeQuery(istr, ostr, false, local_context, {}); + tryDropPartition(task_partition, zookeeper); } catch (...) 
{ - tryLogCurrentException(log); + tryLogCurrentException(log, "An error occurred while clean partition"); + } + + return false; + } + + /// Create ephemeral node to mark that we are active + zookeeper->createAncestors(current_task_is_active_path); + zkutil::EphemeralNodeHolderPtr partition_task_node_holder; + try + { + partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_is_active_path, *zookeeper, host_id); + } + catch (const zkutil::KeeperException & e) + { + if (e.code == ZNODEEXISTS) + { + LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path); + return false; + } + + throw; + } + + /// Exit if task has been already processed, create blocking node if it is abandoned + { + String status_data; + if (zookeeper->tryGet(current_task_status_path, status_data)) + { + TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data); + if (status.state == TaskState::Finished) + { + LOG_DEBUG(log, "Task " << current_task_status_path << " has been successfully executed by " << status.owner); + return true; + } + + // Task is abandoned, initialize DROP PARTITION + LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner); + zookeeper->create(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent); + + return false; } } - TaskStateWithOwner state_finish(TaskState::Started, host_id); - zkutil::Stat stat; - code = zookeeper->trySet(partition_task_status_node, state_finish.toString(), 0, &stat); + zookeeper->createAncestors(current_task_status_path); - if (code == ZBADVERSION) + /// We need to update table definitions for each part, it could be changed after ALTER + ASTPtr create_query_pull_ast; { - LOG_DEBUG(log, "Someone made the node abandoned. 
Will refill partition"); - return false; + /// Fetch and parse (possibly) new definition + String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *task_shard.connection_entry, + &task_cluster->settings_pull); + ParserCreateQuery parser_create_query; + create_query_pull_ast = parseQuery(parser_create_query, create_query_pull_str); } - else if (code != ZOK) - throw zkutil::KeeperException(code, partition_task_status_node); + /// Create local Distributed tables: + /// a table fetching data from current shard and a table inserting data to the whole cluster + DatabaseAndTableName table_shard(working_database_name, ".read_shard." + task_table.name_in_config); + DatabaseAndTableName table_split(working_database_name, ".split." + task_table.name_in_config); + { + /// Create special cluster with single shard + String shard_read_cluster_name = ".read_shard." + task_table.cluster_pull_name; + size_t current_shard_index = task_shard.info.shard_num - 1; + ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(current_shard_index); + context.setCluster(shard_read_cluster_name, cluster_pull_current_shard); + + auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second); + const auto & storage_split_ast = task_table.engine_split_ast; + + auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_shard, storage_shard_ast); + auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast); + + LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast)); + dropAndCreateLocalTable(create_table_pull_ast); + + LOG_DEBUG(log, "Create split table. 
Query: " << queryToString(create_table_split_ast)); + dropAndCreateLocalTable(create_table_split_ast); + } + + /// Try start processing, create node about it + { + String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id); + auto op_create = std::make_unique(current_task_status_path, start_state, acl, zkutil::CreateMode::Persistent); + + auto multi_status = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create)); + + if (multi_status.code != ZOK) + { + if (multi_status.failed_op_index < 0) + { + LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled"); + return false; + } + + throw zkutil::KeeperException(multi_status.code, current_task_status_path); + } + } + + /// Try create table (if not exists) on each shard + { + auto create_query_push_ast = rewriteCreateQueryStorage(create_query_pull_ast, task_table.table_push, task_table.engine_push_ast); + typeid_cast(*create_query_push_ast).if_not_exists = true; + String query = queryToString(create_query_push_ast); + + LOG_DEBUG(log, "Create remote push tables. 
Query: " << query); + executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push); + } + + /// Do the copying + { + ASTPtr query_select_ast; + { + String query; + query += "SELECT * FROM " + getDatabaseDotTable(table_shard); + query += " WHERE (_part LIKE '" + task_partition.name + "%')"; + if (!task_table.where_condition_str.empty()) + query += " AND (" + task_table.where_condition_str + ")"; + + ParserQuery p_query(query.data() + query.size()); + query_select_ast = parseQuery(p_query, query); + + LOG_DEBUG(log, "Executing SELECT query: " << query); + } + + ASTPtr query_insert_ast; + { + String query; + query += "INSERT INTO " + getDatabaseDotTable(table_split) + " VALUES "; + + ParserQuery p_query(query.data() + query.size()); + query_insert_ast = parseQuery(p_query, query); + + LOG_DEBUG(log, "Executing INSERT query: " << query); + } + + try + { + /// Custom INSERT SELECT implementation + Context context_select = context; + context_select.getSettingsRef() = task_cluster->settings_pull; + + Context context_insert = context; + context_insert.getSettingsRef() = task_cluster->settings_push; + + InterpreterSelectQuery interpreter_select(query_select_ast, context_select); + BlockIO io_select = interpreter_select.execute(); + + InterpreterInsertQuery interpreter_insert(query_insert_ast, context_insert); + BlockIO io_insert = interpreter_insert.execute(); + + using ExistsFuture = zkutil::ZooKeeper::ExistsFuture; + auto future_is_dirty = std::make_unique(zookeeper->asyncExists(is_dirty_flag_path)); + + Stopwatch watch(CLOCK_MONOTONIC_COARSE); + constexpr size_t check_period_milliseconds = 500; + + /// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copy data + auto cancel_check = [&] () + { + if (zookeeper->expired()) + throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED); + + if (future_is_dirty != nullptr) + { + zkutil::ZooKeeper::StatAndExists 
status; + try + { + status = future_is_dirty->get(); + future_is_dirty.reset(); + } + catch (zkutil::KeeperException & e) + { + future_is_dirty.reset(); + + if (e.isTemporaryError()) + LOG_INFO(log, "ZooKeeper is lagging: " << e.displayText()); + else + throw; + } + + if (status.exists) + throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED); + } + + if (watch.elapsedMilliseconds() >= check_period_milliseconds) + { + watch.restart(); + future_is_dirty = std::make_unique(zookeeper->asyncExists(is_dirty_flag_path)); + } + + return false; + }; + + /// Main work is here + copyData(*io_select.in, *io_insert.out, cancel_check); + + // Just in case + if (future_is_dirty != nullptr) + future_is_dirty.get(); + } + catch (...) + { + tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty"); + return false; + } + } + + /// Finalize the processing, change state of current partition task (and also check is_dirty flag) + { + String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id); + auto op_set = std::make_unique(current_task_status_path, state_finished, 0); + auto multi_status = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set)); + + if (multi_status.code != ZOK) + { + if (multi_status.failed_op_index < 0) + LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled"); + else + LOG_INFO(log, "Someone made the node abandoned. Will refill partition. 
" << ::zerror(multi_status.code)); + + return false; + } + } + + LOG_DEBUG(log, "Data partition copied"); return true; } @@ -810,64 +1071,94 @@ protected: return res; } - size_t executeQueryOnOneReplicaAtLeast(const ClusterPtr & cluster, const ASTPtr & query_ast, const String & query, - const Settings * settings = nullptr) const + /** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster + * Returns number of shards for which at least one replica executed query successfully + */ + size_t executeQueryOnCluster( + const ClusterPtr & cluster, + const String & query, + const ASTPtr & query_ast_ = nullptr, + const Settings * settings = nullptr, + PoolMode pool_mode = PoolMode::GET_ALL, + size_t max_successful_executions_per_shard = 0) const { auto num_shards = cluster->getShardsInfo().size(); - std::vector per_shard_num_sucessful_replicas(num_shards, 0); + std::vector per_shard_num_successful_replicas(num_shards, 0); + + ASTPtr query_ast; + if (query_ast_ == nullptr) + { + ParserQuery p_query(query.data() + query.size()); + query_ast = parseQuery(p_query, query); + } + else + query_ast = query_ast_; + /// We need to execute query on one replica at least auto do_for_shard = [&] (size_t shard_index) { const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index); - size_t num_sucessful_replicas = 0; + size_t & num_successful_executions = per_shard_num_successful_replicas.at(shard_index); + num_successful_executions = 0; + + auto increment_and_check_exit = [&] () + { + ++num_successful_executions; + return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard; + }; /// In that case we don't have local replicas, but do it just in case for (size_t i = 0; i < shard.getLocalNodeCount(); ++i) { - InterpreterCreateQuery interpreter(query_ast, context); - interpreter.execute(); - ++num_sucessful_replicas; + auto interpreter = InterpreterFactory::get(query_ast, context); 
+ interpreter->execute(); + + if (increment_and_check_exit()) + return; } /// Will try to make as many as possible queries if (shard.hasRemoteConnections()) { - std::vector connections = shard.pool->getMany(settings, PoolMode::GET_ALL); + std::vector connections = shard.pool->getMany(settings, pool_mode); for (auto & connection : connections) { if (!connection.isNull()) { - RemoteBlockInputStream stream(*connection, query, context, settings); - NullBlockOutputStream output; try { + RemoteBlockInputStream stream(*connection, query, context, settings); + NullBlockOutputStream output; copyData(stream, output); - ++num_sucessful_replicas; + + if (increment_and_check_exit()) + return; } catch (const Exception & e) { - tryLogCurrentException(log); + LOG_INFO(log, getCurrentExceptionMessage(false, true)); } } } } - - per_shard_num_sucessful_replicas[shard_index] = num_sucessful_replicas; }; - ThreadPool thread_pool(getNumberOfPhysicalCPUCores()); + { + ThreadPool thread_pool(std::min(num_shards, getNumberOfPhysicalCPUCores())); - for (size_t shard_index = 0; shard_index < num_shards; ++shard_index) - thread_pool.schedule([=] { do_for_shard(shard_index); }); - thread_pool.wait(); + for (size_t shard_index = 0; shard_index < num_shards; ++shard_index) + thread_pool.schedule([=] { do_for_shard(shard_index); }); - size_t sucessful_shards = 0; - for (size_t num_replicas : per_shard_num_sucessful_replicas) - sucessful_shards += (num_replicas > 0); + thread_pool.wait(); + } - return sucessful_shards; + size_t successful_shards = 0; + for (size_t num_replicas : per_shard_num_successful_replicas) + successful_shards += (num_replicas > 0); + + return successful_shards; } String getTableStructureAndCheckConsistency(TaskTable & table_task) @@ -896,7 +1187,7 @@ protected: for (size_t i = 0; i < block.rows(); ++i) { if (structure_class_col.getElement(i) != 0) - throw Exception("Structures of table " + table_task.db_table_pull + " are different on cluster " + + throw 
Exception("Structures of table " + getDatabaseDotTable(table_task.table_pull) + " are different on cluster " + table_task.cluster_pull_name, ErrorCodes::BAD_ARGUMENTS); if (rows == 0) @@ -935,6 +1226,8 @@ private: Context & context; Poco::Logger * log; + + std::chrono::milliseconds default_sleep_time{1000}; }; @@ -965,15 +1258,17 @@ public: po::variables_map options; po::store(po::command_line_parser(argc, argv).options(options_desc).positional(positional_desc).run(), options); - po::notify(options); if (options.count("help")) { std::cerr << "Copies tables from one cluster to another" << std::endl; std::cerr << "Usage: clickhouse copier " << std::endl; std::cerr << options_desc << std::endl; + return; } + po::notify(options); + if (config_xml.empty() || !Poco::File(config_xml).exists()) throw Exception("ZooKeeper configuration file " + config_xml + " doesn't exist", ErrorCodes::BAD_ARGUMENTS);