From 1be4a35f15c74ca327668185fd3f24bd9865c2b7 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Thu, 20 Feb 2020 20:26:20 +0300 Subject: [PATCH] successful copying --- dbms/programs/copier/ClusterCopier.cpp | 112 +++++++++++++-------- dbms/programs/copier/ClusterCopier.h | 4 +- dbms/programs/copier/ShardPartitionPiece.h | 2 +- dbms/programs/copier/TaskTableAndShard.h | 13 ++- dbms/programs/copier/ZooKeeperStaff.h | 7 +- 5 files changed, 81 insertions(+), 57 deletions(-) diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp index 65a82983d09..d6859b54cbf 100644 --- a/dbms/programs/copier/ClusterCopier.cpp +++ b/dbms/programs/copier/ClusterCopier.cpp @@ -17,7 +17,7 @@ void ClusterCopier::init() { if (response.error != Coordination::ZOK) return; - UInt64 version = ++task_descprtion_version; + UInt64 version = ++task_description_version; LOG_DEBUG(log, "Task description should be updated, local version " << version); }; @@ -227,9 +227,9 @@ void ClusterCopier::reloadTaskDescription() void ClusterCopier::updateConfigIfNeeded() { - UInt64 version_to_update = task_descprtion_version; - bool is_outdated_version = task_descprtion_current_version != version_to_update; - bool is_expired_session = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired(); + UInt64 version_to_update = task_description_version; + bool is_outdated_version = task_description_current_version != version_to_update; + bool is_expired_session = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired(); if (!is_outdated_version && !is_expired_session) return; @@ -237,7 +237,7 @@ void ClusterCopier::updateConfigIfNeeded() LOG_DEBUG(log, "Updating task description"); reloadTaskDescription(); - task_descprtion_current_version = version_to_update; + task_description_current_version = version_to_update; } void ClusterCopier::process(const ConnectionTimeouts & timeouts) @@ -312,6 +312,13 @@ void 
ClusterCopier::process(const ConnectionTimeouts & timeouts) /// Protected section + +/* + * Creates task worker node and checks maximum number of workers not to exceed the limit. + * To achieve this we have to check version of workers_version_path node and create current_worker_path + * node atomically. + * */ + zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNeed( const zkutil::ZooKeeperPtr & zookeeper, const String & description, @@ -324,8 +331,8 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee std::this_thread::sleep_for(current_sleep_time); String workers_version_path = getWorkersPathVersion(); - String workers_path = getWorkersPath(); - String current_worker_path = getCurrentWorkerNodePath(); + String workers_path = getWorkersPath(); + String current_worker_path = getCurrentWorkerNodePath(); UInt64 num_bad_version_errors = 0; @@ -629,6 +636,9 @@ std::shared_ptr ClusterCopier::rewriteCreateQueryStorage(const A return res; } + +/// TODO: implement tryDropPartitionPiece which is simply tryDropPartition, but on different table. 
+ bool ClusterCopier::tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock) { if (is_safe_mode) @@ -636,11 +646,11 @@ bool ClusterCopier::tryDropPartition(ShardPartition & task_partition, const zkut TaskTable & task_table = task_partition.task_shard.task_table; - const String current_shards_path = task_partition.getPartitionShardsPath(); + const String current_shards_path = task_partition.getPartitionShardsPath(); const String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath(); - const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath(); - const String dirty_cleaner_path = is_dirty_flag_path + "/cleaner"; - const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath(); + const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath(); + const String dirty_cleaner_path = is_dirty_flag_path + "/cleaner"; + const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath(); zkutil::EphemeralNodeHolder::Ptr cleaner_holder; try @@ -957,12 +967,15 @@ PartitionTaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const Conn } -PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, - const size_t current_piece_number, bool is_unprioritized_task) + +/*...*/ +PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl( + const ConnectionTimeouts & timeouts, ShardPartition & task_partition, + const size_t current_piece_number, bool is_unprioritized_task) { TaskShard & task_shard = task_partition.task_shard; TaskTable & task_table = task_shard.task_table; - ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name); + ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name); ShardPartitionPiece & partition_piece = 
task_partition.pieces[current_piece_number]; const size_t number_of_splits = task_table.number_of_splits; @@ -974,15 +987,15 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio auto zookeeper = context.getZooKeeper(); - const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath(); - const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath(); + const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath(); + const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath(); const String current_task_piece_is_active_path = partition_piece.getActiveWorkerPath(); - const String current_task_piece_status_path = partition_piece.getShardStatusPath(); + const String current_task_piece_status_path = partition_piece.getShardStatusPath(); /// Auxiliary functions: /// Creates is_dirty node to initialize DROP PARTITION - auto create_is_dirty_node = [&, this] (const CleanStateClock & clock) + auto create_is_dirty_node = [&] (const CleanStateClock & clock) { if (clock.is_stale()) LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing"); @@ -1001,16 +1014,17 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio }; /// Returns SELECT query filtering current partition and applying user filter - auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "", - bool enable_splitting = false) + auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, bool enable_splitting, String limit = "") { String query; query += "SELECT " + fields + " FROM " + getQuotedTable(from_table); /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field) query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))"; + if (enable_splitting) 
- query += " AND ( cityHash64(" + primary_key_comma_separated + ") = " + toString(current_piece_number) + " )"; + query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) + + " = " + toString(current_piece_number) + " )"; if (!task_table.where_condition_str.empty()) query += " AND (" + task_table.where_condition_str + ")"; @@ -1031,23 +1045,26 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio LogicalClock task_start_clock; { Coordination::Stat stat{}; - if (zookeeper->exists(task_partition.getPartitionShardsPath(), &stat)) + if (zookeeper->exists(partition_piece.getPartitionPieceShardsPath(), &stat)) task_start_clock = LogicalClock(stat.mzxid); } - /// Do not start if partition is dirty, try to clean it + /// Do not start if partition piece is dirty, try to clean it if (clean_state_clock.is_clean() && (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock)) { - LOG_DEBUG(log, "Partition " << task_partition.name << " appears to be clean"); + LOG_DEBUG(log, "Partition " << task_partition.name + << " piece " + toString(current_piece_number) + " appears to be clean"); zookeeper->createAncestors(current_task_piece_status_path); } else { - LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it"); + LOG_DEBUG(log, "Partition " << task_partition.name + << " piece " + toString(current_piece_number) + " is dirty, try to drop it"); try { + /// TODO: tryDropPartitionPiece. tryDropPartition(task_partition, zookeeper, clean_state_clock); } catch (...) 
@@ -1085,13 +1102,16 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data); if (status.state == TaskState::Finished) { - LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has been successfully executed by " << status.owner); + LOG_DEBUG(log, "Task " << current_task_piece_status_path + << " has been successfully executed by " << status.owner); return PartitionTaskStatus::Finished; } - // Task is abandoned, initialize DROP PARTITION - LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has not been successfully finished by " << - status.owner << ". Partition will be dropped and refilled."); + /// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process. + /// Initialize DROP PARTITION + LOG_DEBUG(log, "Task " << current_task_piece_status_path + << " has not been successfully finished by " << status.owner + << ". Partition will be dropped and refilled."); create_is_dirty_node(clean_state_clock); return PartitionTaskStatus::Error; @@ -1101,13 +1121,14 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio /// Check that destination partition is empty if we are first worker /// NOTE: this check is incorrect if pull and push tables have different partition key! 
String clean_start_status; - if (!zookeeper->tryGet(task_partition.getPartitionCleanStartPath(), clean_start_status) || clean_start_status != "ok") + if (!zookeeper->tryGet(partition_piece.getPartitionPieceCleanStartPath(), clean_start_status) || clean_start_status != "ok") { - zookeeper->createIfNotExists(task_partition.getPartitionCleanStartPath(), ""); - auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker", + zookeeper->createIfNotExists(partition_piece.getPartitionPieceCleanStartPath(), ""); + auto checker = zkutil::EphemeralNodeHolder::create(partition_piece.getPartitionPieceCleanStartPath() + "/checker", *zookeeper, host_id); // Maybe we are the first worker - ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()"); + /// TODO: Why table_split_shard??? + ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()", /*enable_splitting*/ true); UInt64 count; { Context local_context = context; @@ -1122,20 +1143,21 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio if (count != 0) { Coordination::Stat stat_shards{}; - zookeeper->get(task_partition.getPartitionShardsPath(), & stat_shards); + zookeeper->get(partition_piece.getPartitionPieceShardsPath(), &stat_shards); /// NOTE: partition is still fresh if dirt discovery happens before cleaning if (stat_shards.numChildren == 0) { LOG_WARNING(log, "There are no workers for partition " << task_partition.name - << ", but destination table contains " << count << " rows" - << ". Partition will be dropped and refilled."); + << " piece " << toString(current_piece_number) + << ", but destination table contains " << count << " rows" + << ". 
Partition will be dropped and refilled."); create_is_dirty_node(clean_state_clock); return PartitionTaskStatus::Error; } } - zookeeper->set(task_partition.getPartitionCleanStartPath(), "ok"); + zookeeper->set(partition_piece.getPartitionPieceCleanStartPath(), "ok"); } /// At this point, we need to sync that the destination table is clean /// before any actual work @@ -1146,12 +1168,14 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path); if (clean_state_clock != new_clean_state_clock) { - LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing"); + LOG_INFO(log, "Partition " << task_partition.name << " piece " + << toString(current_piece_number) << " clean state changed, cowardly bailing"); return PartitionTaskStatus::Error; } else if (!new_clean_state_clock.is_clean()) { - LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled"); + LOG_INFO(log, "Partition " << task_partition.name << " piece " + << toString(current_piece_number) << " is dirty and will be dropped and refilled"); create_is_dirty_node(new_clean_state_clock); return PartitionTaskStatus::Error; } @@ -1163,7 +1187,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio /// Define push table for current partition piece auto database_and_table_for_current_piece= std::pair( task_table.table_push.first, - task_table.table_push.second + ".piece_" + toString(current_piece_number)); + task_table.table_push.second + "_piece_" + toString(current_piece_number)); auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query, database_and_table_for_current_piece, task_table.engine_push_ast); @@ -1174,8 +1198,8 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio UInt64 shards = 
executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push, PoolMode::GET_MANY); - LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << - " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount()); + LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) + << " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount()); } /// Do the copying @@ -1188,7 +1212,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio } // Select all fields - ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", inject_fault ? "1" : ""); + ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ inject_fault ? "1" : ""); LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription() << " : " << queryToString(query_select_ast)); @@ -1361,7 +1385,7 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout String split_shard_prefix = ".split."; task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id); task_shard.table_split_shard = DatabaseAndTableName( - working_database_name, split_shard_prefix + task_table.table_id + ".piece_" + toString(piece_number)); + working_database_name, split_shard_prefix + task_table.table_id + "_piece_" + toString(piece_number)); /// Create special cluster with single shard String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name; diff --git a/dbms/programs/copier/ClusterCopier.h b/dbms/programs/copier/ClusterCopier.h index 0555b2d5c04..41fdc326ed2 100644 --- a/dbms/programs/copier/ClusterCopier.h +++ b/dbms/programs/copier/ClusterCopier.h @@ -156,8 +156,8 @@ private: String working_database_name; /// Auto update config stuff - UInt64 task_descprtion_current_version = 1; 
- std::atomic task_descprtion_version{1}; + UInt64 task_description_current_version = 1; + std::atomic task_description_version{1}; Coordination::WatchCallback task_description_watch_callback; /// ZooKeeper session used to set the callback zkutil::ZooKeeperPtr task_description_watch_zookeeper; diff --git a/dbms/programs/copier/ShardPartitionPiece.h b/dbms/programs/copier/ShardPartitionPiece.h index f7ae8013b47..a99221b8a97 100644 --- a/dbms/programs/copier/ShardPartitionPiece.h +++ b/dbms/programs/copier/ShardPartitionPiece.h @@ -56,7 +56,7 @@ inline String ShardPartitionPiece::getPartitionPieceIsCleanedPath() const inline String ShardPartitionPiece::getPartitionPieceActiveWorkersPath() const { - return getPartitionPiecePath() + "/partition_active_workers"; + return getPartitionPiecePath() + "/partition_piece_active_workers"; } inline String ShardPartitionPiece::getActiveWorkerPath() const diff --git a/dbms/programs/copier/TaskTableAndShard.h b/dbms/programs/copier/TaskTableAndShard.h index d585fb184ed..c0795340e47 100644 --- a/dbms/programs/copier/TaskTableAndShard.h +++ b/dbms/programs/copier/TaskTableAndShard.h @@ -20,11 +20,11 @@ struct TaskTable { String getPartitionPath(const String & partition_name) const; - [[maybe_unused]] String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const; + String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const; String getCertainPartitionIsDirtyPath(const String & partition_name) const; - [[maybe_unused]] String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const + String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const { UNUSED(partition_name); UNUSED(piece_number); @@ -33,7 +33,7 @@ struct TaskTable { String getCertainPartitionIsCleanedPath(const String & partition_name) const; - [[maybe_unused]] String getCertainPartitionPieceIsCleanedPath(const String & 
partition_name, const size_t piece_number) const + String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const { UNUSED(partition_name); UNUSED(piece_number); @@ -42,7 +42,7 @@ struct TaskTable { String getCertainPartitionTaskStatusPath(const String & partition_name) const; - [[maybe_unused]] String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const + String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const { UNUSED(partition_name); UNUSED(piece_number); @@ -181,7 +181,10 @@ struct TaskShard /// Internal distributed tables DatabaseAndTableName table_read_shard; + DatabaseAndTableName table_split_shard; + + std::vector list_of_split_tables_on_shard; }; @@ -255,7 +258,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf auxiliary_engine_split_asts.emplace_back ( createASTStorageDistributed(cluster_push_name, table_push.first, - table_push.second + ".piece_" + toString(piece_number), sharding_key_ast) + table_push.second + "_piece_" + toString(piece_number), sharding_key_ast) ); } } diff --git a/dbms/programs/copier/ZooKeeperStaff.h b/dbms/programs/copier/ZooKeeperStaff.h index 3133c68933d..2fc4d35400d 100644 --- a/dbms/programs/copier/ZooKeeperStaff.h +++ b/dbms/programs/copier/ZooKeeperStaff.h @@ -157,11 +157,8 @@ public: bool is_clean() const { - return - !is_stale() - && ( - !discovery_zxid.hasHappened() - || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid)); + return !is_stale() + && (!discovery_zxid.hasHappened() || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid)); } bool is_stale() const