diff --git a/src/Coordination/LoggerWrapper.h b/src/Coordination/LoggerWrapper.h
new file mode 100644
index 00000000000..51718eaee8b
--- /dev/null
+++ b/src/Coordination/LoggerWrapper.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <algorithm>
+#include <libnuraft/nuraft.hxx>
+#include <common/logger_useful.h>
+
+namespace DB
+{
+
+/// Adapter that forwards messages from the nuraft::logger interface to a Poco::Logger.
+class LoggerWrapper : public nuraft::logger
+{
+public:
+    /// `name` is passed to Poco::Logger::get to obtain the underlying logger.
+    explicit LoggerWrapper(const std::string & name)
+        : log(&Poco::Logger::get(name))
+    {}
+
+    /// Logs `msg` at `level`; the source file, function name and line number are ignored.
+    void put_details(
+        int level,
+        const char * /* source_file */,
+        const char * /* func_name */,
+        size_t /* line_number */,
+        const std::string & msg) override
+    {
+        LOG_IMPL(log, level, level, msg);
+    }
+
+    /// Sets the level of the underlying logger, clamped to the range [1, 6].
+    void set_level(int level) override
+    {
+        level = std::clamp(level, 1, 6);
+        log->setLevel(level);
+    }
+
+    /// Returns the level of the underlying logger.
+    int get_level() override
+    {
+        return log->getLevel();
+    }
+
+private:
+    Poco::Logger * log;
+};
+
+}
diff --git a/src/Coordination/SummingStateMachine.cpp b/src/Coordination/SummingStateMachine.cpp
new file mode 100644
index 00000000000..16154ca8cd4
--- /dev/null
+++ b/src/Coordination/SummingStateMachine.cpp
@@ -0,0 +1,174 @@
+#include <Coordination/SummingStateMachine.h>
+
+#include <cassert>
+#include <cstring>
+
+namespace DB
+{
+
+/// Copies sizeof(int64_t) raw bytes out of `buffer` and returns them as an int64_t.
+static int64_t deserializeValue(nuraft::buffer & buffer)
+{
+    nuraft::buffer_serializer bs(buffer);
+    int64_t result;
+    memcpy(&result, bs.get_raw(buffer.size()), sizeof(result));
+    return result;
+}
+
+SummingStateMachine::SummingStateMachine()
+    : value(0)
+    , last_committed_idx(0)
+{
+}
+
+/// Adds the value stored in `data` to the sum, records `log_idx` and returns it in a new buffer.
+nuraft::ptr<nuraft::buffer> SummingStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
+{
+    int64_t value_to_add = deserializeValue(data);
+
+    value += value_to_add;
+    last_committed_idx = log_idx;
+
+    // Return Raft log number as a return result.
+    nuraft::ptr<nuraft::buffer> ret = nuraft::buffer::alloc(sizeof(log_idx));
+    nuraft::buffer_serializer bs(ret);
+    bs.put_u64(log_idx);
+    return ret;
+}
+
+/// Restores `value` from the stored snapshot matching `s`; returns false if no such snapshot is stored.
+bool SummingStateMachine::apply_snapshot(nuraft::snapshot & s)
+{
+    std::lock_guard ll(snapshots_lock);
+    auto entry = snapshots.find(s.get_last_log_idx());
+    if (entry == snapshots.end())
+        return false;
+
+    auto ctx = entry->second;
+    value = ctx->value;
+    return true;
+}
+
+/// Returns the stored snapshot with the largest log index, or nullptr if none is stored.
+nuraft::ptr<nuraft::snapshot> SummingStateMachine::last_snapshot()
+{
+    // Just return the latest snapshot.
+    std::lock_guard ll(snapshots_lock);
+    auto entry = snapshots.rbegin();
+    if (entry == snapshots.rend())
+        return nullptr;
+
+    auto ctx = entry->second;
+    return ctx->snapshot;
+}
+
+/// Stores a copy of `s` with the current value. Does not lock: the caller must hold `snapshots_lock`.
+void SummingStateMachine::createSnapshotInternal(nuraft::snapshot & s)
+{
+    // Clone snapshot from `s`.
+    nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
+    nuraft::ptr<nuraft::snapshot> ss = nuraft::snapshot::deserialize(*snp_buf);
+
+    // Put into snapshot map.
+    auto ctx = nuraft::cs_new<SingleValueSnapshotContext>(ss, value);
+    snapshots[s.get_last_log_idx()] = ctx;
+
+    // Maintain last 3 snapshots only.
+    const int MAX_SNAPSHOTS = 3;
+    int num = snapshots.size();
+    auto entry = snapshots.begin();
+
+    for (int ii = 0; ii < num - MAX_SNAPSHOTS; ++ii)
+    {
+        if (entry == snapshots.end())
+            break;
+        entry = snapshots.erase(entry);
+    }
+}
+
+/// Receives one snapshot object: object 0 creates the snapshot context, later objects carry the value.
+void SummingStateMachine::save_logical_snp_obj(
+    nuraft::snapshot & s,
+    size_t & obj_id,
+    nuraft::buffer & data,
+    bool /*is_first_obj*/,
+    bool /*is_last_obj*/)
+{
+    if (obj_id == 0)
+    {
+        // Object ID == 0: it contains dummy value, create snapshot context.
+        // Every other access to `snapshots` happens under the lock, so take it here as well.
+        std::lock_guard ll(snapshots_lock);
+        createSnapshotInternal(s);
+    }
+    else
+    {
+        // Object ID > 0: actual snapshot value.
+        nuraft::buffer_serializer bs(data);
+        int64_t local_value = static_cast<int64_t>(bs.get_u64());
+
+        std::lock_guard ll(snapshots_lock);
+        auto entry = snapshots.find(s.get_last_log_idx());
+        assert(entry != snapshots.end());
+        entry->second->value = local_value;
+    }
+    // Request next object.
+    obj_id++;
+}
+
+/// Produces one snapshot object: object 0 is a dummy int32, any later object carries the value.
+int SummingStateMachine::read_logical_snp_obj(
+    nuraft::snapshot & s,
+    void* & /*user_snp_ctx*/,
+    ulong obj_id,
+    nuraft::ptr<nuraft::buffer> & data_out,
+    bool & is_last_obj)
+{
+    nuraft::ptr<SingleValueSnapshotContext> ctx = nullptr;
+    {
+        std::lock_guard ll(snapshots_lock);
+        auto entry = snapshots.find(s.get_last_log_idx());
+        if (entry == snapshots.end())
+        {
+            // Snapshot doesn't exist.
+            data_out = nullptr;
+            is_last_obj = true;
+            return 0;
+        }
+        ctx = entry->second;
+    }
+
+    if (obj_id == 0)
+    {
+        // Object ID == 0: first object, put dummy data.
+        data_out = nuraft::buffer::alloc(sizeof(Int32));
+        nuraft::buffer_serializer bs(data_out);
+        bs.put_i32(0);
+        is_last_obj = false;
+    }
+    else
+    {
+        // Object ID > 0: second object, put actual value.
+        data_out = nuraft::buffer::alloc(sizeof(size_t));
+        nuraft::buffer_serializer bs(data_out);
+        bs.put_u64(ctx->value);
+        is_last_obj = true;
+    }
+    return 0;
+}
+
+/// Stores a snapshot of the current value for `s` and reports success through `when_done`.
+void SummingStateMachine::create_snapshot(
+    nuraft::snapshot & s,
+    nuraft::async_result<bool>::handler_type & when_done)
+{
+    {
+        std::lock_guard ll(snapshots_lock);
+        createSnapshotInternal(s);
+    }
+    nuraft::ptr<std::exception> except(nullptr);
+    bool ret = true;
+    when_done(ret, except);
+}
+
+}
diff --git a/src/Coordination/SummingStateMachine.h b/src/Coordination/SummingStateMachine.h
new file mode 100644
index 00000000000..df343378408
--- /dev/null
+++ b/src/Coordination/SummingStateMachine.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <Core/Types.h>
+#include <libnuraft/nuraft.hxx>
+#include <atomic>
+#include <map>
+#include <mutex>
+
+namespace DB
+{
+
+/// Raft state machine whose state is a single int64 value: the sum of all committed entries.
+class SummingStateMachine : public nuraft::state_machine
+{
+public:
+    SummingStateMachine();
+
+    nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
+
+    nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
+
+    void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
+
+    size_t last_commit_index() override { return last_committed_idx; }
+
+    bool apply_snapshot(nuraft::snapshot & s) override;
+
+    nuraft::ptr<nuraft::snapshot> last_snapshot() override;
+
+    void create_snapshot(
+        nuraft::snapshot & s,
+        nuraft::async_result<bool>::handler_type & when_done) override;
+
+    void save_logical_snp_obj(
+        nuraft::snapshot & s,
+        size_t & obj_id,
+        nuraft::buffer & data,
+        bool is_first_obj,
+        bool is_last_obj) override;
+
+    int read_logical_snp_obj(
+        nuraft::snapshot & s,
+        void* & user_snp_ctx,
+        ulong obj_id,
+        nuraft::ptr<nuraft::buffer> & data_out,
+        bool & is_last_obj) override;
+
+    /// Returns the current sum.
+    int64_t getValue() const { return value; }
+
+private:
+    /// A stored snapshot together with the value the state machine had for it.
+    struct SingleValueSnapshotContext
+    {
+        SingleValueSnapshotContext(nuraft::ptr<nuraft::snapshot> & s, int64_t v)
+            : snapshot(s)
+            , value(v)
+        {}
+
+        nuraft::ptr<nuraft::snapshot> snapshot;
+        int64_t value;
+    };
+
+    /// Stores a snapshot for `s`; the caller must hold `snapshots_lock`.
+    void createSnapshotInternal(nuraft::snapshot & s);
+
+    // State machine's current value.
+    std::atomic<int64_t> value;
+
+    // Last committed Raft log number.
+    std::atomic<uint64_t> last_committed_idx;
+
+    // Keeps the last 3 snapshots, by their Raft log numbers.
+    std::map<uint64_t, nuraft::ptr<SingleValueSnapshotContext>> snapshots;
+
+    // Mutex for `snapshots`.
+    std::mutex snapshots_lock;
+};
+
+}
diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp
index f9856eb275a..5785c9adb27 100644
--- a/src/Coordination/tests/gtest_for_build.cpp
+++ b/src/Coordination/tests/gtest_for_build.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -45,7 +46,7 @@ struct SummingRaftServer
         params.return_method_ = nuraft::raft_params::blocking;
 
         raft_instance = launcher.init(
-            state_machine, state_manager, nuraft::cs_new(), port,
+            state_machine, state_manager, nuraft::cs_new(), port,
             nuraft::asio_service::options{}, params);
 
         if (!raft_instance)
@@ -101,7 +102,31 @@ nuraft::ptr getLogEntry(int64_t number)
     return ret;
 }
 
-TEST(CoordinationTest, TestSummingRaft)
+
+TEST(CoordinationTest, TestSummingRaft1)
+{
+    SummingRaftServer s1(1, "localhost", 44444);
+
+    /// Single node is leader
+    EXPECT_EQ(s1.raft_instance->get_leader(), 1);
+
+    auto entry1 = getLogEntry(143);
+    auto ret = s1.raft_instance->append_entries({entry1});
+    EXPECT_TRUE(ret->get_accepted()) << "failed to replicate: entry 1" << ret->get_result_code();
+    EXPECT_EQ(ret->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 1" << ret->get_result_code();
+
+    while (s1.state_machine->getValue() != 143)
+    {
+        std::cout << "Waiting s1 to apply entry\n";
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    EXPECT_EQ(s1.state_machine->getValue(), 143);
+
+    s1.launcher.shutdown(5);
+}
+
+TEST(CoordinationTest, TestSummingRaft3)
 {
     SummingRaftServer s1(1, "localhost", 44444);
     SummingRaftServer s2(2, "localhost", 44445);
@@ -145,24 +170,8 @@ TEST(CoordinationTest, TestSummingRaft)
     std::cerr << "Starting to add entries\n";
     auto entry = getLogEntry(1);
     auto ret = s2.raft_instance->append_entries({entry});
-    if (!ret->get_accepted())
-    {
-        // Log append rejected, usually because this node is not a leader.
-        std::cout << "failed to replicate: entry 1" << ret->get_result_code() << std::endl;
-        EXPECT_TRUE(false);
-    }
-    if (ret->get_result_code() != nuraft::cmd_result_code::OK)
-    {
-        // Something went wrong.
-        // This means committing this log failed,
-        // but the log itself is still in the log store.
-        std::cout << "failed to replicate: entry 1" << ret->get_result_code() << std::endl;
-        EXPECT_TRUE(false);
-    }
-    else
-    {
-        std::cout << "Append ok\n";
-    }
+    EXPECT_TRUE(ret->get_accepted()) << "failed to replicate: entry 1" << ret->get_result_code();
+    EXPECT_EQ(ret->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 1" << ret->get_result_code();
 
     while (s1.state_machine->getValue() != 1)
     {
@@ -176,10 +185,52 @@ TEST(CoordinationTest, TestSummingRaft)
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
     }
 
+    while (s3.state_machine->getValue() != 1)
+    {
+        std::cout << "Waiting s3 to apply entry\n";
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
     EXPECT_EQ(s1.state_machine->getValue(), 1);
     EXPECT_EQ(s2.state_machine->getValue(), 1);
     EXPECT_EQ(s3.state_machine->getValue(), 1);
 
+    auto non_leader_entry = getLogEntry(3);
+    auto ret_non_leader1 = s1.raft_instance->append_entries({non_leader_entry});
+
+    EXPECT_FALSE(ret_non_leader1->get_accepted());
+
+    auto ret_non_leader3 = s3.raft_instance->append_entries({non_leader_entry});
+
+    EXPECT_FALSE(ret_non_leader3->get_accepted());
+
+    auto leader_entry = getLogEntry(77);
+    auto ret_leader = s2.raft_instance->append_entries({leader_entry});
+    EXPECT_TRUE(ret_leader->get_accepted()) << "failed to replicate: entry 78" << ret_leader->get_result_code();
+    EXPECT_EQ(ret_leader->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 78" << ret_leader->get_result_code();
+
+    while (s1.state_machine->getValue() != 78)
+    {
+        std::cout << "Waiting s1 to apply entry\n";
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    while (s2.state_machine->getValue() != 78)
+    {
+        std::cout << "Waiting s2 to apply entry\n";
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    while (s3.state_machine->getValue() != 78)
+    {
+        std::cout << "Waiting s3 to apply entry\n";
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    EXPECT_EQ(s1.state_machine->getValue(), 78);
+    EXPECT_EQ(s2.state_machine->getValue(), 78);
+    EXPECT_EQ(s3.state_machine->getValue(), 78);
+
     s1.launcher.shutdown(5);
     s2.launcher.shutdown(5);
     s3.launcher.shutdown(5);