Backport #62306 to 24.3: Better handling of errors from azure storage

robot-clickhouse 2024-09-11 11:07:30 +00:00
parent 7035fddfa5
commit bdb4dd02f9
10 changed files with 161 additions and 85 deletions

src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp

@@ -3,6 +3,7 @@
 #if USE_AZURE_BLOB_STORAGE
 #include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
+#include <IO/AzureBlobStorage/isRetryableAzureException.h>
 #include <IO/ReadBufferFromString.h>
 #include <Common/logger_useful.h>
 #include <Common/Throttler.h>
@@ -101,18 +102,6 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
     size_t sleep_time_with_backoff_milliseconds = 100;
-    auto handle_exception = [&, this](const auto & e, size_t i)
-    {
-        LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message);
-        if (i + 1 == max_single_read_retries)
-            throw;
-        sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
-        sleep_time_with_backoff_milliseconds *= 2;
-        initialized = false;
-        initialize();
-    };
 
     for (size_t i = 0; i < max_single_read_retries; ++i)
     {
         try
@@ -124,7 +113,14 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
         }
         catch (const Azure::Core::RequestFailedException & e)
         {
-            handle_exception(e, i);
+            LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message);
+
+            if (i + 1 == max_single_read_retries || !isRetryableAzureException(e))
+                throw;
+
+            sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
+            sleep_time_with_backoff_milliseconds *= 2;
+            initialized = false;
+            initialize();
         }
     }
@@ -213,16 +209,6 @@ void ReadBufferFromAzureBlobStorage::initialize()
     size_t sleep_time_with_backoff_milliseconds = 100;
-    auto handle_exception = [&, this](const auto & e, size_t i)
-    {
-        LOG_DEBUG(log, "Exception caught during Azure Download for file {} at offset {} at attempt {}/{}: {}", path, offset, i + 1, max_single_download_retries, e.Message);
-        if (i + 1 == max_single_download_retries)
-            throw;
-        sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
-        sleep_time_with_backoff_milliseconds *= 2;
-    };
 
     for (size_t i = 0; i < max_single_download_retries; ++i)
     {
         try
@@ -233,7 +219,12 @@ void ReadBufferFromAzureBlobStorage::initialize()
         }
         catch (const Azure::Core::RequestFailedException & e)
         {
-            handle_exception(e,i);
+            LOG_DEBUG(log, "Exception caught during Azure Download for file {} at offset {} at attempt {}/{}: {}", path, offset, i + 1, max_single_download_retries, e.Message);
+
+            if (i + 1 == max_single_download_retries || !isRetryableAzureException(e))
+                throw;
+
+            sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
+            sleep_time_with_backoff_milliseconds *= 2;
         }
     }
@@ -283,7 +274,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran
         catch (const Azure::Core::RequestFailedException & e)
         {
             LOG_DEBUG(log, "Exception caught during Azure Download for file {} at offset {} at attempt {}/{}: {}", path, offset, i + 1, max_single_download_retries, e.Message);
-            if (i + 1 == max_single_download_retries)
+            if (i + 1 == max_single_download_retries || !isRetryableAzureException(e))
                 throw;
             sleepForMilliseconds(sleep_time_with_backoff_milliseconds);

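Both retry sites in this file now use the same inlined pattern: a bounded loop with exponential backoff that rethrows as soon as the attempt budget is exhausted or the error is not worth retrying. A minimal self-contained sketch of that control flow (RequestError, retryWithBackoff and the retryable flag are illustrative stand-ins, not the real Azure SDK or ClickHouse types):

#include <chrono>
#include <cstddef>
#include <cstdio>
#include <stdexcept>
#include <thread>

/// Stand-in for Azure::Core::RequestFailedException.
struct RequestError : std::runtime_error
{
    bool retryable;
    RequestError(const char * msg, bool retryable_) : std::runtime_error(msg), retryable(retryable_) {}
};

template <typename Func>
void retryWithBackoff(Func && attempt, size_t max_retries)
{
    size_t sleep_ms = 100;
    for (size_t i = 0; i < max_retries; ++i)
    {
        try
        {
            attempt();
            return;
        }
        catch (const RequestError & e)
        {
            std::printf("attempt %zu/%zu failed: %s\n", i + 1, max_retries, e.what());

            /// Give up on the last attempt or on errors that retrying cannot fix.
            if (i + 1 == max_retries || !e.retryable)
                throw;

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
            sleep_ms *= 2; /// Exponential backoff: 100, 200, 400, ... ms between attempts.
        }
    }
}
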
src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp

@@ -3,6 +3,7 @@
 #if USE_AZURE_BLOB_STORAGE
 #include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
+#include <IO/AzureBlobStorage/isRetryableAzureException.h>
 #include <Common/getRandomASCIIString.h>
 #include <Common/logger_useful.h>
 #include <Common/Throttler.h>
@@ -70,17 +71,6 @@ WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
 void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func, size_t num_tries, size_t cost)
 {
-    auto handle_exception = [&, this](const auto & e, size_t i)
-    {
-        if (cost)
-            write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
-        if (i == num_tries - 1)
-            throw;
-        LOG_DEBUG(log, "Write at attempt {} for blob `{}` failed: {} {}", i + 1, blob_path, e.what(), e.Message);
-    };
 
     for (size_t i = 0; i < num_tries; ++i)
     {
         try
@@ -91,7 +81,13 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
         }
         catch (const Azure::Core::RequestFailedException & e)
        {
-            handle_exception(e, i);
+            if (cost)
+                write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
+
+            if (i == num_tries - 1 || !isRetryableAzureException(e))
+                throw;
+
+            LOG_DEBUG(log, "Write at attempt {} for blob `{}` failed: {} {}", i + 1, blob_path, e.what(), e.Message);
         }
         catch (...)
         {

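On the write path the retry policy is centralized in execWithRetry, which takes the operation as a std::function<void()> plus a scheduler cost that was charged before the attempt; on failure the cost is returned to the budget. A hedged sketch of that shape, where ResourceBudget stands in for the real write_settings.resource_link:

#include <cstddef>
#include <functional>
#include <stdexcept>

/// Stand-in for the scheduler resource link: a failed attempt gives back
/// the cost that was charged for it up front.
struct ResourceBudget
{
    size_t available = 0;
    void accumulate(size_t cost) { available += cost; }
};

void execWithRetrySketch(std::function<void()> func, size_t num_tries, size_t cost, ResourceBudget & budget)
{
    for (size_t i = 0; i < num_tries; ++i)
    {
        try
        {
            func();
            return;
        }
        catch (const std::runtime_error &)
        {
            /// Return the pre-charged cost, since this attempt did not consume it.
            if (cost)
                budget.accumulate(cost);

            if (i == num_tries - 1)
                throw;
        }
    }
}

In the real code the loop also rethrows immediately when isRetryableAzureException says the error is permanent, mirroring the read path above.
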
src/IO/AzureBlobStorage/isRetryableAzureException.cpp

@@ -0,0 +1,21 @@
+#include "config.h"
+
+#if USE_AZURE_BLOB_STORAGE
+
+#include <IO/AzureBlobStorage/isRetryableAzureException.h>
+
+namespace DB
+{
+
+bool isRetryableAzureException(const Azure::Core::RequestFailedException & e)
+{
+    /// Always retry transport errors.
+    if (dynamic_cast<const Azure::Core::Http::TransportException *>(&e))
+        return true;
+
+    /// Retry other 5xx errors just in case.
+    return e.StatusCode >= Azure::Core::Http::HttpStatusCode::InternalServerError;
+}
+
+}
+#endif

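The new helper encodes a two-step rule: transport-level failures are always retryable, and HTTP errors are retryable only when the status is 5xx (server side). A self-contained sketch of the same decision rule, with HttpStatus and FailedRequest as mock types standing in for the Azure SDK ones:

#include <cassert>

/// Mock status codes; the values follow HTTP, as the SDK's enum does.
enum class HttpStatus : int
{
    BadRequest = 400,
    NotFound = 404,
    InternalServerError = 500,
    ServiceUnavailable = 503,
};

struct FailedRequest
{
    bool transport_error;
    HttpStatus status;
};

/// Same rule as isRetryableAzureException: network-level failures are always
/// retried, HTTP errors only when they are 5xx.
bool isRetryableSketch(const FailedRequest & e)
{
    if (e.transport_error)
        return true;
    return e.status >= HttpStatus::InternalServerError;
}

int main()
{
    assert(isRetryableSketch({true, HttpStatus::BadRequest}));          /// transport error -> retry
    assert(isRetryableSketch({false, HttpStatus::ServiceUnavailable})); /// 503 -> retry
    assert(!isRetryableSketch({false, HttpStatus::NotFound}));          /// 404 -> give up
}

The practical consequence is that 4xx responses such as 404 or 403, which would fail identically on every attempt, now surface immediately instead of burning the whole retry budget.
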
src/IO/AzureBlobStorage/isRetryableAzureException.h

@@ -0,0 +1,14 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_AZURE_BLOB_STORAGE
+
+#include <azure/core/http/http.hpp>
+
+namespace DB
+{
+bool isRetryableAzureException(const Azure::Core::RequestFailedException & e);
+}
+
+#endif

src/Storages/MergeTree/MergeTreeData.cpp

@ -8,7 +8,6 @@
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include "Common/logger_useful.h"
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
@@ -1310,7 +1309,8 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
         /// during loading, such as "not enough memory" or network error.
         if (isRetryableException(std::current_exception()))
             throw;
-        LOG_DEBUG(log, "Failed to load data part {}, unknown exception", part_name);
+
+        LOG_DEBUG(log, "Failed to load data part {} with exception: {}", part_name, getExceptionMessage(std::current_exception(), false));
         mark_broken();
         return res;
     }
@@ -1341,6 +1341,7 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
         /// during loading, such as "not enough memory" or network error.
         if (isRetryableException(std::current_exception()))
             throw;
+
         mark_broken();
         return res;
     }
@@ -1459,25 +1460,9 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPartWithRetries(
         if (try_no + 1 == max_tries)
             throw;
 
-        String exception_message;
-        try
-        {
-            rethrow_exception(exception_ptr);
-        }
-        catch (const Exception & e)
-        {
-            exception_message = e.message();
-        }
-#if USE_AZURE_BLOB_STORAGE
-        catch (const Azure::Core::RequestFailedException & e)
-        {
-            exception_message = e.Message;
-        }
-#endif
-        LOG_DEBUG(log, "Failed to load data part {} at try {} with retryable error: {}. Will retry in {} ms",
-            part_name, try_no, exception_message, initial_backoff_ms);
+        LOG_DEBUG(log,
+            "Failed to load data part {} at try {} with retryable error: {}. Will retry in {} ms",
+            part_name, try_no, getExceptionMessage(exception_ptr, false), initial_backoff_ms);
 
         std::this_thread::sleep_for(std::chrono::milliseconds(initial_backoff_ms));
         initial_backoff_ms = std::min(initial_backoff_ms * 2, max_backoff_ms);

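The removed block rethrew the captured exception_ptr just to extract a message per exception type; the replacement delegates that to getExceptionMessage. A simplified sketch of what such a helper does internally (an assumption for illustration, not ClickHouse's actual implementation):

#include <exception>
#include <string>

/// Rethrow the stored exception_ptr once and map each known type to a
/// printable message. Assumes ptr is non-null.
std::string exceptionMessageSketch(std::exception_ptr ptr)
{
    try
    {
        std::rethrow_exception(ptr);
    }
    catch (const std::exception & e)
    {
        return e.what();
    }
    catch (...)
    {
        return "unknown exception";
    }
}
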
src/Storages/MergeTree/checkDataPart.cpp

@@ -1,5 +1,4 @@
-#include <Poco/Logger.h>
 #include <algorithm>
 #include <optional>
 #include <Poco/DirectoryIterator.h>
@@ -16,11 +15,9 @@
 #include <Common/CurrentMetrics.h>
 #include <Common/SipHash.h>
 #include <Common/ZooKeeper/IKeeper.h>
+#include <IO/AzureBlobStorage/isRetryableAzureException.h>
 #include <Poco/Net/NetException.h>
-#if USE_AZURE_BLOB_STORAGE
-#include <azure/core/http/http.hpp>
-#endif
 
 namespace CurrentMetrics
 {
@@ -68,25 +65,22 @@ bool isRetryableException(std::exception_ptr exception_ptr)
 #if USE_AWS_S3
     catch (const S3Exception & s3_exception)
     {
-        if (s3_exception.isRetryableError())
-            return true;
+        return s3_exception.isRetryableError();
     }
 #endif
 #if USE_AZURE_BLOB_STORAGE
-    catch (const Azure::Core::RequestFailedException &)
+    catch (const Azure::Core::RequestFailedException & e)
     {
-        return true;
+        return isRetryableAzureException(e);
     }
 #endif
     catch (const ErrnoException & e)
     {
-        if (e.getErrno() == EMFILE)
-            return true;
+        return e.getErrno() == EMFILE;
     }
     catch (const Coordination::Exception & e)
     {
-        if (Coordination::isHardwareError(e.code))
-            return true;
+        return Coordination::isHardwareError(e.code);
     }
     catch (const Exception & e)
     {
@@ -104,10 +98,12 @@ bool isRetryableException(std::exception_ptr exception_ptr)
     {
         return true;
     }
-
-    /// In fact, there can be other similar situations.
-    /// But it is OK, because there is a safety guard against deleting too many parts.
-    return false;
+    catch (...)
+    {
+        /// In fact, there can be other similar situations.
+        /// But it is OK, because there is a safety guard against deleting too many parts.
+        return false;
+    }
 }

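isRetryableException uses the standard pattern for inspecting a stored std::exception_ptr: rethrow it and classify by concrete type, and the new catch-all makes the "unknown means not retryable" default explicit. A minimal sketch of that dispatch, using std::system_error to stand in for the ErrnoException case (the real function also handles S3, Azure, Keeper and DB::Exception types):

#include <exception>
#include <system_error>

bool isRetryableSketch(std::exception_ptr ptr)
{
    try
    {
        std::rethrow_exception(ptr);
    }
    catch (const std::system_error & e)
    {
        /// e.g. "too many open files" is transient once some are closed,
        /// matching the EMFILE special case above.
        return e.code() == std::errc::too_many_files_open;
    }
    catch (...)
    {
        /// Unknown errors are treated as permanent; the safety guard against
        /// deleting too many parts covers anything misclassified here.
        return false;
    }
}
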
src/Storages/StorageAzureBlob.cpp

@@ -432,7 +432,8 @@ AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration co
         try
         {
             result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
-        } catch (const Azure::Storage::StorageException & e)
+        }
+        catch (const Azure::Storage::StorageException & e)
         {
             if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
                 && e.ReasonPhrase == "The specified container already exists.")

View File

@@ -66,11 +66,11 @@ def cluster():
 def azure_query(
-    node, query, expect_error="false", try_num=10, settings={}, query_on_retry=None
+    node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
 ):
     for i in range(try_num):
         try:
-            if expect_error == "true":
+            if expect_error:
                 return node.query_and_get_error(query, settings=settings)
             else:
                 return node.query(query, settings=settings)

tests/integration/test_merge_tree_azure_blob_storage/test.py

@@ -714,7 +714,7 @@ def test_endpoint_error_check(cluster):
     """
     expected_err_msg = "Expected container_name in endpoint"
-    assert expected_err_msg in azure_query(node, query, expect_error="true")
+    assert expected_err_msg in azure_query(node, query, expect_error=True)
 
     query = f"""
         DROP TABLE IF EXISTS test SYNC;
@@ -731,7 +731,7 @@ def test_endpoint_error_check(cluster):
     """
     expected_err_msg = "Expected account_name in endpoint"
-    assert expected_err_msg in azure_query(node, query, expect_error="true")
+    assert expected_err_msg in azure_query(node, query, expect_error=True)
 
     query = f"""
         DROP TABLE IF EXISTS test SYNC;
@@ -748,4 +748,76 @@ def test_endpoint_error_check(cluster):
     """
     expected_err_msg = "Expected container_name in endpoint"
-    assert expected_err_msg in azure_query(node, query, expect_error="true")
+    assert expected_err_msg in azure_query(node, query, expect_error=True)
+
+
+def get_azure_client(container_name, port):
+    connection_string = (
+        f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
+        f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+        f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
+    )
+
+    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
+    return blob_service_client.get_container_client(container_name)
+
+
+def test_azure_broken_parts(cluster):
+    node = cluster.instances[NODE_NAME]
+    account_name = "devstoreaccount1"
+    container_name = "cont5"
+    port = cluster.azurite_port
+
+    query = f"""
+        DROP TABLE IF EXISTS t_azure_broken_parts SYNC;
+
+        CREATE TABLE t_azure_broken_parts (a Int32)
+        ENGINE = MergeTree() ORDER BY tuple()
+        SETTINGS disk = disk(
+            type = azure_blob_storage,
+            endpoint = 'http://azurite1:{port}/{account_name}/{container_name}',
+            endpoint_contains_account_name = 'true',
+            account_name = 'devstoreaccount1',
+            account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
+            skip_access_check = 0), min_bytes_for_wide_part = 0, min_bytes_for_full_part_storage = 0;
+
+        INSERT INTO t_azure_broken_parts VALUES (1);
+    """
+    azure_query(node, query)
+
+    result = azure_query(node, "SELECT count() FROM t_azure_broken_parts").strip()
+    assert int(result) == 1
+
+    result = azure_query(
+        node,
+        "SELECT count() FROM system.detached_parts WHERE table = 't_azure_broken_parts'",
+    ).strip()
+    assert int(result) == 0
+
+    data_path = azure_query(
+        node,
+        "SELECT data_paths[1] FROM system.tables WHERE name = 't_azure_broken_parts'",
+    ).strip()
+
+    remote_path = azure_query(
+        node,
+        f"SELECT remote_path FROM system.remote_data_paths WHERE path || local_path = '{data_path}' || 'all_1_1_0/columns.txt'",
+    ).strip()
+
+    client = get_azure_client(container_name, port)
+    client.delete_blob(remote_path)
+
+    azure_query(node, "DETACH TABLE t_azure_broken_parts")
+    azure_query(node, "ATTACH TABLE t_azure_broken_parts")
+
+    result = azure_query(node, "SELECT count() FROM t_azure_broken_parts").strip()
+    assert int(result) == 0
+
+    result = azure_query(
+        node,
+        "SELECT count() FROM system.detached_parts WHERE table = 't_azure_broken_parts'",
+    ).strip()
+    assert int(result) == 1

tests/integration/test_storage_azure_blob_storage/test.py

@@ -36,11 +36,11 @@ def cluster():
 def azure_query(
-    node, query, expect_error="false", try_num=10, settings={}, query_on_retry=None
+    node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
 ):
     for i in range(try_num):
         try:
-            if expect_error == "true":
+            if expect_error:
                 return node.query_and_get_error(query, settings=settings)
             else:
                 return node.query(query, settings=settings)
@@ -793,7 +793,7 @@ def test_read_from_not_existing_container(cluster):
         f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto')"
     )
     expected_err_msg = "container does not exist"
-    assert expected_err_msg in azure_query(node, query, expect_error="true")
+    assert expected_err_msg in azure_query(node, query, expect_error=True)
 
 
 def test_function_signatures(cluster):
@@ -966,7 +966,7 @@ def test_union_schema_inference_mode(cluster):
     error = azure_query(
         node,
         f"desc azureBlobStorage('{storage_account_url}', 'cont', 'test_union_schema_inference*.jsonl', '{account_name}', '{account_key}', 'auto', 'auto', 'auto') settings schema_inference_mode='union', describe_compact_output=1 format TSV",
-        expect_error="true",
+        expect_error=True,
     )
     assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error