// Reviewer summary of the patch below (three files under src/Coordination/).
// This preamble sits before the first "diff --git" header; the patch text itself is unchanged.
//
// Changelog.cpp
//   * writeThread(): the loop now runs while write_operations.pop(write_operation) succeeds,
//     and the local last_appended counter is removed. In the else branch of the visitor, after
//     current_writer->flush(force_sync), last_durable_idx is assigned operation.index while
//     durable_idx_mutex is held, and durable_idx_cv.notify_all() is called afterwards.
//   * appendEntry(): a failed enqueue no longer calls std::abort(); the code now calls tryPush()
//     and logs the warning "Changelog is shut down". logs[index] and max_log_id are updated
//     before the enqueue attempt.
//   * flush(): waits on durable_idx_cv until last_durable_idx == max_log_id, and only when
//     flushAsync() returned true.
//   * flushAsync(): pushes Flush{max_log_id} and returns whether the push succeeded, logging a
//     warning instead of aborting on failure.
//   * shutdown(): additionally calls write_operations.finish() when the queue is not already
//     finished, and joins write_thread when it is joinable.
//
// Changelog.h
//   * flushAsync() now returns bool; lastDurableIndex() takes durable_idx_mutex before reading.
//   * struct Flush gains a uint64_t index member.
//   * The atomic last_durable_idx is replaced by a plain uint64_t together with a mutable
//     std::mutex (durable_idx_mutex) and a std::condition_variable (durable_idx_cv).
//   * One #include line is added; its header name is not visible here (presumably the header
//     for the new mutex / condition variable -- TODO confirm).
//
// KeeperServer.cpp
//   * shutdown(): shutdownRaftServer() now runs before state_manager->flushAndShutDownLogStore().
//
// NOTE(review): appendEntry() treats every tryPush() failure as "shut down". Whether tryPush()
//   can also fail for another reason (e.g. a full queue) depends on ConcurrentBoundedQueue, which
//   is not shown here -- confirm, because max_log_id has already advanced at that point.
// NOTE(review): the flush() predicate compares with ==, and max_log_id is read inside the
//   predicate. Presumably appendEntry() and flush() are serialized by the caller -- verify.
// NOTE(review): text inside angle brackets appears to be missing from this copy (bare
//   "#include", "std::same_as)", "std::variant;") and the line breaks are collapsed, so the
//   patch cannot be applied verbatim from this text.
diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 3d760792c32..49eb644719d 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -549,13 +549,9 @@ void Changelog::writeThread() // we don't protect current_writer because rotate at the same time can be called from compact only // when the node is applying snapshot from leader // in that case, no entry write should happen, i.e. this thread will be inactive - size_t last_appended = 0; - while (true) + WriteOperation write_operation; + while (write_operations.pop(write_operation)) { - WriteOperation write_operation; - if (!write_operations.pop(write_operation)) - break; - std::visit([&, this](const WriteOperationType & operation) -> void { if constexpr (std::same_as) @@ -567,15 +563,18 @@ void Changelog::writeThread() rotate(operation.index); current_writer->appendRecord(buildRecord(operation.index, operation.log_entry)); - - last_appended = operation.index; } else { if (current_writer) current_writer->flush(force_sync); - last_durable_idx = last_appended; + { + std::lock_guard lock{durable_idx_mutex}; + last_durable_idx = operation.index; + } + + durable_idx_cv.notify_all(); // we shouldn't start the raft_server before sending it here assert(raft_server); @@ -597,8 +596,8 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry) logs[index] = log_entry; max_log_id = index; - if (!write_operations.push(AppendLog{index, log_entry})) - std::abort(); + if (!write_operations.tryPush(AppendLog{index, log_entry})) + LOG_WARNING(log, "Changelog is shut down"); } void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry) @@ -789,14 +788,19 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer) void Changelog::flush() { - flushAsync(); - last_durable_idx.wait(max_log_id); + if (flushAsync()) + { + std::unique_lock lock{durable_idx_mutex}; + durable_idx_cv.wait(lock, [&] { return last_durable_idx == 
max_log_id; }); + } } -void Changelog::flushAsync() +bool Changelog::flushAsync() { - if (!write_operations.push(Flush{})) - std::abort(); + bool pushed = write_operations.push(Flush{max_log_id}); + if (!pushed) + LOG_WARNING(log, "Changelog is shut down"); + return pushed; } void Changelog::shutdown() @@ -806,6 +810,12 @@ void Changelog::shutdown() if (clean_log_thread.joinable()) clean_log_thread.join(); + + if (!write_operations.isFinished()) + write_operations.finish(); + + if (write_thread.joinable()) + write_thread.join(); } Changelog::~Changelog() diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h index 9f693382c75..01ce7cdd145 100644 --- a/src/Coordination/Changelog.h +++ b/src/Coordination/Changelog.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -122,7 +123,7 @@ public: /// Fsync latest log to disk and flush buffer void flush(); - void flushAsync(); + bool flushAsync(); void shutdown(); @@ -133,6 +134,7 @@ public: uint64_t lastDurableIndex() const { + std::lock_guard lock{durable_idx_mutex}; return last_durable_idx; } @@ -192,7 +194,9 @@ private: }; struct Flush - {}; + { + uint64_t index; + }; using WriteOperation = std::variant; @@ -201,7 +205,11 @@ private: ThreadFromGlobalPool write_thread; ConcurrentBoundedQueue write_operations; - std::atomic last_durable_idx{0}; + + mutable std::mutex durable_idx_mutex; + std::condition_variable durable_idx_cv; + uint64_t last_durable_idx{0}; + nuraft::ptr raft_server{nullptr}; }; diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 6151aa47ef7..9ca3f219928 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -449,8 +449,8 @@ void KeeperServer::shutdownRaftServer() void KeeperServer::shutdown() { - state_manager->flushAndShutDownLogStore(); shutdownRaftServer(); + state_manager->flushAndShutDownLogStore(); state_machine->shutdownStorage(); }