diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp
index 2b19a401206..bede40d65f5 100644
--- a/programs/copier/ClusterCopier.cpp
+++ b/programs/copier/ClusterCopier.cpp
@@ -599,11 +599,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
             toString(current_piece_number));
 
         Settings settings_push = task_cluster->settings_push;
-
-        /// It is important, ALTER ATTACH PARTITION must be done synchronously
-        /// And we will execute this ALTER query on each replica of a shard.
-        /// It is correct, because this query is idempotent.
-        settings_push.replication_alter_partitions_sync = 2;
+        ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
+        UInt64 max_successful_executions_per_shard = 0;
+        if (settings_push.replication_alter_partitions_sync == 1)
+        {
+            execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
+            max_successful_executions_per_shard = 1;
+        }
 
         query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
             ((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
@@ -613,14 +615,33 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
 
         try
         {
-            size_t num_nodes = executeQueryOnCluster(
-                task_table.cluster_push,
-                query_alter_ast_string,
-                settings_push,
-                PoolMode::GET_MANY,
-                ClusterExecutionMode::ON_EACH_NODE);
+            /// Try to attach the partition on each shard
+            UInt64 num_nodes = executeQueryOnCluster(
+                task_table.cluster_push,
+                query_alter_ast_string,
+                task_cluster->settings_push,
+                PoolMode::GET_MANY,
+                execution_mode,
+                max_successful_executions_per_shard);
 
-            LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
+            if (settings_push.replication_alter_partitions_sync == 1)
+            {
+                LOG_INFO(
+                    log,
+                    "ALTER query on destination tables {} has been executed successfully on {} shards of {}",
+                    getQuotedTable(task_table.table_push),
+                    num_nodes,
+                    task_table.cluster_push->getShardCount());
+
+                if (num_nodes != task_table.cluster_push->getShardCount())
+                {
+                    return TaskStatus::Error;
+                }
+            }
+            else
+            {
+                LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
+            }
         }
         catch (...)
         {
@@ -856,6 +877,16 @@ bool ClusterCopier::tryDropPartitionPiece(
 
 bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
 {
+    /// Create destination table
+    TaskStatus task_status = TaskStatus::Error;
+
+    task_status = tryCreateDestinationTable(timeouts, task_table);
+    /// Exit if the destination table could not be created
+    if (task_status != TaskStatus::Finished)
+    {
+        LOG_WARNING(log, "Create destination table failed");
+        return false;
+    }
     /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
     bool previous_shard_is_instantly_finished = false;
 
@@ -932,7 +963,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
         /// Do not sleep if there is a sequence of already processed shards to increase startup
         bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
 
-        TaskStatus task_status = TaskStatus::Error;
+        task_status = TaskStatus::Error;
         bool was_error = false;
         has_shard_to_process = true;
         for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
@@ -1050,6 +1081,44 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
     return table_is_done;
 }
 
+TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
+{
+    /// Try to create the original table (if not exists) on each shard
+
+    //TaskTable & task_table = task_shard.task_table;
+    const TaskShardPtr task_shard = task_table.all_shards.at(0);
+    /// We need to update table definitions for each part, it could be changed after ALTER
+    task_shard->current_pull_table_create_query = getCreateTableForPullShard(timeouts, *task_shard);
+    try
+    {
+        auto create_query_push_ast
+            = rewriteCreateQueryStorage(task_shard->current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
+        auto & create = create_query_push_ast->as<ASTCreateQuery &>();
+        create.if_not_exists = true;
+        InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
+        String query = queryToString(create_query_push_ast);
+
+        LOG_DEBUG(log, "Create destination tables. Query: {}", query);
+        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
+        LOG_INFO(
+            log,
+            "Destination tables {} have been created on {} shards of {}",
+            getQuotedTable(task_table.table_push),
+            shards,
+            task_table.cluster_push->getShardCount());
+        if (shards != task_table.cluster_push->getShardCount())
+        {
+            return TaskStatus::Error;
+        }
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
+    }
+
+    return TaskStatus::Finished;
+}
+
 /// Job for copying partition from particular shard.
 TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
 {
@@ -1366,8 +1435,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
 
         LOG_DEBUG(log, "Create destination tables. Query: {}", query);
         UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
-        LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
-            getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
+        LOG_INFO(
+            log,
+            "Destination tables {} have been created on {} shards of {}",
+            getQuotedTable(task_table.table_push),
+            shards,
+            task_table.cluster_push->getShardCount());
+
+        if (shards != task_table.cluster_push->getShardCount())
+        {
+            return TaskStatus::Error;
+        }
     }
 
     /// Do the copying
@@ -1477,26 +1555,6 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
 
     LOG_INFO(log, "Partition {} piece {} copied. But not moved to original destination table.",
         task_partition.name, toString(current_piece_number));
-
-    /// Try create original table (if not exists) on each shard
-    try
-    {
-        auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
-            task_table.table_push, task_table.engine_push_ast);
-        auto & create = create_query_push_ast->as<ASTCreateQuery &>();
-        create.if_not_exists = true;
-        InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
-        String query = queryToString(create_query_push_ast);
-
-        LOG_DEBUG(log, "Create destination tables. Query: {}", query);
-        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
-        LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
-    }
-    catch (...)
-    {
-        tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
-    }
-
     /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
     {
         String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
@@ -1538,33 +1596,36 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
     interpreter.execute();
 }
 
+void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number)
+{
+    LOG_DEBUG(log, "Removing helping tables piece {}", current_piece_number);
+
+    DatabaseAndTableName original_table = task_table.table_push;
+    DatabaseAndTableName helping_table
+        = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
+
+    String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
+
+    const ClusterPtr & cluster_push = task_table.cluster_push;
+    Settings settings_push = task_cluster->settings_push;
+
+    LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
+
+    /// We have to drop partition_piece on each replica
+    UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE);
+
+    LOG_INFO(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
+}
 
 void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
 {
     LOG_DEBUG(log, "Removing helping tables");
     for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
     {
-        DatabaseAndTableName original_table = task_table.table_push;
-        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
-
-        String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
-
-        const ClusterPtr & cluster_push = task_table.cluster_push;
-        Settings settings_push = task_cluster->settings_push;
-
-        LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
-        /// We have to drop partition_piece on each replica
-        UInt64 num_nodes = executeQueryOnCluster(
-            cluster_push, query,
-            settings_push,
-            PoolMode::GET_MANY,
-            ClusterExecutionMode::ON_EACH_NODE);
-
-        LOG_DEBUG(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
+        dropHelpingTablesByPieceNumber(task_table, current_piece_number);
     }
 }
 
-
 void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
 {
     LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
@@ -1586,7 +1647,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
             PoolMode::GET_MANY,
             ClusterExecutionMode::ON_EACH_NODE);
 
-        LOG_DEBUG(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
+        LOG_INFO(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
     }
     LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
 }
diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h
index 9aff5493cf8..95bb54cf4e1 100644
--- a/programs/copier/ClusterCopier.h
+++ b/programs/copier/ClusterCopier.h
@@ -123,12 +123,13 @@ protected:
     bool tryDropPartitionPiece(ShardPartition & task_partition, const size_t current_piece_number,
             const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
 
-    static constexpr UInt64 max_table_tries = 1000;
-    static constexpr UInt64 max_shard_partition_tries = 600;
-    static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 100;
+    static constexpr UInt64 max_table_tries = 3;
+    static constexpr UInt64 max_shard_partition_tries = 3;
+    static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
 
     bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
 
+    TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
     /// Job for copying partition from particular shard.
     TaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
             bool is_unprioritized_task);
@@ -149,6 +150,8 @@ protected:
 
     void dropHelpingTables(const TaskTable & task_table);
 
+    void dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number);
+
     /// Is used for usage less disk space.
     /// After all pieces were successfully moved to original destination
     /// table we can get rid of partition pieces (partitions in helping tables).
diff --git a/programs/copier/TaskCluster.h b/programs/copier/TaskCluster.h
index 5b28f461dd8..1a50597d07f 100644
--- a/programs/copier/TaskCluster.h
+++ b/programs/copier/TaskCluster.h
@@ -98,6 +98,7 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
     set_default_value(settings_pull.max_block_size, 8192UL);
     set_default_value(settings_pull.preferred_block_size_bytes, 0);
     set_default_value(settings_push.insert_distributed_timeout, 0);
+    set_default_value(settings_push.replication_alter_partitions_sync, 2);
 }
 
 }
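
For readers skimming the diff, a minimal standalone sketch of the behavioral change in the first hunk follows. It is not part of the patch; ClusterExecutionMode and the plan struct below are simplified stand-ins for ClickHouse's real types, and only the selection logic mirrors the new code: with settings_push.replication_alter_partitions_sync == 1 the ALTER ... ATTACH PARTITION only has to succeed on one replica per shard (ON_EACH_SHARD, at most one successful execution per shard); any other value keeps the old behaviour of running it on every node (ON_EACH_NODE). Since TaskCluster.h now defaults replication_alter_partitions_sync to 2, the per-shard path is taken only when a task config explicitly sets the push setting to 1.

```cpp
// Standalone sketch (not part of the patch): mirrors the execution-mode choice
// added in tryMoveAllPiecesToDestinationTable(). The types here are simplified
// stand-ins, not ClickHouse's real definitions.
#include <cstdint>
#include <initializer_list>
#include <iostream>

enum class ClusterExecutionMode { ON_EACH_NODE, ON_EACH_SHARD };

struct AlterExecutionPlan
{
    ClusterExecutionMode mode = ClusterExecutionMode::ON_EACH_NODE;
    std::uint64_t max_successful_executions_per_shard = 0; // 0 means "no limit"
};

// With replication_alter_partitions_sync == 1 the ALTER ... ATTACH PARTITION is
// considered done once it succeeds on one replica per shard; otherwise it is
// still executed on every node of the push cluster, as before.
AlterExecutionPlan chooseAlterExecutionPlan(std::uint64_t replication_alter_partitions_sync)
{
    AlterExecutionPlan plan;
    if (replication_alter_partitions_sync == 1)
    {
        plan.mode = ClusterExecutionMode::ON_EACH_SHARD;
        plan.max_successful_executions_per_shard = 1;
    }
    return plan;
}

int main()
{
    for (std::uint64_t sync : {std::uint64_t{0}, std::uint64_t{1}, std::uint64_t{2}})
    {
        const AlterExecutionPlan plan = chooseAlterExecutionPlan(sync);
        std::cout << "replication_alter_partitions_sync=" << sync << " -> "
                  << (plan.mode == ClusterExecutionMode::ON_EACH_SHARD ? "ON_EACH_SHARD" : "ON_EACH_NODE")
                  << ", max_successful_executions_per_shard=" << plan.max_successful_executions_per_shard << '\n';
    }
    return 0;
}
```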