Fix logical error "Cannot create a persistent node in /processed since it already exists"

This commit is contained in:
kssenii 2024-10-23 18:26:43 +02:00
parent ac766ae2d8
commit dbc705710f
2 changed files with 70 additions and 21 deletions

View File

@ -381,14 +381,15 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH = 0,
REMOVE_PROCESSING_ID_PATH = 1,
REMOVE_PROCESSING_PATH = 2,
SET_MAX_PROCESSED_PATH = 3,
};
const auto zk_client = getZooKeeper();
std::string failure_reason;
std::map<RequestType, UInt8> request_id;
while (true)
{
@ -409,8 +410,18 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
if (Coordination::isHardwareError(code))
failure_reason = "Lost connection to keeper";
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// Remove processing_id node should not actually fail
/// because we just checked in a previous keeper request that it exists and has a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
{
LOG_TRACE(log, "Cannot set file {} as processed. "
@ -418,13 +429,12 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
"Will retry.", path, code);
continue;
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
failure_reason = "Failed to remove processing path";
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
return;
}

View File

@ -103,29 +103,45 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH,
REMOVE_PROCESSING_ID_PATH,
REMOVE_PROCESSING_PATH,
SET_PROCESSED_PATH,
};
const auto zk_client = getZooKeeper();
std::string failure_reason;
Coordination::Requests requests;
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
std::map<RequestType, UInt8> request_id;
if (processing_id_version.has_value())
{
requests.push_back(zkutil::makeCheckRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
/// The order is important:
/// we must first check processing nodes and set processed_path the last.
request_id[CHECK_PROCESSING_ID_PATH] = 0;
request_id[REMOVE_PROCESSING_ID_PATH] = 1;
request_id[REMOVE_PROCESSING_PATH] = 2;
request_id[SET_PROCESSED_PATH] = 3;
}
else
{
request_id[SET_PROCESSED_PATH] = 0;
}
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; };
auto is_request_failed = [&](RequestType type)
{
if (!request_id.contains(type))
return false;
return responses[request_id[type]]->error != Coordination::Error::ZOK;
};
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
@ -140,18 +156,41 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
std::string failure_reason;
if (Coordination::isHardwareError(code))
{
failure_reason = "Lost connection to keeper";
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot create a persistent node in /processed since it already exists");
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
{
/// This is normal in case of expired session with keeper.
failure_reason = "Version of processing id node changed";
}
else if (is_request_failed(REMOVE_PROCESSING_ID_PATH))
{
/// Remove processing_id node should not actually fail
/// because we just checked in a previous keeper request that it exists and has a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// This is normal in case of expired session with keeper as this node is ephemeral.
failure_reason = "Failed to remove processing path";
}
else if (is_request_failed(SET_PROCESSED_PATH))
{
unexpected_error = true;
failure_reason = "Cannot create a persistent node in /processed since it already exists";
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
}