Fix link order issue of WriteBufferFromS3

(cherry picked from commit f22b09f4fc)
Amos Bird 2022-02-01 16:19:26 +08:00 committed by Nikolai Kochetov
parent 2a6eb593be
commit e29c77f793
3 changed files with 31 additions and 31 deletions
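
The gist of the change: WriteBufferFromS3 used to hold a raw ThreadPool pointer and call scheduleOrThrow itself, which made the low-level IO code depend on the thread-pool and query-thread-state machinery and broke link order. The commit inverts that dependency: the buffer now accepts a type-erased ScheduleFunc (a std::function taking a callback), and the caller decides how callbacks are dispatched. A minimal sketch of the pattern, with illustrative names (Uploader is not from the commit):

    #include <functional>
    #include <iostream>
    #include <utility>

    using ScheduleFunc = std::function<void(std::function<void()>)>;

    // The low-level component only sees the type-erased callback, so it can
    // be compiled and linked without any thread-pool implementation behind it.
    class Uploader
    {
    public:
        explicit Uploader(ScheduleFunc schedule_ = {}) : schedule(std::move(schedule_)) {}

        void upload()
        {
            if (schedule)   // an empty std::function means "run synchronously"
                schedule([] { std::cout << "part uploaded in background\n"; });
            else
                std::cout << "part uploaded synchronously\n";
        }

    private:
        ScheduleFunc schedule;
    };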

diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp

@@ -9,6 +9,7 @@
 #include <boost/algorithm/string.hpp>
 #include <base/scope_guard_safe.h>
 #include <base/unit.h>
+#include <base/FnTraits.h>
@@ -262,7 +263,20 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
     LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
         mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + s3_path);
 
-    ThreadPool * writer = &getThreadPoolWriter();
+    ScheduleFunc schedule = [](auto && callback)
+    {
+        getThreadPoolWriter().scheduleOrThrow([callback = std::move(callback), thread_group = CurrentThread::getGroup()]()
+        {
+            if (thread_group)
+                CurrentThread::attachTo(thread_group);
+
+            SCOPE_EXIT_SAFE(
+                if (thread_group)
+                    CurrentThread::detachQueryIfNotDetached();
+            );
+            callback();
+        });
+    };
 
     auto s3_buffer = std::make_unique<WriteBufferFromS3>(
         settings->client,
@@ -272,7 +286,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
         settings->s3_max_single_part_upload_size,
         std::move(object_metadata),
         buf_size,
-        writer);
+        std::move(schedule));
 
     return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
 }
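
Note what moved where: the thread-group attach/detach boilerplate that used to live inside WriteBufferFromS3 is now part of the lambda that DiskS3 builds, so query-thread attribution still works, but the ThreadPool and CurrentThread dependencies stay with the caller. The callback must be captured by value (moved into the scheduled task) because the task outlives the call to schedule. Any dispatcher with the right shape can now be injected; a deliberately simple sketch (makeDetachedThreadScheduler is a hypothetical helper, not part of the commit):

    #include <functional>
    #include <thread>
    #include <utility>

    using ScheduleFunc = std::function<void(std::function<void()>)>;

    // Hypothetical: run each callback on a detached std::thread instead of
    // ClickHouse's shared writer pool. Shape-compatible with the schedule
    // lambda above, minus the thread-group bookkeeping.
    ScheduleFunc makeDetachedThreadScheduler()
    {
        return [](std::function<void()> callback)
        {
            std::thread(std::move(callback)).detach();
        };
    }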

diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp

@@ -11,7 +11,6 @@
 #    include <aws/s3/model/PutObjectRequest.h>
 #    include <aws/s3/model/UploadPartRequest.h>
 #    include <base/logger_useful.h>
-#    include <base/scope_guard_safe.h>
 
 #    include <utility>
@@ -58,7 +57,7 @@ WriteBufferFromS3::WriteBufferFromS3(
     size_t max_single_part_upload_size_,
     std::optional<std::map<String, String>> object_metadata_,
     size_t buffer_size_,
-    ThreadPool * thread_pool_)
+    ScheduleFunc && schedule_)
     : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
     , bucket(bucket_)
     , key(key_)
@@ -66,7 +65,7 @@ WriteBufferFromS3::WriteBufferFromS3(
     , client_ptr(std::move(client_ptr_))
     , minimum_upload_part_size(minimum_upload_part_size_)
     , max_single_part_upload_size(max_single_part_upload_size_)
-    , thread_pool(thread_pool_)
+    , schedule(std::move(schedule_))
 {
     allocateBuffer();
 }
@@ -175,7 +174,7 @@ void WriteBufferFromS3::writePart()
         LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload.");
     }
 
-    if (thread_pool)
+    if (schedule)
     {
         UploadPartTask * task = nullptr;
         int part_number;
@@ -187,16 +186,8 @@ void WriteBufferFromS3::writePart()
         }
 
         fillUploadRequest(task->req, part_number);
-        thread_pool->scheduleOrThrow([this, task, thread_group = CurrentThread::getGroup()]()
+        schedule([this, task]()
         {
-            if (thread_group)
-                CurrentThread::attachTo(thread_group);
-
-            SCOPE_EXIT_SAFE(
-                if (thread_group)
-                    CurrentThread::detachQueryIfNotDetached();
-            );
-
             try
             {
                 processUploadRequest(*task);
@@ -283,7 +274,7 @@ void WriteBufferFromS3::completeMultipartUpload()
 void WriteBufferFromS3::makeSinglepartUpload()
 {
     auto size = temporary_buffer->tellp();
-    bool with_pool = thread_pool != nullptr;
+    bool with_pool = bool(schedule);
 
     LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool);
@@ -296,20 +287,12 @@ void WriteBufferFromS3::makeSinglepartUpload()
         return;
     }
 
-    if (thread_pool)
+    if (schedule)
     {
         put_object_task = std::make_unique<PutObjectTask>();
         fillPutRequest(put_object_task->req);
-        thread_pool->scheduleOrThrow([this, thread_group = CurrentThread::getGroup()]()
+        schedule([this]()
         {
-            if (thread_group)
-                CurrentThread::attachTo(thread_group);
-
-            SCOPE_EXIT_SAFE(
-                if (thread_group)
-                    CurrentThread::detachQueryIfNotDetached();
-            );
-
             try
             {
                 processPutRequest(*put_object_task);
@@ -348,7 +331,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
 void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
 {
     auto outcome = client_ptr->PutObject(task.req);
-    bool with_pool = thread_pool != nullptr;
+    bool with_pool = bool(schedule);
 
     if (outcome.IsSuccess())
         LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
@@ -358,7 +341,7 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
 
 void WriteBufferFromS3::waitForReadyBackGroundTasks()
 {
-    if (thread_pool)
+    if (schedule)
     {
         std::lock_guard lock(bg_tasks_mutex);
         {
@@ -383,7 +366,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
 
 void WriteBufferFromS3::waitForAllBackGroundTasks()
 {
-    if (thread_pool)
+    if (schedule)
     {
         std::unique_lock lock(bg_tasks_mutex);
         bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
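
Every former `thread_pool != nullptr` check above becomes bool(schedule) or a plain if (schedule), relying on std::function's explicit operator bool: a default-constructed ScheduleFunc holds no target and is falsy, so the buffer falls back to the synchronous upload path. A small standalone check of that behavior (not from the commit):

    #include <cassert>
    #include <functional>

    using ScheduleFunc = std::function<void(std::function<void()>)>;

    int main()
    {
        ScheduleFunc empty;                                               // no target
        ScheduleFunc inline_sched = [](std::function<void()> f) { f(); };

        assert(!bool(empty));        // plays the role of the old null-pointer check
        assert(bool(inline_sched));  // a target is set: background path is taken
        return 0;
    }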

diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h

@@ -30,6 +30,8 @@ namespace Aws::S3::Model
 namespace DB
 {
 
+using ScheduleFunc = std::function<void(std::function<void()>)>;
+
 /**
  * Buffer to write a data to a S3 object with specified bucket and key.
  * If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload.
@@ -48,7 +50,7 @@ public:
         size_t max_single_part_upload_size_,
         std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
         size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
-        ThreadPool * thread_pool_ = nullptr);
+        ScheduleFunc && schedule_ = {});
 
     ~WriteBufferFromS3() override;
@@ -97,7 +99,8 @@ private:
     bool is_prefinalized = false;
 
     /// Following fields are for background uploads in thread pool (if specified).
-    ThreadPool * thread_pool;
+    /// We use std::function to avoid dependency of Interpreters
+    ScheduleFunc schedule;
     std::unique_ptr<PutObjectTask> put_object_task;
     std::list<UploadPartTask> upload_object_tasks;
     size_t num_added_bg_tasks = 0;
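
One header detail worth a second look: the new parameter is an rvalue reference with a default argument, ScheduleFunc && schedule_ = {}. That is well-formed C++: the default {} materializes an empty temporary std::function that binds to the reference, and the constructor moves from it, so callers can omit the argument entirely or hand over their own scheduler. It also means the header no longer needs any ThreadPool declaration, only std::function, which is what actually removes the link-order dependency. A minimal sketch of that construction (the class name Sink is illustrative, not from the commit):

    #include <functional>
    #include <utility>

    using ScheduleFunc = std::function<void(std::function<void()>)>;

    struct Sink
    {
        // The default `{}` creates an empty temporary that binds to the
        // rvalue reference; moving from it leaves `schedule` empty (falsy).
        explicit Sink(ScheduleFunc && schedule_ = {}) : schedule(std::move(schedule_)) {}
        ScheduleFunc schedule;
    };

    int main()
    {
        Sink defaulted;                                       // empty: synchronous path
        Sink injected([](std::function<void()> f) { f(); });  // caller-supplied scheduler
        return 0;
    }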