Something working

This commit is contained in:
alesapin 2021-01-22 19:04:57 +03:00
parent 4a06bf1d79
commit c1e36cfe70
8 changed files with 231 additions and 38 deletions

View File

@ -830,6 +830,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
listen_try = true;
}
/// Initialize test keeper raft
global_context->getTestKeeperStorageDispatcher();
for (const auto & listen_host : listen_hosts)
{
/// TCP TestKeeper

View File

@ -1,13 +1,165 @@
#include <Coordination/NuKeeperServer.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <chrono>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <string>
namespace DB
{
void NuKeeperServer::addServer(int server_id_, const std::string & server_uri)
NuKeeperServer::NuKeeperServer(int server_id_, const std::string & hostname_, int port_)
: server_id(server_id_)
, hostname(hostname_)
, port(port_)
, endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<NuKeeperStateMachine>())
, state_manager(nuraft::cs_new<InMemoryStateManager>(server_id, endpoint))
{
if (raft_instance->is_leader())
}
NuraftError NuKeeperServer::addServer(int server_id_, const std::string & server_uri_)
{
nuraft::srv_config config(server_id_, server_uri_);
auto ret1 = raft_instance->add_srv(config);
return NuraftError{ret1->get_result_code(), ret1->get_result_str()};
}
NuraftError NuKeeperServer::startup()
{
nuraft::raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
params.reserved_log_items_ = 5;
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = nuraft::raft_params::blocking;
raft_instance = launcher.init(
state_machine, state_manager, nuraft::cs_new<LoggerWrapper>("RaftInstance"), port,
nuraft::asio_service::options{}, params);
if (!raft_instance)
return NuraftError{nuraft::cmd_result_code::TIMEOUT, "Cannot create RAFT instance"};
static constexpr auto MAX_RETRY = 30;
for (size_t i = 0; i < MAX_RETRY; ++i)
{
nuraft::srv_config first_config(server_id, server_uri);
if (raft_instance->is_initialized())
return NuraftError{nuraft::cmd_result_code::OK, ""};
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return NuraftError{nuraft::cmd_result_code::TIMEOUT, "Cannot start RAFT instance"};
}
NuraftError NuKeeperServer::shutdown()
{
if (!launcher.shutdown(5))
return NuraftError{nuraft::cmd_result_code::TIMEOUT, "Temout waiting RAFT instance to shutdown"};
return NuraftError{nuraft::cmd_result_code::OK, ""};
}
namespace
{
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
{
DB::WriteBufferFromNuraftBuffer buf;
DB::writeIntBinary(session_id, buf);
request->write(buf);
return buf.getBuffer();
}
}
TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer)
{
DB::TestKeeperStorage::ResponsesForSessions results;
DB::ReadBufferFromNuraftBuffer buf(buffer);
while (!buf.eof())
{
int64_t session_id;
DB::readIntBinary(session_id, buf);
int32_t length;
Coordination::XID xid;
int64_t zxid;
Coordination::Error err;
Coordination::read(length, buf);
Coordination::read(xid, buf);
Coordination::read(zxid, buf);
Coordination::read(err, buf);
Coordination::ZooKeeperResponsePtr response;
if (xid == Coordination::WATCH_XID)
response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
else
{
response = ops_mapping[session_id][xid];
ops_mapping[session_id].erase(xid);
if (ops_mapping[session_id].empty())
ops_mapping.erase(session_id);
}
if (err == Coordination::Error::ZOK && (xid == Coordination::WATCH_XID || response->getOpNum() != Coordination::OpNum::Close))
response->readImpl(buf);
response->xid = xid;
response->zxid = zxid;
response->error = err;
results.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
}
return results;
}
TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests)
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (auto & [session_id, request] : requests)
{
ops_mapping[session_id][request->xid] = request->makeResponse();
entries.push_back(getZooKeeperLogEntry(session_id, request));
}
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
return {};
if (result->get_result_code() != nuraft::cmd_result_code::OK)
return {};
return readZooKeeperResponses(result->get());
}
int64_t NuKeeperServer::getSessionID()
{
auto entry = nuraft::buffer::alloc(sizeof(size_t));
nuraft::buffer_serializer bs(entry);
bs.put_i64(0);
auto result = raft_instance->append_entries({entry});
if (!result->get_accepted())
return -1;
if (result->get_result_code() != nuraft::cmd_result_code::OK)
return -1;
auto resp = result->get();
nuraft::buffer_serializer bs_resp(resp);
return bs_resp.get_i64();
}
}

View File

@ -5,10 +5,17 @@
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/TestKeeperStorage.h>
#include <unordered_map>
namespace DB
{
struct NuraftError
{
nuraft::cmd_result_code code;
std::string message;
};
class NuKeeperServer
{
private:
@ -20,7 +27,7 @@ private:
std::string endpoint;
nuraft::ptr<StateMachine> state_machine;
nuraft::ptr<NuKeeperStateMachine> state_machine;
nuraft::ptr<nuraft::state_mgr> state_manager;
@ -28,16 +35,26 @@ private:
nuraft::ptr<nuraft::raft_server> raft_instance;
public:
NuKeeperServer(int server_id, const std::string & hostname, int port);
using XIDToOp = std::unordered_map<Coordination::XID, Coordination::ZooKeeperResponsePtr>;
void startup();
using SessionIDOps = std::unordered_map<int64_t, XIDToOp>;
SessionIDOps ops_mapping;
TestKeeperStorage::ResponsesForSessions readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer);
public:
NuKeeperServer(int server_id_, const std::string & hostname_, int port_);
NuraftError startup();
TestKeeperStorage::ResponsesForSessions putRequests(const TestKeeperStorage::RequestsForSessions & requests);
void addServer(int server_id_, const std::string & server_uri);
int64_t getSessionID();
void shutdown();
NuraftError addServer(int server_id_, const std::string & server_uri);
NuraftError shutdown();
};
}

View File

@ -51,15 +51,32 @@ NuKeeperStateMachine::NuKeeperStateMachine()
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
{
LOG_DEBUG(log, "Commiting logidx {}", log_idx);
auto request_for_session = parseRequest(data);
TestKeeperStorage::ResponsesForSessions responses_for_sessions;
if (data.size() == sizeof(size_t))
{
std::lock_guard lock(storage_lock);
responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id);
LOG_DEBUG(log, "Session ID response {}", log_idx);
auto response = nuraft::buffer::alloc(sizeof(size_t));
nuraft::buffer_serializer bs(response);
{
std::lock_guard lock(storage_lock);
bs.put_i64(storage.getSessionID());
}
last_committed_idx = log_idx;
return response;
}
else
{
auto request_for_session = parseRequest(data);
//LOG_DEBUG(log, "GOT REQUEST {}", Coordination::toString(request_for_session.request->getOpNum()));
TestKeeperStorage::ResponsesForSessions responses_for_sessions;
{
std::lock_guard lock(storage_lock);
responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id);
}
//LOG_DEBUG(log, "TOTAL RESPONSES {} FIRST XID {}", responses_for_sessions.size(), responses_for_sessions[0].response->xid);
last_committed_idx = log_idx;
return writeResponses(responses_for_sessions);
last_committed_idx = log_idx;
return writeResponses(responses_for_sessions);
}
}
bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)

View File

@ -519,6 +519,7 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const Reques
finalized = true;
/// TODO delete ephemerals
ResponsesForSessions finalize_results;
auto finish_watch = [] (const auto & watch_pair) -> ResponsesForSessions
{

View File

@ -18,16 +18,16 @@ void TestKeeperStorageDispatcher::processingThread()
{
while (!shutdown)
{
RequestInfo info;
TestKeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
if (requests_queue.tryPop(info, max_wait))
if (requests_queue.tryPop(request, max_wait))
{
if (shutdown)
break;
auto responses = storage.processRequest(info.request, info.session_id);
auto responses = server.putRequests({request});
for (const auto & response_for_session : responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
@ -67,15 +67,17 @@ void TestKeeperStorageDispatcher::finalize()
processing_thread.join();
}
RequestInfo info;
TestKeeperStorage::RequestsForSessions expired_requests;
while (requests_queue.tryPop(info))
expired_requests.push_back(TestKeeperStorage::RequestForSession{info.session_id, info.request});
//TestKeeperStorage::RequestsForSessions expired_requests;
//TestKeeperStorage::RequestForSession request;
//while (requests_queue.tryPop(request))
// expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
auto expired_responses = storage.finalize(expired_requests);
//auto expired_responses = storage.finalize(expired_requests);
for (const auto & response_for_session : expired_responses)
setResponse(response_for_session.session_id, response_for_session.response);
//for (const auto & response_for_session : expired_responses)
// setResponse(response_for_session.session_id, response_for_session.response);
/// TODO FIXME
server.shutdown();
}
void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
@ -87,8 +89,7 @@ void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperReques
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id);
}
RequestInfo request_info;
request_info.time = clock::now();
TestKeeperStorage::RequestForSession request_info;
request_info.request = request;
request_info.session_id = session_id;
@ -101,7 +102,9 @@ void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperReques
}
TestKeeperStorageDispatcher::TestKeeperStorageDispatcher()
: server(1, "localhost", 44444)
{
server.startup();
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
}

View File

@ -2,8 +2,9 @@
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Coordination/TestKeeperStorage.h>
#include <functional>
#include <Coordination/NuKeeperServer.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
@ -17,16 +18,9 @@ private:
using clock = std::chrono::steady_clock;
struct RequestInfo
{
Coordination::ZooKeeperRequestPtr request;
clock::time_point time;
int64_t session_id;
};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
using RequestsQueue = ConcurrentBoundedQueue<TestKeeperStorage::RequestForSession>;
RequestsQueue requests_queue{1};
std::atomic<bool> shutdown{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
@ -36,7 +30,7 @@ private:
ThreadFromGlobalPool processing_thread;
TestKeeperStorage storage;
NuKeeperServer server;
std::mutex session_id_mutex;
private:
@ -46,6 +40,7 @@ private:
public:
TestKeeperStorageDispatcher();
~TestKeeperStorageDispatcher();
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
@ -53,7 +48,7 @@ public:
int64_t getSessionID()
{
std::lock_guard lock(session_id_mutex);
return storage.getSessionID();
return server.getSessionID();
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);

View File

@ -127,18 +127,22 @@ void testCreateListWatchEvent(zkutil::ZooKeeper & zk)
void testMultiRequest(zkutil::ZooKeeper & zk)
{
std::cerr << "Testing multi request\n";
Coordination::Requests requests;
requests.push_back(zkutil::makeCreateRequest("/data/multirequest", "aaa", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeSetRequest("/data/multirequest", "bbb", -1));
zk.multi(requests);
std::cerr << "Multi executed\n";
try
{
requests.clear();
std::cerr << "Testing bad multi\n";
requests.push_back(zkutil::makeCreateRequest("/data/multirequest", "qweqwe", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeSetRequest("/data/multirequest", "bbb", -1));
requests.push_back(zkutil::makeSetRequest("/data/multirequest", "ccc", -1));
zk.multi(requests);
std::cerr << "Bad multi executed\n";
std::terminate();
}
catch (...)
@ -147,6 +151,7 @@ void testMultiRequest(zkutil::ZooKeeper & zk)
}
checkEq(zk, "/data/multirequest", "bbb");
std::cerr << "Multi request finished\n";
}
std::mutex elements_mutex;