Merge pull request #62608 from CurtizJ/azure-retries

Better retries in Azure SDK
This commit is contained in:
Anton Popov 2024-04-19 12:01:42 +00:00 committed by GitHub
commit f09d0b6536
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 91 additions and 26 deletions

View File

@ -138,6 +138,7 @@
#if USE_AZURE_BLOB_STORAGE
# include <azure/storage/common/internal/xml_wrapper.hpp>
# include <azure/core/diagnostics/logger.hpp>
#endif
#include <incbin.h>
@ -612,6 +613,45 @@ static void sanityChecks(Server & server)
}
}
/// Forwards Azure SDK diagnostics to the server log through the "AzureSDK" logger.
///
/// Does nothing unless the build defines USE_AZURE_BLOB_STORAGE and the server setting
/// `enable_azure_sdk_logging` is turned on.
///
/// server_settings   - only `enable_azure_sdk_logging` is read.
/// server_logs_level - level of the server logger; it is cast to Poco::Message::Priority and
///                     decides how verbose the SDK is asked to be.
static void initializeAzureSDKLogger(
    [[ maybe_unused ]] const ServerSettings & server_settings,
    [[ maybe_unused ]] int server_logs_level)
{
#if USE_AZURE_BLOB_STORAGE
    if (!server_settings.enable_azure_sdk_logging)
        return;

    using AzureLogsLevel = Azure::Core::Diagnostics::Logger::Level;

    /// Priority under which a message of each SDK level is written to the server log.
    /// Note that SDK errors and warnings are both written as debug messages.
    static const std::unordered_map<AzureLogsLevel, std::pair<Poco::Message::Priority, DB::LogsLevel>> azure_to_server_mapping =
    {
        {AzureLogsLevel::Error, {Poco::Message::PRIO_DEBUG, LogsLevel::debug}},
        {AzureLogsLevel::Warning, {Poco::Message::PRIO_DEBUG, LogsLevel::debug}},
        {AzureLogsLevel::Informational, {Poco::Message::PRIO_TRACE, LogsLevel::trace}},
        {AzureLogsLevel::Verbose, {Poco::Message::PRIO_TEST, LogsLevel::test}},
    };

    /// SDK verbosity to request for a server level. The lookup below takes the first key that is
    /// not less than the server level, so the map has to stay ordered by priority.
    static const std::map<Poco::Message::Priority, AzureLogsLevel> server_to_azure_mapping =
    {
        {Poco::Message::PRIO_DEBUG, AzureLogsLevel::Warning},
        {Poco::Message::PRIO_TRACE, AzureLogsLevel::Informational},
        {Poco::Message::PRIO_TEST, AzureLogsLevel::Verbose},
    };

    static const LoggerPtr azure_sdk_logger = getLogger("AzureSDK");

    auto it = server_to_azure_mapping.lower_bound(static_cast<Poco::Message::Priority>(server_logs_level));
    chassert(it != server_to_azure_mapping.end());

    /// A server level greater than every key makes lower_bound return end(). The chassert above is
    /// presumably compiled out in release builds, so never dereference end(): request the most
    /// verbose SDK level instead.
    const AzureLogsLevel azure_level = (it != server_to_azure_mapping.end()) ? it->second : AzureLogsLevel::Verbose;
    Azure::Core::Diagnostics::Logger::SetLevel(azure_level);

    /// The listener captures nothing: it only touches the function-local statics declared above.
    Azure::Core::Diagnostics::Logger::SetListener([](AzureLogsLevel level, const std::string & message)
    {
        auto [poco_level, db_level] = azure_to_server_mapping.at(level);
        LOG_IMPL(azure_sdk_logger, db_level, poco_level, fmt::runtime(message));
    });
#endif
}
int Server::main(const std::vector<std::string> & /*args*/)
try
{
@ -1860,6 +1900,7 @@ try
/// Build loggers before tables startup to make log messages from tables
/// attach available in system.text_log
buildLoggers(config(), logger());
initializeAzureSDKLogger(server_settings, logger().getLevel());
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));

View File

@ -140,6 +140,7 @@ namespace DB
M(UInt64, http_connections_store_limit, 5000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the http connections which do not belong to any disk or storage.", 0) \
M(UInt64, global_profiler_real_time_period_ns, 0, "Period for real clock timer of global profiler (in nanoseconds). Set 0 value to turn off the real clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, global_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of global profiler (in nanoseconds). Set 0 value to turn off the CPU clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, enable_azure_sdk_logging, false, "Enables logging from Azure sdk", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@ -4,8 +4,8 @@
#include <Common/Exception.h>
#include <Common/re2.h>
#include <optional>
#include <azure/identity/managed_identity_credential.hpp>
#include <azure/core/http/curl_transport.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
@ -138,35 +138,34 @@ AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::Abstr
/// Creates a client of type T from a connection string.
/// The generic form is deleted, so only the explicit specializations that follow can be called.
template <class T>
std::unique_ptr<T> getClientWithConnectionString(const String & connection_str, const String & container_name) = delete;
std::unique_ptr<T> getClientWithConnectionString(const String & connection_str, const String & container_name, const BlobClientOptions & client_options) = delete;
/// Specialization for BlobServiceClient.
/// The container name is ignored: the client is created by BlobServiceClient::CreateFromConnectionString
/// from the connection string (and, in the three-argument form, the given client options).
template<>
std::unique_ptr<BlobServiceClient> getClientWithConnectionString(
const String & connection_str, const String & /*container_name*/)
std::unique_ptr<BlobServiceClient> getClientWithConnectionString(const String & connection_str, const String & /*container_name*/, const BlobClientOptions & client_options)
{
return std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(connection_str));
return std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(connection_str, client_options));
}
/// Specialization for BlobContainerClient.
/// The client is created by BlobContainerClient::CreateFromConnectionString from the connection string
/// and the container name (and, in the three-argument form, the given client options).
template<>
std::unique_ptr<BlobContainerClient> getClientWithConnectionString(
const String & connection_str, const String & container_name)
std::unique_ptr<BlobContainerClient> getClientWithConnectionString(const String & connection_str, const String & container_name, const BlobClientOptions & client_options)
{
return std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(connection_str, container_name));
return std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(connection_str, container_name, client_options));
}
/// Creates an Azure client of type T, choosing the authentication method from the configuration
/// keys under config_prefix, in this order:
///  1. `connection_string`, when present and non-empty;
///  2. `account_name` together with `account_key`;
///  3. otherwise a default-constructed ManagedIdentityCredential.
/// In the form that takes client_options, they are forwarded to whichever client is constructed.
template <class T>
std::unique_ptr<T> getAzureBlobStorageClientWithAuth(
const String & url, const String & container_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
const String & url,
const String & container_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Azure::Storage::Blobs::BlobClientOptions & client_options)
{
std::string connection_str;
if (config.has(config_prefix + ".connection_string"))
connection_str = config.getString(config_prefix + ".connection_string");
/// A connection string takes precedence over every other credential; `url` is not used on this path.
if (!connection_str.empty())
return getClientWithConnectionString<T>(connection_str, container_name);
return getClientWithConnectionString<T>(connection_str, container_name, client_options);
/// Both keys must be present for the account name / key credential to be used.
if (config.has(config_prefix + ".account_key") && config.has(config_prefix + ".account_name"))
{
@ -174,38 +173,63 @@ std::unique_ptr<T> getAzureBlobStorageClientWithAuth(
config.getString(config_prefix + ".account_name"),
config.getString(config_prefix + ".account_key")
);
return std::make_unique<T>(url, storage_shared_key_credential);
return std::make_unique<T>(url, storage_shared_key_credential, client_options);
}
/// No explicit credentials in the configuration: fall back to a managed identity.
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
return std::make_unique<T>(url, managed_identity_credential);
return std::make_unique<T>(url, managed_identity_credential, client_options);
}
/// Builds the BlobClientOptions for Azure clients from the configuration keys under config_prefix:
///  - `max_tries` (default 10) is assigned to the retry policy's MaxRetries;
///  - `retry_initial_backoff_ms` (default 10) and `retry_max_backoff_ms` (default 1000) are assigned,
///    as milliseconds, to RetryDelay and MaxRetryDelay;
///  - `curl_ip_resolve`, when present, must be "ipv4" or "ipv6" and selects the IPResolve mode of the
///    curl transport; any other value throws an Exception with code BAD_ARGUMENTS.
/// The returned options use a CurlTransport created with NoSignal = true.
Azure::Storage::Blobs::BlobClientOptions getAzureBlobClientOptions(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
Azure::Core::Http::Policies::RetryOptions retry_options;
retry_options.MaxRetries = config.getUInt(config_prefix + ".max_tries", 10);
retry_options.RetryDelay = std::chrono::milliseconds(config.getUInt(config_prefix + ".retry_initial_backoff_ms", 10));
retry_options.MaxRetryDelay = std::chrono::milliseconds(config.getUInt(config_prefix + ".retry_max_backoff_ms", 1000));
std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
using CurlOptions = Azure::Core::Http::CurlTransportOptions;
CurlOptions curl_options{.NoSignal = true};
/// IPResolve is only overridden when the key is present in the configuration.
if (config.has(config_prefix + ".curl_ip_resolve"))
{
auto value = config.getString(config_prefix + ".curl_ip_resolve");
if (value == "ipv4")
curl_options.IPResolve = CurlOptions::CURL_IPRESOLVE_V4;
else if (value == "ipv6")
curl_options.IPResolve = CurlOptions::CURL_IPRESOLVE_V6;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected value for option 'curl_ip_resolve': {}. Expected one of 'ipv4' or 'ipv6'", value);
}
Azure::Storage::Blobs::BlobClientOptions client_options;
client_options.Retry = retry_options;
client_options.Transport.Transport = std::make_shared<Azure::Core::Http::CurlTransport>(curl_options);
return client_options;
}
/// Returns a BlobContainerClient for the endpoint described by the configuration under config_prefix.
///  - If `container_already_exists` is set to true, the client is returned without trying to create
///    the container.
///  - Otherwise the container is created through a BlobServiceClient. When creation fails with
///    HTTP Conflict and `container_already_exists` is not set at all, a client for the existing
///    container is returned instead; every other StorageException is rethrown.
std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
auto endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
auto container_name = endpoint.container_name;
auto final_url = endpoint.getEndpoint();
auto client_options = getAzureBlobClientOptions(config, config_prefix);
if (endpoint.container_already_exists.value_or(false))
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix, client_options);
/// The service client is addressed by the endpoint without the container part.
auto blob_service_client = getAzureBlobStorageClientWithAuth<BlobServiceClient>(
endpoint.getEndpointWithoutContainer(), container_name, config, config_prefix);
auto blob_service_client = getAzureBlobStorageClientWithAuth<BlobServiceClient>(endpoint.getEndpointWithoutContainer(), container_name, config, config_prefix, client_options);
try
{
return std::make_unique<BlobContainerClient>(
blob_service_client->CreateBlobContainer(container_name).Value);
return std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(container_name).Value);
}
catch (const Azure::Storage::StorageException & e)
{
/// If container_already_exists is not set (in config), ignore already exists error.
/// (Conflict - The specified container already exists)
if (!endpoint.container_already_exists.has_value() && e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix, client_options);
throw;
}
}

View File

@ -45,12 +45,11 @@ struct AzureBlobStorageEndpoint
}
};
/// Returns a container client for the Azure endpoint configured under config_prefix.
std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> getAzureBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> getAzureBlobContainerClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
/// Returns the AzureBlobStorageEndpoint derived from the configuration under config_prefix.
AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
/// NOTE(review): presumably reads AzureObjectStorageSettings from the same configuration prefix;
/// the definition is not visible here - confirm against the implementation.
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/);
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
}