Merge pull request #43602 from lzydmxy/fix_bug_in_copier

Ensure consistency when copier update `status` and `attach_is_done`
This commit is contained in:
Kruglov Pavel 2022-12-07 13:47:56 +01:00 committed by GitHub
commit d4cd53ccea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -670,24 +670,30 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
}
/// Create node to signal that we finished moving
/// Also increment a counter of processed partitions
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
zookeeper->set(current_partition_attach_is_done, state_finished, 0);
/// Also increment a counter of processed partitions
const auto state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
const auto task_status = task_zookeeper_path + "/status";
/// Try until success
while (true)
{
Coordination::Stat stat;
auto status_json = zookeeper->get(task_zookeeper_path + "/status", &stat);
auto status_json = zookeeper->get(task_status, &stat);
auto statuses = StatusAccumulator::fromJSON(status_json);
/// Increment status for table.
auto status_for_table = (*statuses)[task_table.name_in_config];
status_for_table.processed_partitions_count += 1;
(*statuses)[task_table.name_in_config] = status_for_table;
(*statuses)[task_table.name_in_config].processed_partitions_count += 1;
auto statuses_to_commit = StatusAccumulator::serializeToJSON(statuses);
auto error = zookeeper->trySet(task_zookeeper_path + "/status", statuses_to_commit, stat.version, &stat);
if (error == Coordination::Error::ZOK)
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(current_partition_attach_is_done, state_finished, 0));
ops.emplace_back(zkutil::makeSetRequest(task_status, statuses_to_commit, stat.version));
Coordination::Responses responses;
Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
break;
}
}