Merge pull request #40070 from kitaisreal/write-buffer-from-s3-potential-deadlock-fix

WriteBufferFromS3 potential deadlock fix
This commit is contained in:
Nikolai Kochetov 2022-08-17 12:14:17 +02:00 committed by GitHub
commit 335a5e3d0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -256,6 +256,7 @@ void WriteBufferFromS3::writePart()
if (schedule) if (schedule)
{ {
UploadPartTask * task = nullptr; UploadPartTask * task = nullptr;
int part_number; int part_number;
{ {
std::lock_guard lock(bg_tasks_mutex); std::lock_guard lock(bg_tasks_mutex);
@ -264,45 +265,57 @@ void WriteBufferFromS3::writePart()
part_number = num_added_bg_tasks; part_number = num_added_bg_tasks;
} }
fillUploadRequest(task->req, part_number); /// Notify waiting thread when task finished
auto task_finish_notify = [&, task]()
if (file_segments_holder)
{ {
task->cache_files.emplace(std::move(*file_segments_holder)); std::lock_guard lock(bg_tasks_mutex);
file_segments_holder.reset(); task->is_finised = true;
++num_finished_bg_tasks;
/// Notification under mutex is important here.
/// Otherwise, WriteBuffer could be destroyed in between
/// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one();
};
try
{
fillUploadRequest(task->req, part_number);
if (file_segments_holder)
{
task->cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
schedule([this, task, task_finish_notify]()
{
try
{
processUploadRequest(*task);
}
catch (...)
{
task->exception = std::current_exception();
}
try
{
finalizeCacheIfNeeded(task->cache_files);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
task_finish_notify();
});
} }
catch (...)
schedule([this, task]()
{ {
try task_finish_notify();
{ throw;
processUploadRequest(*task); }
}
catch (...)
{
task->exception = std::current_exception();
}
try
{
finalizeCacheIfNeeded(task->cache_files);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::lock_guard lock(bg_tasks_mutex);
task->is_finised = true;
++num_finished_bg_tasks;
/// Notification under mutex is important here.
/// Otherwise, WriteBuffer could be destroyed in between
/// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one();
}
});
} }
else else
{ {
@ -397,43 +410,56 @@ void WriteBufferFromS3::makeSinglepartUpload()
{ {
put_object_task = std::make_unique<PutObjectTask>(); put_object_task = std::make_unique<PutObjectTask>();
fillPutRequest(put_object_task->req); /// Notify waiting thread when put object task finished
if (file_segments_holder) auto task_notify_finish = [&]()
{ {
put_object_task->cache_files.emplace(std::move(*file_segments_holder)); std::lock_guard lock(bg_tasks_mutex);
file_segments_holder.reset(); put_object_task->is_finised = true;
/// Notification under mutex is important here.
/// Othervies, WriteBuffer could be destroyed in between
/// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one();
};
try
{
fillPutRequest(put_object_task->req);
if (file_segments_holder)
{
put_object_task->cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
schedule([this, task_notify_finish]()
{
try
{
processPutRequest(*put_object_task);
}
catch (...)
{
put_object_task->exception = std::current_exception();
}
try
{
finalizeCacheIfNeeded(put_object_task->cache_files);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
task_notify_finish();
});
} }
catch (...)
schedule([this]()
{ {
try task_notify_finish();
{ throw;
processPutRequest(*put_object_task); }
}
catch (...)
{
put_object_task->exception = std::current_exception();
}
try
{
finalizeCacheIfNeeded(put_object_task->cache_files);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::lock_guard lock(bg_tasks_mutex);
put_object_task->is_finised = true;
/// Notification under mutex is important here.
/// Othervies, WriteBuffer could be destroyed in between
/// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one();
}
});
} }
else else
{ {