work with comments on PR

This commit is contained in:
Sema Checherinda 2023-05-15 14:41:51 +02:00
parent 22f7aa8d89
commit dccdb3e678
8 changed files with 72 additions and 83 deletions

View File

@ -1,27 +1,24 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3MemoryStream.h>
#include <IO/StdIStreamFromMemory.h>
namespace DB
{
MemoryStream::MemoryBuf::MemoryBuf(char * begin_, size_t size_)
StdIStreamFromMemory::MemoryBuf::MemoryBuf(char * begin_, size_t size_)
: begin(begin_)
, size(size_)
{
this->setg(begin, begin, begin + size);
}
MemoryStream::MemoryBuf::int_type MemoryStream::MemoryBuf::underflow()
StdIStreamFromMemory::MemoryBuf::int_type StdIStreamFromMemory::MemoryBuf::underflow()
{
if (gptr() < egptr())
return traits_type::to_int_type(*gptr());
return traits_type::eof();
}
MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekoff(off_type off, std::ios_base::seekdir way,
StdIStreamFromMemory::MemoryBuf::pos_type
StdIStreamFromMemory::MemoryBuf::seekoff(off_type off, std::ios_base::seekdir way,
std::ios_base::openmode mode)
{
bool out_mode = (std::ios_base::out & mode) != 0;
@ -49,13 +46,13 @@ MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekoff(off_type off,
return pos_type(ret);
}
MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekpos(pos_type sp,
StdIStreamFromMemory::MemoryBuf::pos_type StdIStreamFromMemory::MemoryBuf::seekpos(pos_type sp,
std::ios_base::openmode mode)
{
return seekoff(off_type(sp), std::ios_base::beg, mode);
}
MemoryStream::MemoryStream(char * begin_, size_t size_)
StdIStreamFromMemory::StdIStreamFromMemory(char * begin_, size_t size_)
: std::iostream(nullptr)
, mem_buf(begin_, size_)
{
@ -63,6 +60,3 @@ MemoryStream::MemoryStream(char * begin_, size_t size_)
}
}
#endif

View File

@ -1,15 +1,15 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <iostream>
namespace DB
{
struct MemoryStream: std::iostream
/// StdIStreamFromMemory is used in WriteBufferFromS3 as a stream which is passed to the S3::Client
/// It provides istream interface (only reading) over the memory.
/// However S3::Client requires iostream interface it only reads from the stream
class StdIStreamFromMemory : public std::iostream
{
struct MemoryBuf: std::streambuf
{
@ -27,11 +27,10 @@ struct MemoryStream: std::iostream
size_t size = 0;
};
MemoryStream(char * begin_, size_t size_);
MemoryBuf mem_buf;
public:
StdIStreamFromMemory(char * begin_, size_t size_);
};
}
#endif

View File

@ -2,8 +2,8 @@
#if USE_AWS_S3
#include "StdIStreamFromMemory.h"
#include "WriteBufferFromS3.h"
#include "WriteBufferFromS3MemoryStream.h"
#include "WriteBufferFromS3TaskTracker.h"
#include <Common/logger_useful.h>
@ -63,7 +63,7 @@ struct WriteBufferFromS3::PartData
std::shared_ptr<std::iostream> createAwsBuffer()
{
auto buffer = std::make_shared<MemoryStream>(memory.data(), data_size);
auto buffer = std::make_shared<StdIStreamFromMemory>(memory.data(), data_size);
buffer->exceptions(std::ios::badbit);
return buffer;
}
@ -108,7 +108,7 @@ void WriteBufferFromS3::nextImpl()
"Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest");
/// Make sense to call to before adding new async task to check if there is an exception
task_tracker->getReady();
task_tracker->waitReady();
hidePartialData();
@ -132,7 +132,7 @@ void WriteBufferFromS3::preFinalize()
LOG_TRACE(log, "preFinalize WriteBufferFromS3. {}", getLogDetails());
task_tracker->getReady();
task_tracker->waitReady();
hidePartialData();
@ -178,7 +178,7 @@ void WriteBufferFromS3::finalizeImpl()
chassert(offset() == 0);
chassert(hidden_size == 0);
task_tracker->getAll();
task_tracker->waitAll();
if (!multipart_upload_id.empty())
{
@ -266,10 +266,10 @@ void WriteBufferFromS3::reallocateFirstBuffer()
{
chassert(offset() == 0);
if (buffer_allocation_policy->getNumber() > 1 || available() > 0)
if (buffer_allocation_policy->getBufferNumber() > 1 || available() > 0)
return;
const size_t max_first_buffer = buffer_allocation_policy->getSize();
const size_t max_first_buffer = buffer_allocation_policy->getBufferSize();
if (memory.size() == max_first_buffer)
return;
@ -299,7 +299,7 @@ void WriteBufferFromS3::detachBuffer()
void WriteBufferFromS3::allocateFirstBuffer()
{
const auto max_first_buffer = buffer_allocation_policy->getSize();
const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer);
memory = Memory(size);
WriteBuffer::set(memory.data(), memory.size());
@ -309,16 +309,16 @@ void WriteBufferFromS3::allocateFirstBuffer()
void WriteBufferFromS3::allocateBuffer()
{
buffer_allocation_policy->next();
buffer_allocation_policy->nextBuffer();
chassert(0 == hidden_size);
if (buffer_allocation_policy->getNumber() == 1)
if (buffer_allocation_policy->getBufferNumber() == 1)
return allocateFirstBuffer();
memory = Memory(buffer_allocation_policy->getSize());
memory = Memory(buffer_allocation_policy->getBufferSize());
WriteBuffer::set(memory.data(), memory.size());
LOG_TRACE(log, "Allocated buffer with size {}. {}", buffer_allocation_policy->getSize(), getLogDetails());
LOG_TRACE(log, "Allocated buffer with size {}. {}", buffer_allocation_policy->getBufferSize(), getLogDetails());
}
void WriteBufferFromS3::setFakeBufferWhenPreFinalized()

View File

@ -8,7 +8,6 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <IO/WriteBufferFromS3BufferAllocationPolicy.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
@ -41,6 +40,19 @@ public:
void nextImpl() override;
void preFinalize() override;
public:
class IBufferAllocationPolicy
{
public:
virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~IBufferAllocationPolicy() = 0;
};
using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;
static IBufferAllocationPolicyPtr ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_);
private:
/// Receives response from the server after sending all data.
void finalizeImpl() override;

View File

@ -2,38 +2,41 @@
#if USE_AWS_S3
#include <IO/WriteBufferFromS3BufferAllocationPolicy.h>
#include <IO/WriteBufferFromS3.h>
#include <memory>
namespace
{
struct FixedSizeBufferAllocationPolicy : DB::IBufferAllocationPolicy
class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
{
const size_t size = 0;
const size_t buffer_size = 0;
size_t buffer_number = 0;
public:
explicit FixedSizeBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: size(settings_.strict_upload_part_size)
: buffer_size(settings_.strict_upload_part_size)
{
chassert(size > 0);
chassert(buffer_size > 0);
}
size_t getNumber() const override { return buffer_number; }
size_t getBufferNumber() const override { return buffer_number; }
size_t getSize() const override
size_t getBufferSize() const override
{
chassert(buffer_number > 0);
return size;
return buffer_size;
}
void next() override
void nextBuffer() override
{
++buffer_number;
}
};
struct ExpBufferAllocationPolicy : DB::IBufferAllocationPolicy
class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
{
const size_t first_size = 0;
const size_t second_size = 0;
@ -45,6 +48,7 @@ struct ExpBufferAllocationPolicy : DB::IBufferAllocationPolicy
size_t current_size = 0;
size_t buffer_number = 0;
public:
explicit ExpBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: first_size(std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size))
, second_size(settings_.min_upload_part_size)
@ -59,15 +63,15 @@ struct ExpBufferAllocationPolicy : DB::IBufferAllocationPolicy
chassert(max_size > 0);
}
size_t getNumber() const override { return buffer_number; }
size_t getBufferNumber() const override { return buffer_number; }
size_t getSize() const override
size_t getBufferSize() const override
{
chassert(buffer_number > 0);
return current_size;
}
void next() override
void nextBuffer() override
{
++buffer_number;
@ -93,9 +97,9 @@ struct ExpBufferAllocationPolicy : DB::IBufferAllocationPolicy
namespace DB
{
IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
WriteBufferFromS3::IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
IBufferAllocationPolicyPtr ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_)
WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_)
{
if (settings_.strict_upload_part_size > 0)
return std::make_unique<FixedSizeBufferAllocationPolicy>(settings_);

View File

@ -1,26 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
namespace DB
{
struct IBufferAllocationPolicy
{
virtual size_t getNumber() const = 0;
virtual size_t getSize() const = 0;
virtual void next() = 0;
virtual ~IBufferAllocationPolicy() = 0;
};
using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;
IBufferAllocationPolicyPtr ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_);
}
#endif

View File

@ -28,9 +28,9 @@ ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
};
}
void WriteBufferFromS3::TaskTracker::getReady()
void WriteBufferFromS3::TaskTracker::waitReady()
{
LOG_TEST(log, "getReady, in queue {}", futures.size());
LOG_TEST(log, "waitReady, in queue {}", futures.size());
/// Exceptions are propagated
auto it = futures.begin();
@ -55,12 +55,12 @@ void WriteBufferFromS3::TaskTracker::getReady()
it = futures.erase(it);
}
LOG_TEST(log, "getReady ended, in queue {}", futures.size());
LOG_TEST(log, "waitReady ended, in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::getAll()
void WriteBufferFromS3::TaskTracker::waitAll()
{
LOG_TEST(log, "getAll, in queue {}", futures.size());
LOG_TEST(log, "waitAll, in queue {}", futures.size());
/// Exceptions are propagated
for (auto & future : futures)

View File

@ -9,6 +9,12 @@
namespace DB
{
/// That class is used only in WriteBufferFromS3 for now.
/// Therefore it declared as a part of WriteBufferFromS3.
/// TaskTracker takes a Callback which is run by scheduler in some external shared ThreadPool.
/// TaskTracker brings the methods waitReady, waitAll/safeWaitAll
/// to help with coordination of the running tasks.
class WriteBufferFromS3::TaskTracker
{
public:
@ -20,8 +26,8 @@ public:
static ThreadPoolCallbackRunner<void> syncRunner();
bool isAsync() const;
void getReady();
void getAll();
void waitReady();
void waitAll();
void safeWaitAll();
void add(Callback && func);