Fix shutdown

This commit is contained in:
Antonio Andelic 2022-11-21 15:01:18 +00:00
parent c9472d67b6
commit aeb0d39b50
3 changed files with 38 additions and 20 deletions

View File

@ -549,13 +549,9 @@ void Changelog::writeThread()
// we don't protect current_writer because rotate at the same time can be called from compact only // we don't protect current_writer because rotate at the same time can be called from compact only
// when the node is applying snapshot from leader // when the node is applying snapshot from leader
// in that case, no entry write should happen, i.e. this thread will be inactive // in that case, no entry write should happen, i.e. this thread will be inactive
size_t last_appended = 0;
while (true)
{
WriteOperation write_operation; WriteOperation write_operation;
if (!write_operations.pop(write_operation)) while (write_operations.pop(write_operation))
break; {
std::visit([&, this]<typename WriteOperationType>(const WriteOperationType & operation) -> void std::visit([&, this]<typename WriteOperationType>(const WriteOperationType & operation) -> void
{ {
if constexpr (std::same_as<WriteOperationType, AppendLog>) if constexpr (std::same_as<WriteOperationType, AppendLog>)
@ -567,15 +563,18 @@ void Changelog::writeThread()
rotate(operation.index); rotate(operation.index);
current_writer->appendRecord(buildRecord(operation.index, operation.log_entry)); current_writer->appendRecord(buildRecord(operation.index, operation.log_entry));
last_appended = operation.index;
} }
else else
{ {
if (current_writer) if (current_writer)
current_writer->flush(force_sync); current_writer->flush(force_sync);
last_durable_idx = last_appended; {
std::lock_guard lock{durable_idx_mutex};
last_durable_idx = operation.index;
}
durable_idx_cv.notify_all();
// we shouldn't start the raft_server before sending it here // we shouldn't start the raft_server before sending it here
assert(raft_server); assert(raft_server);
@ -597,8 +596,8 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
logs[index] = log_entry; logs[index] = log_entry;
max_log_id = index; max_log_id = index;
if (!write_operations.push(AppendLog{index, log_entry})) if (!write_operations.tryPush(AppendLog{index, log_entry}))
std::abort(); LOG_WARNING(log, "Changelog is shut down");
} }
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry) void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
@ -789,14 +788,19 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
void Changelog::flush() void Changelog::flush()
{ {
flushAsync(); if (flushAsync())
last_durable_idx.wait(max_log_id); {
std::unique_lock lock{durable_idx_mutex};
durable_idx_cv.wait(lock, [&] { return last_durable_idx == max_log_id; });
}
} }
void Changelog::flushAsync() bool Changelog::flushAsync()
{ {
if (!write_operations.push(Flush{})) bool pushed = write_operations.push(Flush{max_log_id});
std::abort(); if (!pushed)
LOG_WARNING(log, "Changelog is shut down");
return pushed;
} }
void Changelog::shutdown() void Changelog::shutdown()
@ -806,6 +810,12 @@ void Changelog::shutdown()
if (clean_log_thread.joinable()) if (clean_log_thread.joinable())
clean_log_thread.join(); clean_log_thread.join();
if (!write_operations.isFinished())
write_operations.finish();
if (write_thread.joinable())
write_thread.join();
} }
Changelog::~Changelog() Changelog::~Changelog()

View File

@ -4,6 +4,7 @@
#include <libnuraft/raft_server.hxx> #include <libnuraft/raft_server.hxx>
#include <city.h> #include <city.h>
#include <optional> #include <optional>
#include <base/defines.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/HashingWriteBuffer.h> #include <IO/HashingWriteBuffer.h>
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
@ -122,7 +123,7 @@ public:
/// Fsync latest log to disk and flush buffer /// Fsync latest log to disk and flush buffer
void flush(); void flush();
void flushAsync(); bool flushAsync();
void shutdown(); void shutdown();
@ -133,6 +134,7 @@ public:
uint64_t lastDurableIndex() const uint64_t lastDurableIndex() const
{ {
std::lock_guard lock{durable_idx_mutex};
return last_durable_idx; return last_durable_idx;
} }
@ -192,7 +194,9 @@ private:
}; };
struct Flush struct Flush
{}; {
uint64_t index;
};
using WriteOperation = std::variant<AppendLog, Flush>; using WriteOperation = std::variant<AppendLog, Flush>;
@ -201,7 +205,11 @@ private:
ThreadFromGlobalPool write_thread; ThreadFromGlobalPool write_thread;
ConcurrentBoundedQueue<WriteOperation> write_operations; ConcurrentBoundedQueue<WriteOperation> write_operations;
std::atomic<uint64_t> last_durable_idx{0};
mutable std::mutex durable_idx_mutex;
std::condition_variable durable_idx_cv;
uint64_t last_durable_idx{0};
nuraft::ptr<nuraft::raft_server> raft_server{nullptr}; nuraft::ptr<nuraft::raft_server> raft_server{nullptr};
}; };

View File

@ -449,8 +449,8 @@ void KeeperServer::shutdownRaftServer()
void KeeperServer::shutdown() void KeeperServer::shutdown()
{ {
state_manager->flushAndShutDownLogStore();
shutdownRaftServer(); shutdownRaftServer();
state_manager->flushAndShutDownLogStore();
state_machine->shutdownStorage(); state_machine->shutdownStorage();
} }