Merge pull request #59929 from ClickHouse/Azure_write_buffer

Asynchronous WriteBuffer for AzureBlobStorage
SmitaRKulkarni 2024-03-25 10:32:56 +01:00 committed by GitHub
commit a642f4d3ec
15 changed files with 251 additions and 141 deletions


@ -276,10 +276,9 @@ std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const Strin
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client,
key,
settings->max_single_part_upload_size,
settings->max_unexpected_write_error_retries,
DBMS_DEFAULT_BUFFER_SIZE,
write_settings);
write_settings,
settings);
}
void BackupWriterAzureBlobStorage::removeFile(const String & file_name)


@ -1,22 +1,18 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3.h>
#include "BufferAllocationPolicy.h"
#include <memory>
namespace
namespace DB
{
class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
class FixedSizeBufferAllocationPolicy : public BufferAllocationPolicy
{
const size_t buffer_size = 0;
size_t buffer_number = 0;
public:
explicit FixedSizeBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: buffer_size(settings_.strict_upload_part_size)
explicit FixedSizeBufferAllocationPolicy(const BufferAllocationPolicy::Settings & settings_)
: buffer_size(settings_.strict_size)
{
chassert(buffer_size > 0);
}
@ -36,7 +32,7 @@ public:
};
class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
class ExpBufferAllocationPolicy : public DB::BufferAllocationPolicy
{
const size_t first_size = 0;
const size_t second_size = 0;
@ -49,12 +45,12 @@ class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocatio
size_t buffer_number = 0;
public:
explicit ExpBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: first_size(std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size))
, second_size(settings_.min_upload_part_size)
, multiply_factor(settings_.upload_part_size_multiply_factor)
, multiply_threshold(settings_.upload_part_size_multiply_parts_count_threshold)
, max_size(settings_.max_upload_part_size)
explicit ExpBufferAllocationPolicy(const BufferAllocationPolicy::Settings & settings_)
: first_size(std::max(settings_.max_single_size, settings_.min_size))
, second_size(settings_.min_size)
, multiply_factor(settings_.multiply_factor)
, multiply_threshold(settings_.multiply_parts_count_threshold)
, max_size(settings_.max_size)
{
chassert(first_size > 0);
chassert(second_size > 0);
@ -92,16 +88,12 @@ public:
}
};
}
namespace DB
BufferAllocationPolicy::~BufferAllocationPolicy() = default;
BufferAllocationPolicyPtr BufferAllocationPolicy::create(BufferAllocationPolicy::Settings settings_)
{
WriteBufferFromS3::IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_)
{
if (settings_.strict_upload_part_size > 0)
if (settings_.strict_size > 0)
return std::make_unique<FixedSizeBufferAllocationPolicy>(settings_);
else
return std::make_unique<ExpBufferAllocationPolicy>(settings_);
@ -109,4 +101,3 @@ WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPol
}
#endif


@ -0,0 +1,39 @@
#pragma once
#include "config.h"
#include "logger_useful.h"
#include <list>
namespace DB
{
class BufferAllocationPolicy;
using BufferAllocationPolicyPtr = std::unique_ptr<BufferAllocationPolicy>;
/// Buffer number starts with 0
class BufferAllocationPolicy
{
public:
struct Settings
{
size_t strict_size = 0;
size_t min_size = 16 * 1024 * 1024;
size_t max_size = 5ULL * 1024 * 1024 * 1024;
size_t multiply_factor = 2;
size_t multiply_parts_count_threshold = 500;
size_t max_single_size = 32 * 1024 * 1024; /// Max size for a single buffer/block
};
virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~BufferAllocationPolicy() = 0;
static BufferAllocationPolicyPtr create(Settings settings_);
};
}
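
The header above is the whole contract: callers fill a Settings struct, call create(), and then alternate nextBuffer()/getBufferSize() for every part they upload (a strict_size greater than zero selects the fixed-size policy, otherwise the exponential one, as the .cpp above shows). A minimal caller sketch, not taken from the PR — the function name and the loop are illustrative, the values are the defaults declared in the header:

#include <Common/BufferAllocationPolicy.h>

using namespace DB;

void demoBufferGrowth()
{
    BufferAllocationPolicy::Settings s;
    s.strict_size = 0;                              /// 0 selects the exponential policy
    s.min_size = 16 * 1024 * 1024;
    s.max_size = 5ULL * 1024 * 1024 * 1024;
    s.multiply_factor = 2;
    s.multiply_parts_count_threshold = 500;
    s.max_single_size = 32 * 1024 * 1024;           /// cap for a single buffer/block

    BufferAllocationPolicyPtr policy = BufferAllocationPolicy::create(s);

    for (size_t part = 0; part < 3; ++part)
    {
        policy->nextBuffer();                          /// advance to the next part
        size_t buffer_size = policy->getBufferSize();  /// how much to allocate for it
        (void)buffer_size;                             /// ... allocate Memory<>(buffer_size) and fill it ...
    }
}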


@ -1,8 +1,6 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3TaskTracker.h>
#include "ThreadPoolTaskTracker.h"
namespace ProfileEvents
{
@ -12,19 +10,19 @@ namespace ProfileEvents
namespace DB
{
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
: is_async(bool(scheduler_))
, scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
, max_tasks_inflight(max_tasks_inflight_)
, limitedLog(limitedLog_)
{}
WriteBufferFromS3::TaskTracker::~TaskTracker()
TaskTracker::~TaskTracker()
{
safeWaitAll();
}
ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
ThreadPoolCallbackRunner<void> TaskTracker::syncRunner()
{
return [](Callback && callback, int64_t) mutable -> std::future<void>
{
@ -35,7 +33,7 @@ ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
};
}
void WriteBufferFromS3::TaskTracker::waitAll()
void TaskTracker::waitAll()
{
/// Exceptions are propagated
for (auto & future : futures)
@ -48,7 +46,7 @@ void WriteBufferFromS3::TaskTracker::waitAll()
finished_futures.clear();
}
void WriteBufferFromS3::TaskTracker::safeWaitAll()
void TaskTracker::safeWaitAll()
{
for (auto & future : futures)
{
@ -71,7 +69,7 @@ void WriteBufferFromS3::TaskTracker::safeWaitAll()
finished_futures.clear();
}
void WriteBufferFromS3::TaskTracker::waitIfAny()
void TaskTracker::waitIfAny()
{
if (futures.empty())
return;
@ -99,7 +97,7 @@ void WriteBufferFromS3::TaskTracker::waitIfAny()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
}
void WriteBufferFromS3::TaskTracker::add(Callback && func)
void TaskTracker::add(Callback && func)
{
/// All this fuzz is about 2 things. This is the most critical place of TaskTracker.
/// The first is not to fail insertion in the list `futures`.
@ -134,7 +132,7 @@ void WriteBufferFromS3::TaskTracker::add(Callback && func)
waitTilInflightShrink();
}
void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
void TaskTracker::waitTilInflightShrink()
{
if (!max_tasks_inflight)
return;
@ -166,11 +164,10 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
}
bool WriteBufferFromS3::TaskTracker::isAsync() const
bool TaskTracker::isAsync() const
{
return is_async;
}
}
#endif


@ -1,20 +1,16 @@
#pragma once
#include "config.h"
#include "threadPoolCallbackRunner.h"
#include "IO/WriteBufferFromS3.h"
#if USE_AWS_S3
#include "WriteBufferFromS3.h"
#include <Common/logger_useful.h>
#include "logger_useful.h"
#include <list>
namespace DB
{
/// That class is used only in WriteBufferFromS3 for now.
/// Therefore it declared as a part of WriteBufferFromS3.
/// TaskTracker takes a Callback which is run by scheduler in some external shared ThreadPool.
/// TaskTracker brings the methods waitIfAny, waitAll/safeWaitAll
/// to help with coordination of the running tasks.
@ -22,7 +18,7 @@ namespace DB
/// Basic exception safety is provided. If exception occurred the object has to be destroyed.
/// No thread safety is provided. Use this object with no concurrency.
class WriteBufferFromS3::TaskTracker
class TaskTracker
{
public:
using Callback = std::function<void()>;
@ -68,5 +64,3 @@ private:
};
}
#endif
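
Dropping the WriteBufferFromS3:: prefix makes TaskTracker reusable by the Azure buffer below. A minimal usage sketch — the constructor, add(), waitIfAny() and waitAll() are the ones declared in this header; the demo function, the pool-less synchronous fallback and the log name are illustrative assumptions:

#include <Common/ThreadPoolTaskTracker.h>
#include <Common/logger_useful.h>

using namespace DB;

void demoTaskTracker()
{
    auto log = getLogger("TaskTrackerDemo");
    auto limited_log = std::make_shared<LogSeriesLimiter>(log, 1, 5);

    /// An empty scheduler makes the tracker fall back to its synchronous runner
    /// (is_async == false); pass a real ThreadPoolCallbackRunner<void> to run
    /// the callbacks in a shared thread pool instead.
    TaskTracker tracker(/* scheduler */ {}, /* max_tasks_inflight */ 0, limited_log);

    tracker.add([] { /* stage one block / upload one part */ });
    tracker.waitIfAny(); /// reap tasks that already finished, propagating their exceptions
    tracker.waitAll();   /// block until every scheduled task is done
}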


@ -78,11 +78,17 @@ class IColumn;
M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
M(UInt64, azure_upload_part_size_multiply_factor, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage.", 0) \
M(UInt64, azure_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, azure_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_single_part_copy_size, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage.", 0) \


@ -114,6 +114,12 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"output_format_parquet_compression_method", "lz4", "zstd", "Parquet/ORC/Arrow support many compression methods, including lz4 and zstd. ClickHouse supports each and every compression method. Some inferior tools, such as 'duckdb', lack support for the faster `lz4` compression method, that's why we set zstd by default."},
{"output_format_orc_compression_method", "lz4", "zstd", "Parquet/ORC/Arrow support many compression methods, including lz4 and zstd. ClickHouse supports each and every compression method. Some inferior tools, such as 'duckdb', lack support for the faster `lz4` compression method, that's why we set zstd by default."},
{"output_format_pretty_highlight_digit_groups", false, true, "If enabled and if output is a terminal, highlight every digit corresponding to the number of thousands, millions, etc. with underline."},
{"azure_max_inflight_parts_for_one_file", 20, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited."},
{"azure_strict_upload_part_size", 0, 0, "The exact size of part to upload during multipart upload to Azure blob storage."},
{"azure_min_upload_part_size", 16*1024*1024, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage."},
{"azure_max_upload_part_size", 5ull*1024*1024*1024, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage."},
{"azure_upload_part_size_multiply_factor", 2, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage."},
{"azure_upload_part_size_multiply_parts_count_threshold", 500, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor."},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},


@ -18,21 +18,48 @@ namespace ProfileEvents
namespace DB
{
struct WriteBufferFromAzureBlobStorage::PartData
{
Memory<> memory;
size_t data_size = 0;
std::string block_id;
};
BufferAllocationPolicyPtr createBufferAllocationPolicy(const AzureObjectStorageSettings & settings)
{
BufferAllocationPolicy::Settings allocation_settings;
allocation_settings.strict_size = settings.strict_upload_part_size;
allocation_settings.min_size = settings.min_upload_part_size;
allocation_settings.max_size = settings.max_upload_part_size;
allocation_settings.multiply_factor = settings.upload_part_size_multiply_factor;
allocation_settings.multiply_parts_count_threshold = settings.upload_part_size_multiply_parts_count_threshold;
allocation_settings.max_single_size = settings.max_single_part_upload_size;
return BufferAllocationPolicy::create(allocation_settings);
}
WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t max_unexpected_write_error_retries_,
size_t buf_size_,
const WriteSettings & write_settings_)
const WriteSettings & write_settings_,
std::shared_ptr<const AzureObjectStorageSettings> settings_,
ThreadPoolCallbackRunner<void> schedule_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, log(getLogger("WriteBufferFromAzureBlobStorage"))
, max_single_part_upload_size(max_single_part_upload_size_)
, max_unexpected_write_error_retries(max_unexpected_write_error_retries_)
, buffer_allocation_policy(createBufferAllocationPolicy(*settings_))
, max_single_part_upload_size(settings_->max_single_part_upload_size)
, max_unexpected_write_error_retries(settings_->max_unexpected_write_error_retries)
, blob_path(blob_path_)
, write_settings(write_settings_)
, blob_container_client(blob_container_client_)
, task_tracker(
std::make_unique<TaskTracker>(
std::move(schedule_),
settings_->max_inflight_parts_for_one_file,
limitedLog))
{
allocateBuffer();
}
@ -77,62 +104,73 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
/// If there is only one block and size is less than or equal to max_single_part_upload_size
/// then we use single part upload instead of multi part upload
if (buffer_allocation_policy->getBufferNumber() == 1)
{
size_t data_size = size_t(position() - memory.data());
if (data_size <= max_single_part_upload_size)
{
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(memory.data()), data_size);
execWithRetry([&](){ block_blob_client.Upload(memory_stream); }, max_unexpected_write_error_retries, data_size);
LOG_TRACE(log, "Committed single block for blob `{}`", blob_path);
return;
}
}
execWithRetry([this](){ next(); }, max_unexpected_write_error_retries);
if (tmp_buffer_write_offset > 0)
uploadBlock(tmp_buffer->data(), tmp_buffer_write_offset);
task_tracker->waitAll();
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, max_unexpected_write_error_retries);
LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
}
void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size)
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, max_unexpected_write_error_retries, size);
tmp_buffer_write_offset = 0;
LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
}
WriteBufferFromAzureBlobStorage::MemoryBufferPtr WriteBufferFromAzureBlobStorage::allocateBuffer() const
{
return std::make_unique<Memory<>>(max_single_part_upload_size);
}
void WriteBufferFromAzureBlobStorage::nextImpl()
{
size_t size_to_upload = offset();
task_tracker->waitIfAny();
writePart();
allocateBuffer();
}
if (size_to_upload == 0)
void WriteBufferFromAzureBlobStorage::allocateBuffer()
{
buffer_allocation_policy->nextBuffer();
auto size = buffer_allocation_policy->getBufferSize();
if (buffer_allocation_policy->getBufferNumber() == 1)
size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size);
memory = Memory(size);
WriteBuffer::set(memory.data(), memory.size());
}
void WriteBufferFromAzureBlobStorage::writePart()
{
auto data_size = size_t(position() - memory.data());
if (data_size == 0)
return;
if (!tmp_buffer)
tmp_buffer = allocateBuffer();
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
std::shared_ptr<PartData> part_data = std::make_shared<PartData>(std::move(memory), data_size, block_id);
WriteBuffer::set(nullptr, 0);
size_t uploaded_size = 0;
while (uploaded_size != size_to_upload)
auto upload_worker = [this, part_data] ()
{
size_t memory_buffer_remaining_size = max_single_part_upload_size - tmp_buffer_write_offset;
if (memory_buffer_remaining_size == 0)
uploadBlock(tmp_buffer->data(), tmp_buffer->size());
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
size_t size = std::min(memory_buffer_remaining_size, size_to_upload - uploaded_size);
memcpy(tmp_buffer->data() + tmp_buffer_write_offset, working_buffer.begin() + uploaded_size, size);
uploaded_size += size;
tmp_buffer_write_offset += size;
}
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(part_data->memory.data()), part_data->data_size);
execWithRetry([&](){ block_blob_client.StageBlock(part_data->block_id, memory_stream); }, max_unexpected_write_error_retries, part_data->data_size);
if (tmp_buffer_write_offset == max_single_part_upload_size)
uploadBlock(tmp_buffer->data(), tmp_buffer->size());
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(part_data->data_size, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
};
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(size_to_upload, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
task_tracker->add(std::move(upload_worker));
}
}


@ -11,7 +11,9 @@
#include <IO/WriteSettings.h>
#include <azure/storage/blobs.hpp>
#include <azure/core/io/body_stream.hpp>
#include <Common/ThreadPoolTaskTracker.h>
#include <Common/BufferAllocationPolicy.h>
#include <Storages/StorageAzureBlob.h>
namespace Poco
{
@ -21,6 +23,8 @@ class Logger;
namespace DB
{
class TaskTracker;
class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
{
public:
@ -29,10 +33,10 @@ public:
WriteBufferFromAzureBlobStorage(
AzureClientPtr blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t max_unexpected_write_error_retries_,
size_t buf_size_,
const WriteSettings & write_settings_);
const WriteSettings & write_settings_,
std::shared_ptr<const AzureObjectStorageSettings> settings_,
ThreadPoolCallbackRunner<void> schedule_ = {});
~WriteBufferFromAzureBlobStorage() override;
@ -42,11 +46,19 @@ public:
void sync() override { next(); }
private:
struct PartData;
void writePart();
void allocateBuffer();
void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries, size_t cost = 0);
void uploadBlock(const char * data, size_t size);
LoggerPtr log;
LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);
BufferAllocationPolicyPtr buffer_allocation_policy;
const size_t max_single_part_upload_size;
const size_t max_unexpected_write_error_retries;
@ -61,6 +73,10 @@ private:
size_t tmp_buffer_write_offset = 0;
MemoryBufferPtr allocateBuffer() const;
bool first_buffer=true;
std::unique_ptr<TaskTracker> task_tracker;
};
}
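
The constructor now takes the whole AzureObjectStorageSettings object plus an optional scheduler instead of individual size/retry arguments. A sketch of the new call-site shape, mirroring the writeObject/writeFile changes in this PR (the helper name is illustrative and the include path is assumed):

#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>   /// path assumed

using namespace DB;

std::unique_ptr<WriteBufferFromFileBase> makeAzureWriteBuffer(
    std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client,
    const String & blob_path,
    std::shared_ptr<const AzureObjectStorageSettings> settings,
    const WriteSettings & write_settings)
{
    /// Without a scheduler (the default {}), parts are still staged synchronously;
    /// pass a ThreadPoolCallbackRunner<void> as the last argument to get the
    /// asynchronous upload path driven by TaskTracker.
    return std::make_unique<WriteBufferFromAzureBlobStorage>(
        client,
        blob_path,
        DBMS_DEFAULT_BUFFER_SIZE,
        write_settings,
        settings);
}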


@ -212,17 +212,23 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
{
return std::make_unique<AzureObjectStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getUInt64(config_prefix + ".max_upload_part_size", 5ULL * 1024 * 1024 * 1024),
config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size),
config.getBool(config_prefix + ".use_native_copy", false),
config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries)
);
std::unique_ptr<AzureObjectStorageSettings> settings = std::make_unique<AzureObjectStorageSettings>();
settings->max_single_part_upload_size = config.getUInt64(config_prefix + ".max_single_part_upload_size", context->getSettings().azure_max_single_part_upload_size);
settings->min_bytes_for_seek = config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024);
settings->max_single_read_retries = config.getInt(config_prefix + ".max_single_read_retries", 3);
settings->max_single_download_retries = config.getInt(config_prefix + ".max_single_download_retries", 3);
settings->list_object_keys_size = config.getInt(config_prefix + ".list_object_keys_size", 1000);
settings->min_upload_part_size = config.getUInt64(config_prefix + ".min_upload_part_size", context->getSettings().azure_min_upload_part_size);
settings->max_upload_part_size = config.getUInt64(config_prefix + ".max_upload_part_size", context->getSettings().azure_max_upload_part_size);
settings->max_single_part_copy_size = config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size);
settings->use_native_copy = config.getBool(config_prefix + ".use_native_copy", false);
settings->max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries);
settings->max_inflight_parts_for_one_file = config.getUInt64(config_prefix + ".max_inflight_parts_for_one_file", context->getSettings().azure_max_inflight_parts_for_one_file);
settings->strict_upload_part_size = config.getUInt64(config_prefix + ".strict_upload_part_size", context->getSettings().azure_strict_upload_part_size);
settings->upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".upload_part_size_multiply_factor", context->getSettings().azure_upload_part_size_multiply_factor);
settings->upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".upload_part_size_multiply_parts_count_threshold", context->getSettings().azure_upload_part_size_multiply_parts_count_threshold);
return settings;
}
}


@ -265,10 +265,9 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
object.remote_path,
settings.get()->max_single_part_upload_size,
settings.get()->max_unexpected_write_error_retries,
buf_size,
patchSettings(write_settings));
patchSettings(write_settings),
settings.get());
}
/// Remove file. Throws exception if file doesn't exists or it's a directory.


@ -24,19 +24,29 @@ struct AzureObjectStorageSettings
int max_single_read_retries_,
int max_single_download_retries_,
int list_object_keys_size_,
size_t min_upload_part_size_,
size_t max_upload_part_size_,
size_t max_single_part_copy_size_,
bool use_native_copy_,
size_t max_unexpected_write_error_retries_)
size_t max_unexpected_write_error_retries_,
size_t max_inflight_parts_for_one_file_,
size_t strict_upload_part_size_,
size_t upload_part_size_multiply_factor_,
size_t upload_part_size_multiply_parts_count_threshold_)
: max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
, list_object_keys_size(list_object_keys_size_)
, min_upload_part_size(min_upload_part_size_)
, max_upload_part_size(max_upload_part_size_)
, max_single_part_copy_size(max_single_part_copy_size_)
, use_native_copy(use_native_copy_)
, max_unexpected_write_error_retries (max_unexpected_write_error_retries_)
, max_unexpected_write_error_retries(max_unexpected_write_error_retries_)
, max_inflight_parts_for_one_file(max_inflight_parts_for_one_file_)
, strict_upload_part_size(strict_upload_part_size_)
, upload_part_size_multiply_factor(upload_part_size_multiply_factor_)
, upload_part_size_multiply_parts_count_threshold(upload_part_size_multiply_parts_count_threshold_)
{
}
@ -52,6 +62,10 @@ struct AzureObjectStorageSettings
size_t max_single_part_copy_size = 256 * 1024 * 1024;
bool use_native_copy = false;
size_t max_unexpected_write_error_retries = 4;
size_t max_inflight_parts_for_one_file = 20;
size_t strict_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;
};
using AzureClient = Azure::Storage::Blobs::BlobContainerClient;


@ -4,8 +4,8 @@
#include "StdIStreamFromMemory.h"
#include "WriteBufferFromS3.h"
#include "WriteBufferFromS3TaskTracker.h"
#include <Common/ThreadPoolTaskTracker.h>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/Throttler.h>
@ -72,6 +72,19 @@ struct WriteBufferFromS3::PartData
}
};
BufferAllocationPolicyPtr createBufferAllocationPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings)
{
BufferAllocationPolicy::Settings allocation_settings;
allocation_settings.strict_size = settings.strict_upload_part_size;
allocation_settings.min_size = settings.min_upload_part_size;
allocation_settings.max_size = settings.max_upload_part_size;
allocation_settings.multiply_factor = settings.upload_part_size_multiply_factor;
allocation_settings.multiply_parts_count_threshold = settings.upload_part_size_multiply_parts_count_threshold;
allocation_settings.max_single_size = settings.max_single_part_upload_size;
return BufferAllocationPolicy::create(allocation_settings);
}
WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
@ -91,9 +104,9 @@ WriteBufferFromS3::WriteBufferFromS3(
, write_settings(write_settings_)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
, buffer_allocation_policy(createBufferAllocationPolicy(upload_settings))
, task_tracker(
std::make_unique<WriteBufferFromS3::TaskTracker>(
std::make_unique<TaskTracker>(
std::move(schedule_),
upload_settings.max_inflight_parts_for_one_file,
limitedLog))
@ -320,14 +333,6 @@ void WriteBufferFromS3::detachBuffer()
detached_part_data.push_back({std::move(buf), data_size});
}
void WriteBufferFromS3::allocateFirstBuffer()
{
const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer);
memory = Memory(size);
WriteBuffer::set(memory.data(), memory.size());
}
void WriteBufferFromS3::allocateBuffer()
{
buffer_allocation_policy->nextBuffer();
@ -340,6 +345,14 @@ void WriteBufferFromS3::allocateBuffer()
WriteBuffer::set(memory.data(), memory.size());
}
void WriteBufferFromS3::allocateFirstBuffer()
{
const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer);
memory = Memory(size);
WriteBuffer::set(memory.data(), memory.size());
}
void WriteBufferFromS3::setFakeBufferWhenPreFinalized()
{
WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized));


@ -12,6 +12,8 @@
#include <Storages/StorageS3Settings.h>
#include <Common/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Common/ThreadPoolTaskTracker.h>
#include <Common/BufferAllocationPolicy.h>
#include <memory>
#include <vector>
@ -26,6 +28,8 @@ namespace DB
* Data is divided on chunks with size greater than 'minimum_upload_part_size'. Last chunk can be less than this threshold.
* Each chunk is written as a part to S3.
*/
class TaskTracker;
class WriteBufferFromS3 final : public WriteBufferFromFileBase
{
public:
@ -46,18 +50,6 @@ public:
std::string getFileName() const override { return key; }
void sync() override { next(); }
class IBufferAllocationPolicy
{
public:
virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~IBufferAllocationPolicy() = 0;
};
using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;
static IBufferAllocationPolicyPtr ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_);
private:
/// Receives response from the server after sending all data.
void finalizeImpl() override;
@ -67,10 +59,10 @@ private:
struct PartData;
void hidePartialData();
void allocateFirstBuffer();
void reallocateFirstBuffer();
void detachBuffer();
void allocateBuffer();
void allocateFirstBuffer();
void setFakeBufferWhenPreFinalized();
S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
@ -94,7 +86,7 @@ private:
LoggerPtr log = getLogger("WriteBufferFromS3");
LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);
IBufferAllocationPolicyPtr buffer_allocation_policy;
BufferAllocationPolicyPtr buffer_allocation_policy;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.
@ -119,7 +111,6 @@ private:
size_t total_size = 0;
size_t hidden_size = 0;
class TaskTracker;
std::unique_ptr<TaskTracker> task_tracker;
BlobStorageLogWriterPtr blob_log;


@ -37,6 +37,7 @@ def generate_cluster_def(port):
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>100000</max_single_part_upload_size>
<min_upload_part_size>100000</min_upload_part_size>
<max_single_download_retries>10</max_single_download_retries>
<max_single_read_retries>10</max_single_read_retries>
</blob_storage_disk>