Fix deadlock in Changelog flush

This commit is contained in:
Antonio Andelic 2023-01-05 14:51:05 +00:00
parent 0a60fe6e60
commit 9b3314a931
2 changed files with 33 additions and 6 deletions

View File

@ -281,6 +281,7 @@ Changelog::Changelog(
, log(log_)
, compress_logs(compress_logs_)
, write_operations(std::numeric_limits<size_t>::max())
, append_completion_queue(std::numeric_limits<size_t>::max())
{
/// Load all files in changelog directory
namespace fs = std::filesystem;
@ -302,6 +303,8 @@ Changelog::Changelog(
clean_log_thread = ThreadFromGlobalPool([this] { cleanLogThread(); });
write_thread = ThreadFromGlobalPool([this] { writeThread(); });
append_completion_thread = ThreadFromGlobalPool([this] { appendCompletionThread(); });
}
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
@ -546,6 +549,18 @@ ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_e
return record;
}
void Changelog::appendCompletionThread()
{
uint64_t flushed_index = 0;
while (append_completion_queue.pop(flushed_index))
{
// we shouldn't start the raft_server before sending it here
if (auto raft_server_locked = raft_server.lock())
raft_server_locked->notify_log_append_completion(true);
else
LOG_WARNING(log, "Raft server is not set in LogStore.");
}
}
void Changelog::writeThread()
{
@ -584,11 +599,12 @@ void Changelog::writeThread()
durable_idx_cv.notify_all();
// we shouldn't start the raft_server before sending it here
if (auto raft_server_locked = raft_server.lock())
raft_server_locked->notify_log_append_completion(true);
else
LOG_WARNING(log, "Raft server is not set in LogStore.");
// we need to call completion callback in another thread because it takes a global lock for the NuRaft server
// NuRaft will in some places wait for flush to be done while having the same global lock leading to deadlock
// -> future write operations are blocked by flush that cannot be completed because it cannot take NuRaft lock
// -> NuRaft won't leave lock until its flush is done
if (!append_completion_queue.push(flush.index))
LOG_WARNING(log, "Changelog is shut down");
}
}
}
@ -605,7 +621,7 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
logs[index] = log_entry;
max_log_id = index;
if (!write_operations.tryPush(AppendLog{index, log_entry}))
if (!write_operations.push(AppendLog{index, log_entry}))
LOG_WARNING(log, "Changelog is shut down");
}
@ -829,6 +845,12 @@ void Changelog::shutdown()
if (write_thread.joinable())
write_thread.join();
if (!append_completion_queue.isFinished())
append_completion_queue.finish();
if (append_completion_thread.joinable())
append_completion_thread.join();
}
Changelog::~Changelog()

View File

@ -206,6 +206,11 @@ private:
ThreadFromGlobalPool write_thread;
ConcurrentBoundedQueue<WriteOperation> write_operations;
void appendCompletionThread();
ThreadFromGlobalPool append_completion_thread;
ConcurrentBoundedQueue<uint64_t> append_completion_queue;
// last_durable_index needs to be exposed through const getter so we make mutex mutable
mutable std::mutex durable_idx_mutex;
std::condition_variable durable_idx_cv;