Fix link order issue of WriteBufferFromS3

This commit is contained in:
Amos Bird 2022-02-01 16:19:26 +08:00
parent 321fa4a9e8
commit f22b09f4fc
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
3 changed files with 31 additions and 31 deletions

View File

@ -9,6 +9,7 @@
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <base/scope_guard_safe.h>
#include <base/unit.h> #include <base/unit.h>
#include <base/FnTraits.h> #include <base/FnTraits.h>
@ -262,7 +263,20 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_TRACE(log, "{} to file by path: {}. S3 path: {}", LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + s3_path); mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + s3_path);
ThreadPool * writer = &getThreadPoolWriter(); ScheduleFunc schedule = [](auto && callback)
{
getThreadPoolWriter().scheduleOrThrow([&callback, thread_group = CurrentThread::getGroup()]()
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
callback();
});
};
auto s3_buffer = std::make_unique<WriteBufferFromS3>( auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client, settings->client,
@ -272,7 +286,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
settings->s3_max_single_part_upload_size, settings->s3_max_single_part_upload_size,
std::move(object_metadata), std::move(object_metadata),
buf_size, buf_size,
writer); std::move(schedule));
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path); return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
} }

View File

@ -11,7 +11,6 @@
# include <aws/s3/model/PutObjectRequest.h> # include <aws/s3/model/PutObjectRequest.h>
# include <aws/s3/model/UploadPartRequest.h> # include <aws/s3/model/UploadPartRequest.h>
# include <base/logger_useful.h> # include <base/logger_useful.h>
# include <base/scope_guard_safe.h>
# include <utility> # include <utility>
@ -58,7 +57,7 @@ WriteBufferFromS3::WriteBufferFromS3(
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_, std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_, size_t buffer_size_,
ThreadPool * thread_pool_) ScheduleFunc && schedule_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0) : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_) , bucket(bucket_)
, key(key_) , key(key_)
@ -66,7 +65,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, client_ptr(std::move(client_ptr_)) , client_ptr(std::move(client_ptr_))
, minimum_upload_part_size(minimum_upload_part_size_) , minimum_upload_part_size(minimum_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_) , max_single_part_upload_size(max_single_part_upload_size_)
, thread_pool(thread_pool_) , schedule(std::move(schedule_))
{ {
allocateBuffer(); allocateBuffer();
} }
@ -175,7 +174,7 @@ void WriteBufferFromS3::writePart()
LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload."); LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload.");
} }
if (thread_pool) if (schedule)
{ {
UploadPartTask * task = nullptr; UploadPartTask * task = nullptr;
int part_number; int part_number;
@ -187,16 +186,8 @@ void WriteBufferFromS3::writePart()
} }
fillUploadRequest(task->req, part_number); fillUploadRequest(task->req, part_number);
thread_pool->scheduleOrThrow([this, task, thread_group = CurrentThread::getGroup()]() schedule([this, task]()
{ {
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try try
{ {
processUploadRequest(*task); processUploadRequest(*task);
@ -283,7 +274,7 @@ void WriteBufferFromS3::completeMultipartUpload()
void WriteBufferFromS3::makeSinglepartUpload() void WriteBufferFromS3::makeSinglepartUpload()
{ {
auto size = temporary_buffer->tellp(); auto size = temporary_buffer->tellp();
bool with_pool = thread_pool != nullptr; bool with_pool = bool(schedule);
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool); LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool);
@ -296,20 +287,12 @@ void WriteBufferFromS3::makeSinglepartUpload()
return; return;
} }
if (thread_pool) if (schedule)
{ {
put_object_task = std::make_unique<PutObjectTask>(); put_object_task = std::make_unique<PutObjectTask>();
fillPutRequest(put_object_task->req); fillPutRequest(put_object_task->req);
thread_pool->scheduleOrThrow([this, thread_group = CurrentThread::getGroup()]() schedule([this]()
{ {
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try try
{ {
processPutRequest(*put_object_task); processPutRequest(*put_object_task);
@ -348,7 +331,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
void WriteBufferFromS3::processPutRequest(PutObjectTask & task) void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
{ {
auto outcome = client_ptr->PutObject(task.req); auto outcome = client_ptr->PutObject(task.req);
bool with_pool = thread_pool != nullptr; bool with_pool = bool(schedule);
if (outcome.IsSuccess()) if (outcome.IsSuccess())
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool); LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
@ -358,7 +341,7 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
void WriteBufferFromS3::waitForReadyBackGroundTasks() void WriteBufferFromS3::waitForReadyBackGroundTasks()
{ {
if (thread_pool) if (schedule)
{ {
std::lock_guard lock(bg_tasks_mutex); std::lock_guard lock(bg_tasks_mutex);
{ {
@ -383,7 +366,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
void WriteBufferFromS3::waitForAllBackGroundTasks() void WriteBufferFromS3::waitForAllBackGroundTasks()
{ {
if (thread_pool) if (schedule)
{ {
std::unique_lock lock(bg_tasks_mutex); std::unique_lock lock(bg_tasks_mutex);
bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; }); bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });

View File

@ -30,6 +30,8 @@ namespace Aws::S3::Model
namespace DB namespace DB
{ {
using ScheduleFunc = std::function<void(std::function<void()>)>;
/** /**
* Buffer to write a data to a S3 object with specified bucket and key. * Buffer to write a data to a S3 object with specified bucket and key.
* If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload. * If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload.
@ -48,7 +50,7 @@ public:
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt, std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ThreadPool * thread_pool_ = nullptr); ScheduleFunc && schedule_ = {});
~WriteBufferFromS3() override; ~WriteBufferFromS3() override;
@ -97,7 +99,8 @@ private:
bool is_prefinalized = false; bool is_prefinalized = false;
/// Following fields are for background uploads in thread pool (if specified). /// Following fields are for background uploads in thread pool (if specified).
ThreadPool * thread_pool; /// We use std::function to avoid dependency of Interpreters
ScheduleFunc schedule;
std::unique_ptr<PutObjectTask> put_object_task; std::unique_ptr<PutObjectTask> put_object_task;
std::list<UploadPartTask> upload_object_tasks; std::list<UploadPartTask> upload_object_tasks;
size_t num_added_bg_tasks = 0; size_t num_added_bg_tasks = 0;