Async version

This commit is contained in:
alesapin 2021-03-05 13:40:24 +03:00
parent c9c301e10c
commit 15ea9a9c0c
10 changed files with 123 additions and 33 deletions

View File

@ -22,10 +22,11 @@ NuKeeperServer::NuKeeperServer(
int server_id_,
const CoordinationSettingsPtr & coordination_settings_,
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_)
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_)
: server_id(server_id_)
, coordination_settings(coordination_settings_)
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_, config.getString("test_keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"), coordination_settings))
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_, snapshots_queue_, config.getString("test_keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"), coordination_settings))
, state_manager(nuraft::cs_new<NuKeeperStateManager>(server_id, "test_keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
{

View File

@ -41,7 +41,8 @@ public:
int server_id_,
const CoordinationSettingsPtr & coordination_settings_,
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_);
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_);
void startup();

View File

@ -238,7 +238,6 @@ NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, con
/// Destructor: calls back into `storage` (presumably the NuKeeperStorage pointer given to the constructor).
/// NOTE(review): the enclosing hunk header (-238,7 +238,6) says this commit removes one line from
/// this region, yet both calls are shown here. Presumably clearGarbageAfterSnapshot() is the removed
/// one, because the state machine's snapshot callback now calls it under storage_lock — confirm.
NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot()
{
storage->clearGarbageAfterSnapshot();
storage->disableSnapshotMode();
}

View File

@ -37,6 +37,9 @@ public:
SessionAndTimeout session_and_timeout;
};
using NuKeeperStorageSnapshotPtr = std::shared_ptr<NuKeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<void(const NuKeeperStorageSnapshotPtr &)>;
class NuKeeperSnapshotManager
{
public:
@ -73,4 +76,10 @@ private:
std::map<size_t, std::string> existing_snapshots;
};
/// A unit of deferred snapshot work: a snapshot object together with the callback that
/// processes it. Consumers run a task as `task.create_snapshot(task.snapshot)`.
struct CreateSnapshotTask
{
/// Snapshot that is passed to the callback.
NuKeeperStorageSnapshotPtr snapshot;
/// Callback of type void(const NuKeeperStorageSnapshotPtr &); empty in a default-constructed task.
CreateSnapshotCallback create_snapshot;
};
}

View File

@ -34,11 +34,12 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
return request_for_session;
}
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
: coordination_settings(coordination_settings_)
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep)
, responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuKeeperStateMachine"))
{
@ -46,6 +47,7 @@ NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, co
void NuKeeperStateMachine::init()
{
/// Do everything without mutexes, no other threads exist.
LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots());
bool loaded = false;
bool has_snapshots = snapshot_manager.totalSnapshots() != 0;
@ -119,19 +121,26 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
/// Applies the latest stored snapshot to `storage`.
/// Throws LOGICAL_ERROR when the last log index of `s` differs from the one in latest_snapshot_meta.
/// On success sets last_committed_idx to the last log index of `s` and returns true.
///
/// NOTE(review): this span appears to interleave pre-change and post-change lines of the diff
/// (hunk -119,19 +121,26). Read literally, the index check runs twice and storage_lock is taken
/// twice; see the notes on the individual lines below and confirm which lines are live.
bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{
LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx());
/// NOTE(review): this check reads latest_snapshot_meta without holding snapshots_lock;
/// it duplicates the guarded check a few lines below — presumably the removed (old) version.
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}",
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());
nuraft::ptr<nuraft::buffer> latest_snapshot_ptr;
{
/// Validate the index and grab a reference to the current buffer while holding snapshots_lock,
/// so the buffer can be used after the lock is released.
std::lock_guard lock(snapshots_lock);
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}",
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());
latest_snapshot_ptr = latest_snapshot_buf;
}
/// NOTE(review): this guard lives until the end of the function, so the scoped guard below would
/// lock storage_lock a second time on the same thread (a problem if it is a non-recursive mutex).
/// It also reads latest_snapshot_buf outside snapshots_lock. Presumably these two lines are the
/// removed (old) version of the scoped block that follows — confirm.
std::lock_guard lock(storage_lock);
snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_buf);
{
/// Deserialize into storage from the locally held buffer while holding storage_lock.
std::lock_guard lock(storage_lock);
snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_ptr);
}
last_committed_idx = s.get_last_log_idx();
return true;
}
/// Returns latest_snapshot_meta, the metadata pointer of the latest stored snapshot.
nuraft::ptr<nuraft::snapshot> NuKeeperStateMachine::last_snapshot()
{
/// Just return the latest snapshot; snapshots_lock is held while the pointer is copied out.
std::lock_guard<std::mutex> lock(snapshots_lock);
return latest_snapshot_meta;
}
@ -141,17 +150,40 @@ void NuKeeperStateMachine::create_snapshot(
nuraft::async_result<bool>::handler_type & when_done)
{
LOG_DEBUG(log, "Creating snapshot {}", s.get_last_log_idx());
std::lock_guard lock(storage_lock);
NuKeeperStorageSnapshot snapshot(&storage, s.get_last_log_idx());
latest_snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(snapshot);
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
latest_snapshot_meta = nuraft::snapshot::deserialize(*snp_buf);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*latest_snapshot_buf, s.get_last_log_idx());
LOG_DEBUG(log, "Created snapshot {} with path {}", s.get_last_log_idx(), result_path);
nuraft::ptr<std::exception> exception(nullptr);
bool ret = true;
when_done(ret, exception);
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task;
{
std::lock_guard lock(storage_lock);
snapshot_task.snapshot = std::make_shared<NuKeeperStorageSnapshot>(&storage, snapshot_meta_copy);
}
/// Deferred part of snapshot creation; it runs when the queued task's callback is invoked.
/// Steps: serialize the in-memory snapshot, write it to disk, publish it as the latest
/// snapshot, clear storage garbage, then report completion through when_done.
snapshot_task.create_snapshot = [this, when_done] (const NuKeeperStorageSnapshotPtr & snapshot)
{
    /// Take the index from the task's own metadata. latest_snapshot_meta is shared state that
    /// other code replaces under snapshots_lock, so it must not be dereferenced without the lock.
    const auto snapshot_log_idx = snapshot->snapshot_meta->get_last_log_idx();

    auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
    auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot_log_idx);

    {
        /// Publish buffer and metadata together so readers never see a mismatched pair.
        std::lock_guard lock(snapshots_lock);
        latest_snapshot_buf = snapshot_buf;
        latest_snapshot_meta = snapshot->snapshot_meta;
    }

    LOG_DEBUG(log, "Created persistent snapshot {} with path {}", snapshot_log_idx, result_path);

    {
        /// Must do it with lock (clearing elements from list)
        std::lock_guard lock(storage_lock);
        storage.clearGarbageAfterSnapshot();
        LOG_TRACE(log, "Cleared garbage after snapshot");
    }

    /// Report success to the caller-supplied completion handler.
    nuraft::ptr<std::exception> exception(nullptr);
    bool ret = true;
    when_done(ret, exception);
};
LOG_DEBUG(log, "In memory snapshot {} created, queueing task to flash to disk", s.get_last_log_idx());
snapshots_queue.push(std::move(snapshot_task));
}
void NuKeeperStateMachine::save_logical_snp_obj(
@ -163,22 +195,30 @@ void NuKeeperStateMachine::save_logical_snp_obj(
{
LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
// Object ID == 0: it contains dummy value, create snapshot context.
nuraft::ptr<nuraft::buffer> cloned_buffer;
nuraft::ptr<nuraft::snapshot> cloned_meta;
if (obj_id == 0)
{
std::lock_guard lock(storage_lock);
NuKeeperStorageSnapshot snapshot(&storage, s.get_last_log_idx());
latest_snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(snapshot);
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
{
latest_snapshot_buf = nuraft::buffer::clone(data);
cloned_buffer = nuraft::buffer::clone(data);
}
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
latest_snapshot_meta = nuraft::snapshot::deserialize(*snp_buf);
cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx());
{
std::lock_guard lock(snapshots_lock);
latest_snapshot_buf = cloned_buffer;
latest_snapshot_meta = cloned_meta;
}
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*latest_snapshot_buf, s.get_last_log_idx());
LOG_DEBUG(log, "Created snapshot {} with path {}", s.get_last_log_idx(), result_path);
obj_id++;
@ -202,6 +242,7 @@ int NuKeeperStateMachine::read_logical_snp_obj(
}
else
{
std::lock_guard lock(snapshots_lock);
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}",
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());

View File

@ -11,11 +11,12 @@ namespace DB
{
using ResponsesQueue = ThreadSafeQueue<NuKeeperStorage::ResponseForSession>;
using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
class NuKeeperStateMachine : public nuraft::state_machine
{
public:
NuKeeperStateMachine(ResponsesQueue & responses_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_);
NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_);
void init();
@ -72,6 +73,8 @@ private:
NuKeeperSnapshotManager snapshot_manager;
ResponsesQueue & responses_queue;
SnapshotsQueue & snapshots_queue;
/// Mutex for snapshots
std::mutex snapshots_lock;

View File

@ -69,6 +69,28 @@ void NuKeeperStorageDispatcher::responseThread()
}
}
void NuKeeperStorageDispatcher::snapshotThread()
{
setThreadName("NuKeeperSnpT");
while (!shutdown_called)
{
CreateSnapshotTask task;
snapshots_queue.pop(task);
if (shutdown_called)
break;
try
{
task.create_snapshot(task.snapshot);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
@ -110,7 +132,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config);
server = std::make_unique<NuKeeperServer>(myid, coordination_settings, config, responses_queue);
server = std::make_unique<NuKeeperServer>(myid, coordination_settings, config, responses_queue, snapshots_queue);
try
{
LOG_DEBUG(log, "Waiting server to initialize");
@ -129,6 +151,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
LOG_DEBUG(log, "Dispatcher initialized");
}
@ -154,6 +177,9 @@ void NuKeeperStorageDispatcher::shutdown()
if (responses_thread.joinable())
responses_thread.join();
if (snapshot_thread.joinable())
snapshot_thread.join();
}
if (server)

View File

@ -30,18 +30,21 @@ private:
CoordinationSettingsPtr coordination_settings;
using RequestsQueue = ConcurrentBoundedQueue<NuKeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
RequestsQueue requests_queue{1};
ResponsesQueue responses_queue;
SnapshotsQueue snapshots_queue{1};
std::atomic<bool> shutdown_called{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
std::mutex session_to_response_callback_mutex;
SessionToResponseCallback session_to_response_callback;
ThreadFromGlobalPool request_thread;
ThreadFromGlobalPool responses_thread;
ThreadFromGlobalPool session_cleaner_thread;
ThreadFromGlobalPool snapshot_thread;
std::unique_ptr<NuKeeperServer> server;
@ -51,6 +54,7 @@ private:
void requestThread();
void responseThread();
void sessionCleanerTask();
void snapshotThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:

View File

@ -26,7 +26,7 @@ private:
List list;
IndexMap map;
bool snapshot_mode{false};
std::atomic<bool> snapshot_mode{false};
public:

View File

@ -1030,6 +1030,7 @@ TEST(CoordinationTest, TestStorageSnapshotMode)
}
EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin"));
EXPECT_EQ(storage.container.size(), 26);
storage.clearGarbageAfterSnapshot();
EXPECT_EQ(storage.container.snapshotSize(), 26);
for (size_t i = 0; i < 50; ++i)
{
@ -1097,7 +1098,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size
ChangelogDirTest logs("./logs");
ResponsesQueue queue;
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, "./snapshots", settings);
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
state_machine->init();
DB::NuKeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
@ -1120,6 +1122,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size
};
state_machine->create_snapshot(s, when_done);
CreateSnapshotTask snapshot_task;
snapshots_queue.pop(snapshot_task);
snapshot_task.create_snapshot(snapshot_task.snapshot);
}
if (snapshot_created)
{
@ -1130,7 +1135,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size
}
}
auto restore_machine = std::make_shared<NuKeeperStateMachine>(queue, "./snapshots", settings);
SnapshotsQueue snapshots_queue1{1};
auto restore_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue1, "./snapshots", settings);
restore_machine->init();
EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance);