diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp
index f308a9a55f5..d7be72022f1 100644
--- a/dbms/programs/copier/ClusterCopier.cpp
+++ b/dbms/programs/copier/ClusterCopier.cpp
@@ -433,8 +433,14 @@ bool ClusterCopier::checkPartitionPieceIsClean(
 bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
 {
     bool answer = true;
-    for (size_t piece_number = 0; piece_number < task_table.number_of_splits; piece_number++)
-        answer &= checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
+    for (size_t piece_number = 0; piece_number < task_table.number_of_splits; ++piece_number)
+    {
+        bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
+        if (!piece_is_done)
+            LOG_DEBUG(log, "Partition " << partition_name << " piece " << toString(piece_number) << " is not done yet.");
+        answer &= piece_is_done;
+    }
+
     return answer;
 }
 
@@ -528,10 +534,156 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
         }
     }
 
-    LOG_INFO(log, "Partition " << partition_name << " is copied successfully");
+    LOG_INFO(log, "Partition " << partition_name << " piece number " << toString(piece_number) << " is copied successfully");
     return true;
 }
 
+
+PartitionTaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name)
+{
+    LOG_DEBUG(log, "Trying to move " << partition_name << " to destination table");
+
+    auto zookeeper = context.getZooKeeper();
+
+    const auto current_partition_attach_is_active = task_table.getPartitionAttachIsActivePath(partition_name);
+    const auto current_partition_attach_is_done = task_table.getPartitionAttachIsDonePath(partition_name);
+
+    /// Create ephemeral node to mark that we are active and process the partition
+    zookeeper->createAncestors(current_partition_attach_is_active);
+    zkutil::EphemeralNodeHolderPtr partition_attach_node_holder;
+    try
+    {
+        partition_attach_node_holder = zkutil::EphemeralNodeHolder::create(current_partition_attach_is_active, *zookeeper, host_id);
+    }
+    catch (const Coordination::Exception & e)
+    {
+        if (e.code == Coordination::ZNODEEXISTS)
+        {
+            LOG_DEBUG(log, "Someone is already moving pieces " << current_partition_attach_is_active);
+            return PartitionTaskStatus::Active;
+        }
+
+        throw;
+    }
+
+
+    /// Exit if task has been already processed;
+    /// create blocking node to signal cleaning up if it is abandoned
+    {
+        String status_data;
+        if (zookeeper->tryGet(current_partition_attach_is_done, status_data))
+        {
+            TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
+            if (status.state == TaskState::Finished)
+            {
+                LOG_DEBUG(log, "All pieces for partition from this task " << current_partition_attach_is_active
+                               << " have been successfully moved to destination table by " << status.owner);
+                return PartitionTaskStatus::Finished;
+            }
+
+            /// Task is abandoned: the ephemeral node was created earlier, possibly by another copier's process.
+            /// Take the move over ourselves.
+            LOG_DEBUG(log, "Moving piece for partition " << current_partition_attach_is_active
+                           << " has not been successfully finished by " << status.owner
+                           << ". Will try to move by myself.");
+        }
+    }
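The block above follows the copier's usual ZooKeeper arbitration pattern: whichever copier manages to create the ephemeral attach_active node owns the move, and everyone else hits ZNODEEXISTS and reports the partition as Active. A minimal sketch of the pattern (illustrative path and simplified control flow, not the copier's literal code):

    auto zookeeper = context.getZooKeeper();
    const String lock_path = "/task_root/tables/db.hits/201701/attach_active";  /// hypothetical path

    zookeeper->createAncestors(lock_path);
    try
    {
        /// The holder deletes the node when it goes out of scope, and ZooKeeper
        /// deletes it if the process dies, so a crashed mover never blocks others forever.
        auto holder = zkutil::EphemeralNodeHolder::create(lock_path, *zookeeper, host_id);
        /// ... perform the move while the ephemeral node exists ...
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code == Coordination::ZNODEEXISTS)
            return PartitionTaskStatus::Active;  /// somebody else is moving this partition
        throw;
    }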
Will try to move by myself."); + } + } + + + /// Try start processing, create node about it + { + String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id); + zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent); + } + + for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number) + { + /// Move partition to original destination table. + { + /// TODO: Execute alter table move partition. + + LOG_DEBUG(log, "Trying to move partition " << partition_name + << " piece " << toString(current_piece_number) << " to original table"); + + ASTPtr query_alter_ast; + String query_alter_ast_string; + + DatabaseAndTableName original_table = task_table.table_push; + DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number)); + + query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) + + " ATTACH PARTITION " + partition_name + + " FROM " + getQuotedTable(helping_table) + + " SETTINGS replication_alter_partitions_sync=2;"; + + LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string); + + try + { + size_t num_nodes = 0; + + for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num) + { + ///FIXME: We have to be sure that every node in cluster executed this query + UInt64 current_num_nodes = executeQueryOnCluster( + task_table.cluster_push, + query_alter_ast_string, + nullptr, + &task_cluster->settings_push, + PoolMode::GET_MANY, + ClusterExecutionMode::ON_EACH_NODE); + + num_nodes = std::max(current_num_nodes, num_nodes); + } + + LOG_INFO(log, "Number of nodes that executed ALTER query successfully : " << toString(num_nodes)); + } + catch (...) + { + LOG_DEBUG(log, "Error while moving partition " << partition_name + << " piece " << toString(current_piece_number) << "to original table"); + throw; + } + + + try + { + String query_deduplicate_ast_string; + if (!task_table.isReplicatedTable()) + { + query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) + + " PARTITION " + partition_name + " DEDUPLICATE;"; + + LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: " << query_alter_ast_string); + + UInt64 num_nodes = executeQueryOnCluster( + task_table.cluster_push, + query_deduplicate_ast_string, + nullptr, + &task_cluster->settings_push, + PoolMode::GET_MANY); + + LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : " << toString(num_nodes)); + } + } + catch(...) 
+
 /// Removes MATERIALIZED and ALIAS columns from create table query
 ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
 {
@@ -838,20 +990,37 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
 
     /// Check that whole cluster partition is done
     /// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
-    bool partition_is_done = num_failed_shards == 0;
+    bool partition_copying_is_done = num_failed_shards == 0;
     try
     {
-        partition_is_done =
+        partition_copying_is_done =
             !has_shard_to_process
-            || (partition_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
+            || (partition_copying_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
     }
     catch (...)
     {
         tryLogCurrentException(log);
-        partition_is_done = false;
+        partition_copying_is_done = false;
     }
 
-    if (partition_is_done)
+
+    bool partition_moving_is_done = false;
+    /// Try to move only if all pieces were copied.
+    if (partition_copying_is_done)
+    {
+        try
+        {
+            auto res = tryMoveAllPiecesToDestinationTable(task_table, partition_name);
+            if (res == PartitionTaskStatus::Finished)
+                partition_moving_is_done = true;
+        }
+        catch (...)
+        {
+            tryLogCurrentException(log, "Some error occurred while moving pieces to destination table for partition " + partition_name);
+        }
+    }
+
+    if (partition_copying_is_done && partition_moving_is_done)
     {
         task_table.finished_cluster_partitions.emplace(partition_name);
 
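Condensed, the hunk above gates the move on a fully copied partition, and marks the cluster partition finished only when both phases succeed (an illustrative paraphrase, not the literal code):

    bool copied = (num_failed_shards == 0)
        && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards);
    bool moved = copied
        && tryMoveAllPiecesToDestinationTable(task_table, partition_name) == PartitionTaskStatus::Finished;
    if (copied && moved)
        task_table.finished_cluster_partitions.emplace(partition_name);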
@@ -1350,80 +1519,6 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
         tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
     }
 
-    /// Move partition to original destination table.
-    {
-        /// TODO: Execute alter table move partition.
-
-        LOG_DEBUG(log, "Trying to move partition " << task_partition.name
-                       << " piece " << toString(current_piece_number) << " to original table");
-
-        ASTPtr query_alter_ast;
-        String query_alter_ast_string;
-
-        DatabaseAndTableName original_table = task_table.table_push;
-        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
-
-        query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
-                                  " ATTACH PARTITION " + task_partition.name +
-                                  " FROM " + getQuotedTable(helping_table) +
-                                  " SETTINGS replication_alter_partitions_sync=2;";
-
-        LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string);
-
-        try
-        {
-            size_t num_nodes = 0;
-            for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num)
-            {
-                ///FIXME: We have to be sure that every node in cluster executed this query
-                UInt64 current_num_nodes = executeQueryOnCluster(
-                        task_table.cluster_push,
-                        query_alter_ast_string,
-                        nullptr,
-                        &task_cluster->settings_push,
-                        PoolMode::GET_MANY,
-                        ClusterExecutionMode::ON_EACH_NODE);
-
-                num_nodes = std::max(current_num_nodes, num_nodes);
-            }
-            LOG_INFO(log, "Number of shard that executed ALTER query successfully : " << toString(num_nodes));
-        }
-        catch (...)
-        {
-            LOG_DEBUG(log, "Error while moving partition " << task_partition.name
-                           << " piece " << toString(current_piece_number) << "to original table");
-            throw;
-        }
-
-
-        try
-        {
-            String query_deduplicate_ast_string;
-            if (!task_table.isReplicatedTable())
-            {
-                query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
-                                                " PARTITION " + task_partition.name + " DEDUPLICATE;";
-
-                LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: " << query_alter_ast_string);
-
-                UInt64 num_nodes = executeQueryOnCluster(
-                        task_table.cluster_push,
-                        query_deduplicate_ast_string,
-                        nullptr,
-                        &task_cluster->settings_push,
-                        PoolMode::GET_MANY);
-
-                LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : " << toString(num_nodes));
-            }
-        }
-        catch(...)
-        {
-            LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition " << task_partition.name << "in the original table");
-            throw;
-        }
-
-    }
-
     /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
     {
         String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
diff --git a/dbms/programs/copier/ClusterCopier.h b/dbms/programs/copier/ClusterCopier.h
index bed6352e129..a39653c6c87 100644
--- a/dbms/programs/copier/ClusterCopier.h
+++ b/dbms/programs/copier/ClusterCopier.h
@@ -104,6 +104,10 @@ protected:
     bool checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
                                    size_t piece_number, const TasksShard & shards_with_partition);
 
+
+    /// After successful copying to the helping tables, this moves all pieces to the destination table.
+    PartitionTaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
+
     /// Removes MATERIALIZED and ALIAS columns from create table query
     ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
 
diff --git a/dbms/programs/copier/TaskTableAndShard.h b/dbms/programs/copier/TaskTableAndShard.h
index b33876c679e..9a09deb911f 100644
--- a/dbms/programs/copier/TaskTableAndShard.h
+++ b/dbms/programs/copier/TaskTableAndShard.h
@@ -25,6 +25,10 @@ struct TaskTable {
 
     String getPartitionPath(const String & partition_name) const;
 
+    String getPartitionAttachIsActivePath(const String & partition_name) const;
+
+    String getPartitionAttachIsDonePath(const String & partition_name) const;
+
     String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
 
     String getCertainPartitionIsDirtyPath(const String & partition_name) const;
@@ -42,7 +46,7 @@ struct TaskTable {
     [[maybe_unused]] String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const;
 
-    bool isReplicatedTable() { return engine_push_zk_path != ""; }
+    bool isReplicatedTable() const { return engine_push_zk_path != ""; }
 
     /// Partitions will be splitted into number-of-splits pieces.
     /// Each piece will be copied independently. (10 by default)
@@ -191,12 +195,23 @@ struct TaskShard
 };
 
-inline String TaskTable::getPartitionPath(const String &partition_name) const {
+inline String TaskTable::getPartitionPath(const String & partition_name) const
+{
     return task_cluster.task_zookeeper_path             // root
            + "/tables/" + table_id                      // tables/dst_cluster.merge.hits
            + "/" + escapeForFileName(partition_name);   // 201701
 }
 
+inline String TaskTable::getPartitionAttachIsActivePath(const String & partition_name) const
+{
+    return getPartitionPath(partition_name) + "/attach_active";
+}
+
+inline String TaskTable::getPartitionAttachIsDonePath(const String & partition_name) const
+{
+    return getPartitionPath(partition_name) + "/attach_is_done";
+}
+
 inline String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const
 {
     assert(piece_number < number_of_splits);
@@ -241,7 +256,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
 
     name_in_config = table_key;
 
-    number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 2);
+    number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 10);
 
     cluster_pull_name = config.getString(table_prefix + "cluster_pull");
     cluster_push_name = config.getString(table_prefix + "cluster_push");
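Put together, the new helpers extend the per-partition ZooKeeper subtree like this (paths follow getPartitionPath above; the root and table id are illustrative):

    <task_zookeeper_path>/tables/dst_cluster.merge.hits/201701                  -- getPartitionPath
    <task_zookeeper_path>/tables/dst_cluster.merge.hits/201701/attach_active    -- ephemeral; whoever holds it performs the move
    <task_zookeeper_path>/tables/dst_cluster.merge.hits/201701/attach_is_done   -- persistent; TaskStateWithOwner (Started/Finished)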