Updated lambda and name of BufferAllocationPolicy

This commit is contained in:
Smita Kulkarni 2024-03-21 11:10:54 +01:00
parent 9f2d44fc65
commit e0d14a1eaf
6 changed files with 28 additions and 29 deletions

View File

@ -5,13 +5,13 @@
namespace DB
{
class FixedSizeBufferAllocationPolicy : public IBufferAllocationPolicy
class FixedSizeBufferAllocationPolicy : public BufferAllocationPolicy
{
const size_t buffer_size = 0;
size_t buffer_number = 0;
public:
explicit FixedSizeBufferAllocationPolicy(const IBufferAllocationPolicy::Settings & settings_)
explicit FixedSizeBufferAllocationPolicy(const BufferAllocationPolicy::Settings & settings_)
: buffer_size(settings_.strict_size)
{
chassert(buffer_size > 0);
@ -32,7 +32,7 @@ public:
};
class ExpBufferAllocationPolicy : public DB::IBufferAllocationPolicy
class ExpBufferAllocationPolicy : public DB::BufferAllocationPolicy
{
const size_t first_size = 0;
const size_t second_size = 0;
@ -45,7 +45,7 @@ class ExpBufferAllocationPolicy : public DB::IBufferAllocationPolicy
size_t buffer_number = 0;
public:
explicit ExpBufferAllocationPolicy(const IBufferAllocationPolicy::Settings & settings_)
explicit ExpBufferAllocationPolicy(const BufferAllocationPolicy::Settings & settings_)
: first_size(std::max(settings_.max_single_size, settings_.min_size))
, second_size(settings_.min_size)
, multiply_factor(settings_.multiply_factor)
@ -89,9 +89,9 @@ public:
};
IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
BufferAllocationPolicy::~BufferAllocationPolicy() = default;
IBufferAllocationPolicy::IBufferAllocationPolicyPtr IBufferAllocationPolicy::create(IBufferAllocationPolicy::Settings settings_)
BufferAllocationPolicyPtr BufferAllocationPolicy::create(BufferAllocationPolicy::Settings settings_)
{
if (settings_.strict_size > 0)
return std::make_unique<FixedSizeBufferAllocationPolicy>(settings_);

View File

@ -9,8 +9,11 @@
namespace DB
{
class BufferAllocationPolicy;
using BufferAllocationPolicyPtr = std::unique_ptr<BufferAllocationPolicy>;
/// Buffer number starts with 0
class IBufferAllocationPolicy
class BufferAllocationPolicy
{
public:
@ -24,15 +27,12 @@ public:
size_t max_single_size = 32 * 1024 * 1024; /// Max size for a single buffer/block
};
virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~IBufferAllocationPolicy() = 0;
virtual ~BufferAllocationPolicy() = 0;
using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;
static IBufferAllocationPolicyPtr create(Settings settings_);
static BufferAllocationPolicyPtr create(Settings settings_);
};

View File

@ -22,11 +22,12 @@ struct WriteBufferFromAzureBlobStorage::PartData
{
Memory<> memory;
size_t data_size = 0;
std::string block_id;
};
IBufferAllocationPolicy::IBufferAllocationPolicyPtr createBufferAllocationPolicy(const AzureObjectStorageSettings & settings)
BufferAllocationPolicyPtr createBufferAllocationPolicy(const AzureObjectStorageSettings & settings)
{
IBufferAllocationPolicy::Settings allocation_settings;
BufferAllocationPolicy::Settings allocation_settings;
allocation_settings.strict_size = settings.strict_upload_part_size;
allocation_settings.min_size = settings.min_upload_part_size;
allocation_settings.max_size = settings.max_upload_part_size;
@ -34,7 +35,7 @@ IBufferAllocationPolicy::IBufferAllocationPolicyPtr createBufferAllocationPolicy
allocation_settings.multiply_parts_count_threshold = settings.upload_part_size_multiply_parts_count_threshold;
allocation_settings.max_single_size = settings.max_single_part_upload_size;
return IBufferAllocationPolicy::create(allocation_settings);
return BufferAllocationPolicy::create(allocation_settings);
}
WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
@ -146,21 +147,20 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer()
void WriteBufferFromAzureBlobStorage::writePart()
{
std::shared_ptr<PartData> part_data;
auto data_size = size_t(position() - memory.data());
part_data = std::make_shared<PartData>(std::move(memory), data_size);
WriteBuffer::set(nullptr, 0);
if (part_data->data_size == 0)
if (data_size == 0)
return;
auto upload_worker = [&, part_data] ()
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
std::shared_ptr<PartData> part_data = std::make_shared<PartData>(std::move(memory), data_size, block_id);
WriteBuffer::set(nullptr, 0);
auto upload_worker = [this, part_data] ()
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(part_data->memory.data()), part_data->data_size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, max_unexpected_write_error_retries, part_data->data_size);
execWithRetry([&](){ block_blob_client.StageBlock(part_data->block_id, memory_stream); }, max_unexpected_write_error_retries, part_data->data_size);
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(part_data->data_size, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);

View File

@ -58,7 +58,7 @@ private:
LoggerPtr log;
LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);
IBufferAllocationPolicy::IBufferAllocationPolicyPtr buffer_allocation_policy;
BufferAllocationPolicyPtr buffer_allocation_policy;
const size_t max_single_part_upload_size;
const size_t max_unexpected_write_error_retries;

View File

@ -74,9 +74,9 @@ struct WriteBufferFromS3::PartData
}
};
IBufferAllocationPolicy::IBufferAllocationPolicyPtr createBufferAllocationPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings)
BufferAllocationPolicyPtr createBufferAllocationPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings)
{
IBufferAllocationPolicy::Settings allocation_settings;
BufferAllocationPolicy::Settings allocation_settings;
allocation_settings.strict_size = settings.strict_upload_part_size;
allocation_settings.min_size = settings.min_upload_part_size;
allocation_settings.max_size = settings.max_upload_part_size;
@ -84,7 +84,7 @@ IBufferAllocationPolicy::IBufferAllocationPolicyPtr createBufferAllocationPolicy
allocation_settings.multiply_parts_count_threshold = settings.upload_part_size_multiply_parts_count_threshold;
allocation_settings.max_single_size = settings.max_single_part_upload_size;
return IBufferAllocationPolicy::create(allocation_settings);
return BufferAllocationPolicy::create(allocation_settings);
}

View File

@ -85,7 +85,7 @@ private:
LoggerPtr log = getLogger("WriteBufferFromS3");
LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);
IBufferAllocationPolicy::IBufferAllocationPolicyPtr buffer_allocation_policy;
BufferAllocationPolicyPtr buffer_allocation_policy;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.
@ -109,7 +109,6 @@ private:
size_t total_size = 0;
size_t hidden_size = 0;
// class TaskTracker;
std::unique_ptr<TaskTracker> task_tracker;
BlobStorageLogWriterPtr blob_log;