From 40a48c5670554a1dac98f69ea720b0cbecff3249 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 18 Feb 2022 18:32:04 +0000 Subject: [PATCH] Enable async writes to S3. --- src/Disks/S3/DiskS3.cpp | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 8aa4f8f0dff..b7e752edafd 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -262,21 +262,20 @@ std::unique_ptr DiskS3::writeFile(const String & path, LOG_TRACE(log, "{} to file by path: {}. S3 path: {}", mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + blob_name); - /// FIXME -- thread pool lead to obscure segfaults - /// ScheduleFunc schedule = [pool = &getThreadPoolWriter(), thread_group = CurrentThread::getGroup()](auto callback) - /// { - /// pool->scheduleOrThrow([callback = std::move(callback), thread_group]() - /// { - /// if (thread_group) - /// CurrentThread::attachTo(thread_group); + ScheduleFunc schedule = [pool = &getThreadPoolWriter(), thread_group = CurrentThread::getGroup()](auto callback) + { + pool->scheduleOrThrow([callback = std::move(callback), thread_group]() + { + if (thread_group) + CurrentThread::attachTo(thread_group); - /// SCOPE_EXIT_SAFE( - /// if (thread_group) - /// CurrentThread::detachQueryIfNotDetached(); - /// ); - /// callback(); - /// }); - /// }; + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachQueryIfNotDetached(); + ); + callback(); + }); + }; auto s3_buffer = std::make_unique( settings->client,