diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 6207ba53bd8..2510b86716c 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -431,7 +431,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks() { if (schedule) { - std::lock_guard lock(bg_tasks_mutex); + std::unique_lock lock(bg_tasks_mutex); { while (!upload_object_tasks.empty() && upload_object_tasks.front().is_finised) { @@ -442,7 +442,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks() if (exception) { - waitForAllBackGroundTasks(); + waitForAllBackGroundTasksUnlocked(lock); std::rethrow_exception(exception); } @@ -457,7 +457,15 @@ void WriteBufferFromS3::waitForAllBackGroundTasks() if (schedule) { std::unique_lock lock(bg_tasks_mutex); - bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; }); + waitForAllBackGroundTasksUnlocked(lock); + } +} + +void WriteBufferFromS3::waitForAllBackGroundTasksUnlocked(std::unique_lock & bg_tasks_lock) +{ + if (schedule) + { + bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; }); while (!upload_object_tasks.empty()) { @@ -472,7 +480,7 @@ void WriteBufferFromS3::waitForAllBackGroundTasks() if (put_object_task) { - bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; }); + bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return put_object_task->is_finised; }); if (put_object_task->exception) std::rethrow_exception(put_object_task->exception); } diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index 99440654910..712044841d0 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -84,6 +84,7 @@ private: void waitForReadyBackGroundTasks(); void waitForAllBackGroundTasks(); + void waitForAllBackGroundTasksUnlocked(std::unique_lock & bg_tasks_lock); String bucket; String key;