Try to fix Azure tests

This commit is contained in:
kssenii 2022-09-23 15:24:10 +02:00
parent 0eeb2de074
commit 3725397040
3 changed files with 55 additions and 42 deletions

View File

@ -16,14 +16,13 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
const String & blob_path_, const String & blob_path_,
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
size_t buf_size_, size_t buf_size_,
const WriteSettings & write_settings_, const WriteSettings & write_settings_)
std::optional<std::map<std::string, std::string>> attributes_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0) : BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0)
, blob_container_client(blob_container_client_) , log(&Poco::Logger::get("WriteBufferFromAzureBlobStorage"))
, max_single_part_upload_size(max_single_part_upload_size_) , max_single_part_upload_size(max_single_part_upload_size_)
, blob_path(blob_path_) , blob_path(blob_path_)
, write_settings(write_settings_) , write_settings(write_settings_)
, attributes(attributes_) , blob_container_client(blob_container_client_)
{ {
} }
@ -33,63 +32,72 @@ WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
finalize(); finalize();
} }
void WriteBufferFromAzureBlobStorage::finalizeImpl() void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func, size_t num_tries)
{ {
if (attributes.has_value()) auto can_retry_exception = [&]<typename Exception>(const Exception & e, size_t i) -> bool
{ {
auto blob_client = blob_container_client->GetBlobClient(blob_path); if (i == num_tries - 1)
Azure::Storage::Metadata metadata; return false;
for (const auto & [key, value] : *attributes)
metadata[key] = value;
blob_client.SetMetadata(metadata);
}
const size_t max_tries = 3; LOG_DEBUG(log, "Write at attempt {} for blob `{}` failed: {}", i + 1, blob_path, e.Message);
for (size_t i = 0; i < max_tries; ++i) return true;
};
for (size_t i = 0; i < num_tries; ++i)
{ {
try try
{ {
next(); func();
break; break;
} }
catch (const Azure::Core::Http::TransportException & e)
{
if (!can_retry_exception(e, i))
throw;
}
catch (const Azure::Core::RequestFailedException & e) catch (const Azure::Core::RequestFailedException & e)
{ {
if (i == max_tries - 1) if (!can_retry_exception(e, i))
throw; throw;
LOG_INFO(&Poco::Logger::get("WriteBufferFromAzureBlobStorage"),
"Exception caught during finalizing azure storage write at attempt {}: {}", i + 1, e.Message);
} }
} }
} }
void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
execWithRetry([this](){ next(); }, 3);
}
void WriteBufferFromAzureBlobStorage::nextImpl() void WriteBufferFromAzureBlobStorage::nextImpl()
{ {
if (!offset()) if (!offset())
return; return;
auto * buffer_begin = working_buffer.begin(); char * buffer_begin = working_buffer.begin();
auto len = offset(); size_t total_size = offset();
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path); auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
size_t read = 0; size_t current_size = 0;
std::vector<std::string> block_ids; std::vector<std::string> block_ids;
while (read < len)
while (current_size < total_size)
{ {
auto part_len = std::min(len - read, max_single_part_upload_size); size_t part_len = std::min(total_size - current_size, max_single_part_upload_size);
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
auto block_id = getRandomASCIIString(64); Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + current_size), part_len);
block_ids.push_back(block_id); execWithRetry([&block_blob_client, &block_id, &tmp_buffer](){ block_blob_client.StageBlock(block_id, tmp_buffer); }, 3);
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + read), part_len); current_size += part_len;
block_blob_client.StageBlock(block_id, tmp_buffer); LOG_TRACE(log, "Staged block (id: {}) of size {} (written {}/{}, blob path: {}).", block_id, part_len, current_size, total_size, blob_path);
read += part_len;
} }
block_blob_client.CommitBlockList(block_ids); execWithRetry([&block_blob_client, &block_ids](){ block_blob_client.CommitBlockList(block_ids); }, 3);
LOG_TRACE(log, "Commited {} blocks for blob `{}`", block_ids.size(), blob_path);
if (write_settings.remote_throttler) if (write_settings.remote_throttler)
write_settings.remote_throttler->add(read); write_settings.remote_throttler->add(total_size);
} }
} }

View File

@ -13,20 +13,25 @@
#include <azure/core/io/body_stream.hpp> #include <azure/core/io/body_stream.hpp>
namespace Poco
{
class Logger;
}
namespace DB namespace DB
{ {
class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer> class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer>
{ {
public: public:
using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
WriteBufferFromAzureBlobStorage( WriteBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, AzureClientPtr blob_container_client_,
const String & blob_path_, const String & blob_path_,
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
size_t buf_size_, size_t buf_size_,
const WriteSettings & write_settings_, const WriteSettings & write_settings_);
std::optional<std::map<std::string, std::string>> attributes_ = {});
~WriteBufferFromAzureBlobStorage() override; ~WriteBufferFromAzureBlobStorage() override;
@ -34,12 +39,15 @@ public:
private: private:
void finalizeImpl() override; void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries);
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client; Poco::Logger * log;
size_t max_single_part_upload_size;
const String blob_path; const size_t max_single_part_upload_size;
WriteSettings write_settings; const std::string blob_path;
std::optional<std::map<std::string, std::string>> attributes; const WriteSettings write_settings;
AzureClientPtr blob_container_client;
}; };
} }

View File

@ -4,9 +4,6 @@ import os
import pytest import pytest
# FIXME Test is temporarily disabled due to flakiness
# https://github.com/ClickHouse/ClickHouse/issues/39700
pytestmark = pytest.mark.skip pytestmark = pytest.mark.skip
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster