diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index 7c39148c38c..f6c25e7439f 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -9,6 +9,7 @@
 #include

+#include
 #include
 #include

@@ -262,7 +263,20 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
     LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
         mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + s3_path);

-    ThreadPool * writer = &getThreadPoolWriter();
+    ScheduleFunc schedule = [](auto && callback)
+    {
+        getThreadPoolWriter().scheduleOrThrow([callback = std::move(callback), thread_group = CurrentThread::getGroup()]()
+        {
+            if (thread_group)
+                CurrentThread::attachTo(thread_group);
+
+            SCOPE_EXIT_SAFE(
+                if (thread_group)
+                    CurrentThread::detachQueryIfNotDetached();
+            );
+            callback();
+        });
+    };

     auto s3_buffer = std::make_unique<WriteBufferFromS3>(
         settings->client,
@@ -272,7 +286,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
         settings->s3_max_single_part_upload_size,
         std::move(object_metadata),
         buf_size,
-        writer);
+        std::move(schedule));

     return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
 }
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index 6cb44fb5e52..66eedee9bd3 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -11,7 +11,6 @@
 #    include
 #    include
 #    include
-#    include

 #    include

@@ -58,7 +57,7 @@ WriteBufferFromS3::WriteBufferFromS3(
     size_t max_single_part_upload_size_,
     std::optional<std::map<String, String>> object_metadata_,
     size_t buffer_size_,
-    ThreadPool * thread_pool_)
+    ScheduleFunc && schedule_)
     : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
     , bucket(bucket_)
     , key(key_)
@@ -66,7 +65,7 @@ WriteBufferFromS3::WriteBufferFromS3(
     , client_ptr(std::move(client_ptr_))
     , minimum_upload_part_size(minimum_upload_part_size_)
     , max_single_part_upload_size(max_single_part_upload_size_)
-    , thread_pool(thread_pool_)
+    , schedule(std::move(schedule_))
 {
     allocateBuffer();
 }
@@ -175,7 +174,7 @@ void WriteBufferFromS3::writePart()
         LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload.");
     }

-    if (thread_pool)
+    if (schedule)
     {
         UploadPartTask * task = nullptr;
         int part_number;
@@ -187,16 +186,8 @@ void WriteBufferFromS3::writePart()
         }

         fillUploadRequest(task->req, part_number);
-        thread_pool->scheduleOrThrow([this, task, thread_group = CurrentThread::getGroup()]()
+        schedule([this, task]()
         {
-            if (thread_group)
-                CurrentThread::attachTo(thread_group);
-
-            SCOPE_EXIT_SAFE(
-                if (thread_group)
-                    CurrentThread::detachQueryIfNotDetached();
-            );
-
             try
             {
                 processUploadRequest(*task);
@@ -283,7 +274,7 @@ void WriteBufferFromS3::completeMultipartUpload()

 void WriteBufferFromS3::makeSinglepartUpload()
 {
     auto size = temporary_buffer->tellp();
-    bool with_pool = thread_pool != nullptr;
+    bool with_pool = bool(schedule);

     LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool);
@@ -296,20 +287,12 @@ void WriteBufferFromS3::makeSinglepartUpload()
         return;
     }

-    if (thread_pool)
+    if (schedule)
     {
         put_object_task = std::make_unique<PutObjectTask>();
         fillPutRequest(put_object_task->req);
-        thread_pool->scheduleOrThrow([this, thread_group = CurrentThread::getGroup()]()
+        schedule([this]()
         {
-            if (thread_group)
-                CurrentThread::attachTo(thread_group);
-
-            SCOPE_EXIT_SAFE(
-                if (thread_group)
-                    CurrentThread::detachQueryIfNotDetached();
-            );
-
             try
             {
                 processPutRequest(*put_object_task);
@@ -348,7 +331,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
 void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
 {
     auto outcome = client_ptr->PutObject(task.req);
-    bool with_pool = thread_pool != nullptr;
+    bool with_pool = bool(schedule);

     if (outcome.IsSuccess())
         LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
@@ -358,7 +341,7 @@

 void WriteBufferFromS3::waitForReadyBackGroundTasks()
 {
-    if (thread_pool)
+    if (schedule)
     {
         std::lock_guard lock(bg_tasks_mutex);
         {
@@ -383,7 +366,7 @@

 void WriteBufferFromS3::waitForAllBackGroundTasks()
 {
-    if (thread_pool)
+    if (schedule)
     {
         std::unique_lock lock(bg_tasks_mutex);
         bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h
index aaea718fe70..f322dd4b567 100644
--- a/src/IO/WriteBufferFromS3.h
+++ b/src/IO/WriteBufferFromS3.h
@@ -30,6 +30,8 @@ namespace Aws::S3::Model
 namespace DB
 {

+using ScheduleFunc = std::function<void(std::function<void()>)>;
+
 /**
  * Buffer to write a data to a S3 object with specified bucket and key.
  * If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload.
@@ -48,7 +50,7 @@ public:
         size_t max_single_part_upload_size_,
         std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
         size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
-        ThreadPool * thread_pool_ = nullptr);
+        ScheduleFunc && schedule_ = {});

     ~WriteBufferFromS3() override;
@@ -97,7 +99,8 @@ private:
     bool is_prefinalized = false;

     /// Following fields are for background uploads in thread pool (if specified).
-    ThreadPool * thread_pool;
+    /// We use std::function to avoid a dependency on Interpreters.
+    ScheduleFunc schedule;
     std::unique_ptr<PutObjectTask> put_object_task;
     std::list<UploadPartTask> upload_object_tasks;
     size_t num_added_bg_tasks = 0;
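
Reviewer note: below is a minimal, self-contained sketch of the callback-scheduler pattern this patch introduces, to make the intent of ScheduleFunc concrete. ToyUploader and the std::async-based scheduler are hypothetical stand-ins invented for illustration (the real counterparts are WriteBufferFromS3 and getThreadPoolWriter()); only the ScheduleFunc alias matches the patch.

// Standalone sketch: the I/O class depends only on a generic "schedule"
// callback, never on a concrete thread pool type.
#include <functional>
#include <future>
#include <iostream>
#include <utility>
#include <vector>

using ScheduleFunc = std::function<void(std::function<void()>)>;

class ToyUploader
{
public:
    explicit ToyUploader(ScheduleFunc && schedule_ = {}) : schedule(std::move(schedule_)) {}

    void writePart(int part_number)
    {
        if (schedule)
        {
            /// Background path: hand the work to whatever scheduler the caller injected.
            schedule([part_number] { std::cout << "part " << part_number << " uploaded in background\n"; });
        }
        else
        {
            /// No scheduler: fall back to a synchronous upload, as WriteBufferFromS3 does.
            std::cout << "part " << part_number << " uploaded inline\n";
        }
    }

private:
    ScheduleFunc schedule; /// An empty std::function means "no background uploads".
};

int main()
{
    std::vector<std::future<void>> tasks;

    /// The caller owns the concurrency policy; std::async stands in for a real
    /// thread pool here. Note the move-capture: the scheduled job must own the
    /// callback, exactly as in the DiskS3.cpp lambda above.
    ScheduleFunc schedule = [&tasks](std::function<void()> callback)
    {
        tasks.push_back(std::async(std::launch::async, [callback = std::move(callback)] { callback(); }));
    };

    ToyUploader uploader(std::move(schedule));
    uploader.writePart(1);
    uploader.writePart(2);

    for (auto & task : tasks)
        task.get();
}

The design point: the IO layer stays ignorant of ThreadPool and of the thread-group attach/detach bookkeeping, which now lives with the pool's owner in DiskS3.cpp. Also note that the scheduled job outlives the schedule() call, so the callback must be captured by move; capturing it by reference would leave a dangling reference once writePart() returns.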