From 79627798c429bf8068ae3e89a16792d5708cf415 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 4 Apr 2022 20:44:39 +0200 Subject: [PATCH] Fix race --- src/IO/WriteBufferFromS3.cpp | 53 +++++++++++++++++++++++++++++++----- src/IO/WriteBufferFromS3.h | 2 +- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 20d9a054230..86f4366ec8d 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -52,6 +52,7 @@ struct WriteBufferFromS3::PutObjectTask Aws::S3::Model::PutObjectRequest req; bool is_finised = false; std::exception_ptr exception; + std::optional cache_files; }; WriteBufferFromS3::WriteBufferFromS3( @@ -279,6 +280,13 @@ void WriteBufferFromS3::writePart() } fillUploadRequest(task->req, part_number); + + if (file_segments_holder) + { + task->cache_files.emplace(std::move(*file_segments_holder)); + file_segments_holder.reset(); + } + schedule([this, task]() { try @@ -290,6 +298,15 @@ void WriteBufferFromS3::writePart() task->exception = std::current_exception(); } + try + { + finalizeCacheIfNeeded(task->cache_files); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + { std::lock_guard lock(bg_tasks_mutex); task->is_finised = true; @@ -300,17 +317,20 @@ void WriteBufferFromS3::writePart() /// Releasing lock and condvar notification. bg_tasks_condvar.notify_one(); } - - finalizeCacheIfNeeded(); }, query_context); } else { UploadPartTask task; fillUploadRequest(task.req, part_tags.size() + 1); + if (file_segments_holder) + { + task.cache_files.emplace(std::move(*file_segments_holder)); + file_segments_holder.reset(); + } processUploadRequest(task); part_tags.push_back(task.tag); - finalizeCacheIfNeeded(); + finalizeCacheIfNeeded(task.cache_files); } } @@ -397,7 +417,14 @@ void WriteBufferFromS3::makeSinglepartUpload() if (schedule) { put_object_task = std::make_unique(); + fillPutRequest(put_object_task->req); + if (file_segments_holder) + { + put_object_task->cache_files.emplace(std::move(*file_segments_holder)); + file_segments_holder.reset(); + } + schedule([this]() { try @@ -409,6 +436,15 @@ void WriteBufferFromS3::makeSinglepartUpload() put_object_task->exception = std::current_exception(); } + try + { + finalizeCacheIfNeeded(put_object_task->cache_files); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + { std::lock_guard lock(bg_tasks_mutex); put_object_task->is_finised = true; @@ -418,16 +454,19 @@ void WriteBufferFromS3::makeSinglepartUpload() /// Releasing lock and condvar notification. bg_tasks_condvar.notify_one(); } - - finalizeCacheIfNeeded(); }, query_context); } else { PutObjectTask task; fillPutRequest(task.req); + if (file_segments_holder) + { + task.cache_files.emplace(std::move(*file_segments_holder)); + file_segments_holder.reset(); + } processPutRequest(task); - finalizeCacheIfNeeded(); + finalizeCacheIfNeeded(task.cache_files); } } @@ -455,7 +494,7 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task) throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); } -void WriteBufferFromS3::finalizeCacheIfNeeded() +void WriteBufferFromS3::finalizeCacheIfNeeded(std::optional & file_segments_holder) { if (!file_segments_holder) return; diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index 1987bbe76a5..d1e51b0c7f9 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -127,7 +127,7 @@ private: FileCachePtr cache; size_t current_download_offset = 0; std::optional file_segments_holder; - void finalizeCacheIfNeeded(); + static void finalizeCacheIfNeeded(std::optional &); ContextMutablePtr shared_query_context; ContextPtr query_context; };