Backport #70984 to 24.8: Fix logical error in storage s3queue

This commit is contained in:
robot-clickhouse 2024-10-29 17:09:04 +00:00
parent 427cc2ac39
commit 63a2858f40
2 changed files with 70 additions and 21 deletions

View File

@ -383,10 +383,10 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH = 0,
REMOVE_PROCESSING_ID_PATH = 1,
REMOVE_PROCESSING_PATH = 2,
SET_MAX_PROCESSED_PATH = 3,
};
const auto zk_client = getZooKeeper();
@ -411,8 +411,18 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
if (Coordination::isHardwareError(code))
failure_reason = "Lost connection to keeper";
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// Remove processing_id node should not actually fail
/// because we just checked in a previous keeper request that it exists and has a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
{
LOG_TRACE(log, "Cannot set file {} as processed. "
@ -420,13 +430,12 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
"Will retry.", path, code);
continue;
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
failure_reason = "Failed to remove processing path";
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
return;
}

View File

@ -103,29 +103,46 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH,
REMOVE_PROCESSING_ID_PATH,
REMOVE_PROCESSING_PATH,
SET_PROCESSED_PATH,
};
const auto zk_client = getZooKeeper();
std::string failure_reason;
Coordination::Requests requests;
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
std::map<RequestType, UInt8> request_index;
if (processing_id_version.has_value())
{
requests.push_back(zkutil::makeCheckRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
/// The order is important:
/// we must first check processing nodes and set processed_path the last.
request_index[CHECK_PROCESSING_ID_PATH] = 0;
request_index[REMOVE_PROCESSING_ID_PATH] = 1;
request_index[REMOVE_PROCESSING_PATH] = 2;
request_index[SET_PROCESSED_PATH] = 3;
}
else
{
request_index[SET_PROCESSED_PATH] = 0;
}
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; };
auto is_request_failed = [&](RequestType type)
{
if (!request_index.contains(type))
return false;
chassert(request_index[type] < responses.size());
return responses[request_index[type]]->error != Coordination::Error::ZOK;
};
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
@ -140,18 +157,41 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
std::string failure_reason;
if (Coordination::isHardwareError(code))
{
failure_reason = "Lost connection to keeper";
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot create a persistent node in /processed since it already exists");
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
{
/// This is normal in case of expired session with keeper.
failure_reason = "Version of processing id node changed";
}
else if (is_request_failed(REMOVE_PROCESSING_ID_PATH))
{
/// Remove processing_id node should not actually fail
/// because we just checked in a previous keeper request that it exists and has a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// This is normal in case of expired session with keeper as this node is ephemeral.
failure_reason = "Failed to remove processing path";
}
else if (is_request_failed(SET_PROCESSED_PATH))
{
unexpected_error = true;
failure_reason = "Cannot create a persistent node in /processed since it already exists";
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
}