Ensure consistency when copier update status and current_partition_attach_is_done after partition attach is done

This commit is contained in:
lzydmxy 2022-11-24 11:15:44 +08:00
parent eea32a3beb
commit 0462132dd6

View File

@ -670,24 +670,30 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
}
/// Create node to signal that we finished moving
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
zookeeper->set(current_partition_attach_is_done, state_finished, 0);
/// Also increment a counter of processed partitions
{
const auto state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
const auto task_status = task_zookeeper_path + "/status";
/// Try until success
while (true)
{
Coordination::Stat stat;
auto status_json = zookeeper->get(task_zookeeper_path + "/status", &stat);
auto status_json = zookeeper->get(task_status, &stat);
auto statuses = StatusAccumulator::fromJSON(status_json);
/// Increment status for table.
auto status_for_table = (*statuses)[task_table.name_in_config];
status_for_table.processed_partitions_count += 1;
(*statuses)[task_table.name_in_config] = status_for_table;
(*statuses)[task_table.name_in_config].processed_partitions_count += 1;
auto statuses_to_commit = StatusAccumulator::serializeToJSON(statuses);
auto error = zookeeper->trySet(task_zookeeper_path + "/status", statuses_to_commit, stat.version, &stat);
if (error == Coordination::Error::ZOK)
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(current_partition_attach_is_done, state_finished, 0));
ops.emplace_back(zkutil::makeSetRequest(task_status, statuses_to_commit, stat.version));
Coordination::Responses responses;
Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
break;
}
}