diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp index 6eb0ffcfac9..45b386a7503 100644 --- a/dbms/programs/copier/ClusterCopier.cpp +++ b/dbms/programs/copier/ClusterCopier.cpp @@ -1065,6 +1065,26 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl( throw; } + + /// Exit if current piece is absent on this shard. Also mark it as finished, because we will check + /// whether each shard have processed each partitition (and its pieces). + if (partition_piece.is_absent_piece) + { + std::cout << "current partition piece is clean?? " << is_clean << std::endl; + std::cout << "######" << "Partition " << task_partition.name + << " piece " + toString(current_piece_number) + " IS ABSENT ON CURRENT SHARD" << std::endl; + std::cout << "current_task_piece_status_path " << current_task_piece_status_path << std::endl; + String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id); + auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent); + if (res == Coordination::ZNODEEXISTS) + LOG_DEBUG(log, "Partition " << task_partition.name << " piece " + + toString(current_piece_number) + " is absent on current replica of a shard. But other replica has already marked it as done."); + if (res == Coordination::ZOK) + LOG_DEBUG(log, "Partition " << task_partition.name << " piece " + + toString(current_piece_number) + " is absent on current replica of a shard. Will mark it as done. Other replicas will do the same."); + return PartitionTaskStatus::Finished; + } + /// Exit if task has been already processed; /// create blocking node to signal cleaning up if it is abandoned { @@ -1376,8 +1396,6 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl( /// TODO: LOG_INFO (Piece copied and moved to destination table) - - return PartitionTaskStatus::Finished; }