Add some missed files

This commit is contained in:
alesapin 2021-01-14 19:20:33 +03:00
parent 294e8f095d
commit a2070bf130
4 changed files with 351 additions and 20 deletions

View File

@ -0,0 +1,40 @@
#pragma once
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>
namespace DB
{
class LoggerWrapper : public nuraft::logger
{
LoggerWrapper(const std::string & name)
: log(&Poco::Logger::get(name))
{}
void put_details(
int level,
const char * /* source_file */,
const char * /* func_name */,
size_t /* line_number */,
const std::string & msg) override
{
LOG_IMPL(log, level, level, msg);
}
void set_level(int level) override
{
level = std::max(6, std::min(1, level));
log->setLevel(level);
}
int get_level() override
{
return log->getLevel();
}
pivate:
Poco::Logger * log;
};
}

View File

@ -0,0 +1,163 @@
#include <Coordination/SummingStateMachine.h>
#include <iostream>
namespace DB
{
static int64_t deserializeValue(nuraft::buffer & buffer)
{
nuraft::buffer_serializer bs(buffer);
int64_t result;
memcpy(&result, bs.get_raw(buffer.size()), sizeof(result));
return result;
}
SummingStateMachine::SummingStateMachine()
: value(0)
, last_committed_idx(0)
{
}
nuraft::ptr<nuraft::buffer> SummingStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
{
int64_t value_to_add = deserializeValue(data);
value += value_to_add;
last_committed_idx = log_idx;
// Return Raft log number as a return result.
nuraft::ptr<nuraft::buffer> ret = nuraft::buffer::alloc(sizeof(log_idx));
nuraft::buffer_serializer bs(ret);
bs.put_u64(log_idx);
return ret;
}
bool SummingStateMachine::apply_snapshot(nuraft::snapshot & s)
{
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
if (entry == snapshots.end())
return false;
auto ctx = entry->second;
value = ctx->value;
return true;
}
nuraft::ptr<nuraft::snapshot> SummingStateMachine::last_snapshot()
{
// Just return the latest snapshot.
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.rbegin();
if (entry == snapshots.rend()) return nullptr;
auto ctx = entry->second;
return ctx->snapshot;
}
void SummingStateMachine::createSnapshotInternal(nuraft::snapshot & s)
{
// Clone snapshot from `s`.
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
nuraft::ptr<nuraft::snapshot> ss = nuraft::snapshot::deserialize(*snp_buf);
// Put into snapshot map.
auto ctx = cs_new<SingleValueSnapshotContext>(ss, value);
snapshots[s.get_last_log_idx()] = ctx;
// Maintain last 3 snapshots only.
const int MAX_SNAPSHOTS = 3;
int num = snapshots.size();
auto entry = snapshots.begin();
for (int ii = 0; ii < num - MAX_SNAPSHOTS; ++ii)
{
if (entry == snapshots.end())
break;
entry = snapshots.erase(entry);
}
}
void SummingStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
nuraft::buffer & data,
bool /*is_first_obj*/,
bool /*is_last_obj*/)
{
if (obj_id == 0)
{
// Object ID == 0: it contains dummy value, create snapshot context.
createSnapshotInternal(s);
}
else
{
// Object ID > 0: actual snapshot value.
nuraft::buffer_serializer bs(data);
int64_t local_value = static_cast<int64_t>(bs.get_u64());
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
assert(entry != snapshots.end());
entry->second->value = local_value;
}
// Request next object.
obj_id++;
}
int SummingStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
ulong obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj)
{
nuraft::ptr<SingleValueSnapshotContext> ctx = nullptr;
{
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
if (entry == snapshots.end()) {
// Snapshot doesn't exist.
data_out = nullptr;
is_last_obj = true;
return 0;
}
ctx = entry->second;
}
if (obj_id == 0)
{
// Object ID == 0: first object, put dummy data.
data_out = nuraft::buffer::alloc(sizeof(Int32));
nuraft::buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = false;
}
else
{
// Object ID > 0: second object, put actual value.
data_out = nuraft::buffer::alloc(sizeof(size_t));
nuraft::buffer_serializer bs(data_out);
bs.put_u64(ctx->value);
is_last_obj = true;
}
return 0;
}
void SummingStateMachine::create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done)
{
{
std::lock_guard<std::mutex> ll(snapshots_lock);
createSnapshotInternal(s);
}
nuraft::ptr<std::exception> except(nullptr);
bool ret = true;
when_done(ret, except);
}
}

View File

@ -0,0 +1,77 @@
#pragma once
#include <libnuraft/nuraft.hxx>
#include <Core/Types.h>
#include <atomic>
#include <map>
#include <mutex>
namespace DB
{
class SummingStateMachine : public nuraft::state_machine
{
public:
SummingStateMachine();
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
size_t last_commit_index() override { return last_committed_idx; }
bool apply_snapshot(nuraft::snapshot & s) override;
nuraft::ptr<nuraft::snapshot> last_snapshot() override;
void create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done) override;
void save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
nuraft::buffer & data,
bool is_first_obj,
bool is_last_obj) override;
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
ulong obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;
int64_t getValue() const { return value; }
private:
struct SingleValueSnapshotContext
{
SingleValueSnapshotContext(nuraft::ptr<nuraft::snapshot> & s, int64_t v)
: snapshot(s)
, value(v)
{}
nuraft::ptr<nuraft::snapshot> snapshot;
int64_t value;
};
void createSnapshotInternal(nuraft::snapshot & s);
// State machine's current value.
std::atomic<int64_t> value;
// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx;
// Keeps the last 3 snapshots, by their Raft log numbers.
std::map<uint64_t, nuraft::ptr<SingleValueSnapshotContext>> snapshots;
// Mutex for `snapshots_`.
std::mutex snapshots_lock;
};
}

View File

@ -3,6 +3,7 @@
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/LoggerWrapper.h>
#include <Common/Exception.h>
#include <libnuraft/nuraft.hxx>
#include <thread>
@ -45,7 +46,7 @@ struct SummingRaftServer
params.return_method_ = nuraft::raft_params::blocking;
raft_instance = launcher.init(
state_machine, state_manager, nuraft::cs_new<nuraft::logger>(), port,
state_machine, state_manager, nuraft::cs_new<LoggerWrapper>(), port,
nuraft::asio_service::options{}, params);
if (!raft_instance)
@ -101,7 +102,31 @@ nuraft::ptr<nuraft::buffer> getLogEntry(int64_t number)
return ret;
}
TEST(CoordinationTest, TestSummingRaft)
TEST(CoordinationTest, TestSummingRaft1)
{
SummingRaftServer s1(1, "localhost", 44444);
/// Single node is leader
EXPECT_EQ(s1.raft_instance->get_leader(), 1);
auto entry1 = getLogEntry(143);
auto ret = s1.raft_instance->append_entries({entry});
EXPECT_TRUE(ret->get_accepted()) << "failed to replicate: entry 1" << ret->get_result_code();
EXPECT_EQ(ret->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 1" << ret->get_result_code();
while (s1.state_machine->getValue() != 143)
{
std::cout << "Waiting s1 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(s1.state_machine->getValue(), 143);
s1.launcher.shutdown(5);
}
TEST(CoordinationTest, TestSummingRaft3)
{
SummingRaftServer s1(1, "localhost", 44444);
SummingRaftServer s2(2, "localhost", 44445);
@ -145,24 +170,8 @@ TEST(CoordinationTest, TestSummingRaft)
std::cerr << "Starting to add entries\n";
auto entry = getLogEntry(1);
auto ret = s2.raft_instance->append_entries({entry});
if (!ret->get_accepted())
{
// Log append rejected, usually because this node is not a leader.
std::cout << "failed to replicate: entry 1" << ret->get_result_code() << std::endl;
EXPECT_TRUE(false);
}
if (ret->get_result_code() != nuraft::cmd_result_code::OK)
{
// Something went wrong.
// This means committing this log failed,
// but the log itself is still in the log store.
std::cout << "failed to replicate: entry 1" << ret->get_result_code() << std::endl;
EXPECT_TRUE(false);
}
else
{
std::cout << "Append ok\n";
}
EXPECT_TRUE(ret->get_accepted()) << "failed to replicate: entry 1" << ret->get_result_code();
EXPECT_EQ(ret->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 1" << ret->get_result_code();
while (s1.state_machine->getValue() != 1)
{
@ -176,10 +185,52 @@ TEST(CoordinationTest, TestSummingRaft)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
while (s3.state_machine->getValue() != 1)
{
std::cout << "Waiting s3 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(s1.state_machine->getValue(), 1);
EXPECT_EQ(s2.state_machine->getValue(), 1);
EXPECT_EQ(s3.state_machine->getValue(), 1);
auto non_leader_entry = getLogEntry(3);
auto ret_non_leader1 = s1.raft_instance->append_entries({non_leader_entry});
EXPECT_FALSE(ret_non_leader1->get_accepted());
auto ret_non_leader3 = s3.raft_instance->append_entries({non_leader_entry});
EXPECT_FALSE(ret_non_leader3->get_accepted());
auto leader_entry = getLogEntry(77);
auto ret_leader = s2.raft_instance->append_entries({leader_entry});
EXPECT_TRUE(ret_leader->get_accepted()) << "failed to replicate: entry 78" << ret_leader->get_result_code();
EXPECT_EQ(ret_leader->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 78" << ret_leader->get_result_code();
while (s1.state_machine->getValue() != 78)
{
std::cout << "Waiting s1 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
while (s2.state_machine->getValue() != 78)
{
std::cout << "Waiting s2 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
while (s3.state_machine->getValue() != 78)
{
std::cout << "Waiting s3 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(s1.state_machine->getValue(), 78);
EXPECT_EQ(s2.state_machine->getValue(), 78);
EXPECT_EQ(s3.state_machine->getValue(), 78);
s1.launcher.shutdown(5);
s2.launcher.shutdown(5);
s3.launcher.shutdown(5);