This commit is contained in:
kssenii 2022-09-02 15:32:46 +02:00
parent 963c0111bf
commit c9b512e33e
2 changed files with 13 additions and 4 deletions

View File

@ -431,7 +431,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
{
if (schedule)
{
std::lock_guard lock(bg_tasks_mutex);
std::unique_lock lock(bg_tasks_mutex);
{
while (!upload_object_tasks.empty() && upload_object_tasks.front().is_finised)
{
@ -442,7 +442,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
if (exception)
{
waitForAllBackGroundTasks();
waitForAllBackGroundTasksUnlocked(lock);
std::rethrow_exception(exception);
}
@ -457,7 +457,15 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
if (schedule)
{
std::unique_lock lock(bg_tasks_mutex);
bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
waitForAllBackGroundTasksUnlocked(lock);
}
}
void WriteBufferFromS3::waitForAllBackGroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock)
{
if (schedule)
{
bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
while (!upload_object_tasks.empty())
{
@ -472,7 +480,7 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
if (put_object_task)
{
bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; });
bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return put_object_task->is_finised; });
if (put_object_task->exception)
std::rethrow_exception(put_object_task->exception);
}

View File

@ -84,6 +84,7 @@ private:
void waitForReadyBackGroundTasks();
void waitForAllBackGroundTasks();
void waitForAllBackGroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock);
String bucket;
String key;