Fix install script

This commit is contained in:
alesapin 2021-02-01 17:14:59 +03:00
parent 57c9b6c864
commit 365bf65f5a
27 changed files with 249 additions and 250 deletions

View File

@ -94,7 +94,7 @@
#endif
#if USE_NURAFT
# include <Server/TestKeeperTCPHandlerFactory.h>
# include <Server/NuKeeperTCPHandlerFactory.h>
#endif
namespace CurrentMetrics
@ -844,15 +844,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
listen_try = true;
}
if (config().has("test_keeper_server"))
if (config().has("nu_keeper_server"))
{
#if USE_NURAFT
/// Initialize test keeper RAFT. Do nothing if no test_keeper_server in config.
global_context->initializeTestKeeperStorageDispatcher();
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeNuKeeperStorageDispatcher();
for (const auto & listen_host : listen_hosts)
{
/// TCP TestKeeper
const char * port_name = "test_keeper_server.tcp_port";
/// TCP NuKeeper
const char * port_name = "nu_keeper_server.tcp_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
@ -862,9 +862,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
servers_to_start_before_tables->emplace_back(
port_name,
std::make_unique<Poco::Net::TCPServer>(
new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
new NuKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections to fake zookeeper (tcp): {}", address.toString());
LOG_INFO(log, "Listening for connections to NuKeeper (tcp): {}", address.toString());
});
}
#else
@ -911,7 +911,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
else
LOG_INFO(log, "Closed connections to servers for tables.");
global_context->shutdownTestKeeperStorageDispatcher();
global_context->shutdownNuKeeperStorageDispatcher();
}
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.

View File

@ -1 +0,0 @@
../../../tests/config/config.d/test_keeper_port.xml

View File

@ -46,8 +46,8 @@ void NuKeeperServer::startup()
{
nuraft::raft_params params;
params.heart_beat_interval_ = 1000;
params.election_timeout_lower_bound_ = 3000;
params.election_timeout_upper_bound_ = 6000;
params.election_timeout_lower_bound_ = 500;
params.election_timeout_upper_bound_ = 1000;
params.reserved_log_items_ = 5000;
params.snapshot_distance_ = 5000;
params.client_req_timeout_ = 10000;
@ -75,9 +75,9 @@ void NuKeeperServer::startup()
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot start RAFT server within startup timeout");
}
TestKeeperStorage::ResponsesForSessions NuKeeperServer::shutdown(const TestKeeperStorage::RequestsForSessions & expired_requests)
NuKeeperStorage::ResponsesForSessions NuKeeperServer::shutdown(const NuKeeperStorage::RequestsForSessions & expired_requests)
{
TestKeeperStorage::ResponsesForSessions responses;
NuKeeperStorage::ResponsesForSessions responses;
if (isLeader())
{
try
@ -108,9 +108,9 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
}
TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer)
NuKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer)
{
DB::TestKeeperStorage::ResponsesForSessions results;
DB::NuKeeperStorage::ResponsesForSessions results;
DB::ReadBufferFromNuraftBuffer buf(buffer);
while (!buf.eof())
@ -153,12 +153,12 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(n
response->zxid = zxid;
response->error = err;
results.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
results.push_back(DB::NuKeeperStorage::ResponseForSession{session_id, response});
}
return results;
}
TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests)
NuKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const NuKeeperStorage::RequestsForSessions & requests)
{
if (isLeaderAlive() && requests.size() == 1 && requests[0].request->isReadRequest())
{
@ -178,28 +178,28 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKe
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
{
TestKeeperStorage::ResponsesForSessions responses;
NuKeeperStorage::ResponsesForSessions responses;
for (const auto & [session_id, request] : requests)
{
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0; /// FIXME what we can do with it?
response->error = Coordination::Error::ZSESSIONEXPIRED;
responses.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
responses.push_back(DB::NuKeeperStorage::ResponseForSession{session_id, response});
}
return responses;
}
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
{
TestKeeperStorage::ResponsesForSessions responses;
NuKeeperStorage::ResponsesForSessions responses;
for (const auto & [session_id, request] : requests)
{
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0; /// FIXME what we can do with it?
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
responses.push_back(DB::NuKeeperStorage::ResponseForSession{session_id, response});
}
return responses;
}

View File

@ -4,7 +4,7 @@
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/TestKeeperStorage.h>
#include <Coordination/NuKeeperStorage.h>
#include <unordered_map>
namespace DB
@ -35,7 +35,7 @@ private:
SessionIDOps ops_mapping;
TestKeeperStorage::ResponsesForSessions readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer);
NuKeeperStorage::ResponsesForSessions readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer);
std::mutex append_entries_mutex;
@ -44,7 +44,7 @@ public:
void startup();
TestKeeperStorage::ResponsesForSessions putRequests(const TestKeeperStorage::RequestsForSessions & requests);
NuKeeperStorage::ResponsesForSessions putRequests(const NuKeeperStorage::RequestsForSessions & requests);
int64_t getSessionID();
@ -58,7 +58,7 @@ public:
void waitForServers(const std::vector<int32_t> & ids) const;
void waitForCatchUp() const;
TestKeeperStorage::ResponsesForSessions shutdown(const TestKeeperStorage::RequestsForSessions & expired_requests);
NuKeeperStorage::ResponsesForSessions shutdown(const NuKeeperStorage::RequestsForSessions & expired_requests);
};
}

View File

@ -3,17 +3,17 @@
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Coordination/TestKeeperStorageSerializer.h>
#include <Coordination/NuKeeperStorageSerializer.h>
namespace DB
{
static constexpr int MAX_SNAPSHOTS = 3;
TestKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
{
ReadBufferFromNuraftBuffer buffer(data);
TestKeeperStorage::RequestForSession request_for_session;
NuKeeperStorage::RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
int32_t length;
@ -31,7 +31,7 @@ TestKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
return request_for_session;
}
nuraft::ptr<nuraft::buffer> writeResponses(TestKeeperStorage::ResponsesForSessions & responses)
nuraft::ptr<nuraft::buffer> writeResponses(NuKeeperStorage::ResponsesForSessions & responses)
{
WriteBufferFromNuraftBuffer buffer;
for (const auto & response_and_session : responses)
@ -67,7 +67,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
else
{
auto request_for_session = parseRequest(data);
TestKeeperStorage::ResponsesForSessions responses_for_sessions;
NuKeeperStorage::ResponsesForSessions responses_for_sessions;
{
std::lock_guard lock(storage_lock);
responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id);
@ -118,10 +118,10 @@ NuKeeperStateMachine::StorageSnapshotPtr NuKeeperStateMachine::readSnapshot(nura
{
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
nuraft::ptr<nuraft::snapshot> ss = nuraft::snapshot::deserialize(*snp_buf);
TestKeeperStorageSerializer serializer;
NuKeeperStorageSerializer serializer;
ReadBufferFromNuraftBuffer reader(in);
TestKeeperStorage new_storage;
NuKeeperStorage new_storage;
serializer.deserialize(new_storage, reader);
return std::make_shared<StorageSnapshot>(ss, new_storage);
}
@ -129,7 +129,7 @@ NuKeeperStateMachine::StorageSnapshotPtr NuKeeperStateMachine::readSnapshot(nura
void NuKeeperStateMachine::writeSnapshot(const NuKeeperStateMachine::StorageSnapshotPtr & snapshot, nuraft::ptr<nuraft::buffer> & out)
{
TestKeeperStorageSerializer serializer;
NuKeeperStorageSerializer serializer;
WriteBufferFromNuraftBuffer writer;
serializer.serialize(snapshot->storage, writer);
@ -223,7 +223,7 @@ int NuKeeperStateMachine::read_logical_snp_obj(
return 0;
}
TestKeeperStorage::ResponsesForSessions NuKeeperStateMachine::processReadRequest(const TestKeeperStorage::RequestForSession & request_for_session)
NuKeeperStorage::ResponsesForSessions NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session)
{
std::lock_guard lock(storage_lock);
return storage.processRequest(request_for_session.request, request_for_session.session_id);

View File

@ -1,6 +1,6 @@
#pragma once
#include <Coordination/TestKeeperStorage.h>
#include <Coordination/NuKeeperStorage.h>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>
@ -42,23 +42,23 @@ public:
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;
TestKeeperStorage & getStorage()
NuKeeperStorage & getStorage()
{
return storage;
}
TestKeeperStorage::ResponsesForSessions processReadRequest(const TestKeeperStorage::RequestForSession & request_for_session);
NuKeeperStorage::ResponsesForSessions processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session);
private:
struct StorageSnapshot
{
StorageSnapshot(const nuraft::ptr<nuraft::snapshot> & s, const TestKeeperStorage & storage_)
StorageSnapshot(const nuraft::ptr<nuraft::snapshot> & s, const NuKeeperStorage & storage_)
: snapshot(s)
, storage(storage_)
{}
nuraft::ptr<nuraft::snapshot> snapshot;
TestKeeperStorage storage;
NuKeeperStorage storage;
};
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
@ -69,7 +69,7 @@ private:
static void writeSnapshot(const StorageSnapshotPtr & snapshot, nuraft::ptr<nuraft::buffer> & out);
TestKeeperStorage storage;
NuKeeperStorage storage;
/// Mutex for snapshots
std::mutex snapshots_lock;

View File

@ -1,4 +1,4 @@
#include <Coordination/TestKeeperStorage.h>
#include <Coordination/NuKeeperStorage.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/setThreadName.h>
#include <mutex>
@ -31,9 +31,9 @@ static String baseName(const String & path)
return path.substr(rslash_pos + 1);
}
static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type)
static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches, Coordination::Event event_type)
{
TestKeeperStorage::ResponsesForSessions result;
NuKeeperStorage::ResponsesForSessions result;
auto it = watches.find(path);
if (it != watches.end())
{
@ -44,7 +44,7 @@ static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String &
watch_response->type = event_type;
watch_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : it->second)
result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_response});
result.push_back(NuKeeperStorage::ResponseForSession{watcher_session, watch_response});
watches.erase(it);
}
@ -60,52 +60,52 @@ static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String &
watch_list_response->type = Coordination::Event::CHILD;
watch_list_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : it->second)
result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_list_response});
result.push_back(NuKeeperStorage::ResponseForSession{watcher_session, watch_list_response});
list_watches.erase(it);
}
return result;
}
TestKeeperStorage::TestKeeperStorage()
NuKeeperStorage::NuKeeperStorage()
{
container.emplace("/", Node());
}
using Undo = std::function<void()>;
struct TestKeeperStorageRequest
struct NuKeeperStorageRequest
{
Coordination::ZooKeeperRequestPtr zk_request;
explicit TestKeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
explicit NuKeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: zk_request(zk_request_)
{}
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0;
virtual TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0;
virtual NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & /*watches*/, NuKeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual ~TestKeeperStorageRequest() = default;
virtual ~NuKeeperStorageRequest() = default;
};
struct TestKeeperStorageHeartbeatRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageHeartbeatRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & /* container */, TestKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & /* container */, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
{
return {zk_request->makeResponse(), {}};
}
};
struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
using NuKeeperStorageRequest::NuKeeperStorageRequest;
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Undo undo;
@ -130,7 +130,7 @@ struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest
}
else
{
TestKeeperStorage::Node created_node;
NuKeeperStorage::Node created_node;
created_node.seq_num = 0;
created_node.stat.czxid = zxid;
created_node.stat.mzxid = zxid;
@ -185,10 +185,10 @@ struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageGetRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
@ -210,10 +210,10 @@ struct TestKeeperStorageGetRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageRemoveRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t session_id) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t session_id) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
@ -260,16 +260,16 @@ struct TestKeeperStorageRemoveRequest final : public TestKeeperStorageRequest
return { response_ptr, undo };
}
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
}
};
struct TestKeeperStorageExistsRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageExistsRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /* session_id */) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /* session_id */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
@ -290,10 +290,10 @@ struct TestKeeperStorageExistsRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageSetRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & /* ephemerals */, int64_t zxid, int64_t /* session_id */) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t zxid, int64_t /* session_id */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
@ -333,17 +333,17 @@ struct TestKeeperStorageSetRequest final : public TestKeeperStorageRequest
return { response_ptr, undo };
}
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
}
};
struct TestKeeperStorageListRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
@ -379,10 +379,10 @@ struct TestKeeperStorageListRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageCheckRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageCheckRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
@ -405,11 +405,11 @@ struct TestKeeperStorageCheckRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest
{
std::vector<TestKeeperStorageRequestPtr> concrete_requests;
explicit TestKeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: TestKeeperStorageRequest(zk_request_)
std::vector<NuKeeperStorageRequestPtr> concrete_requests;
explicit NuKeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: NuKeeperStorageRequest(zk_request_)
{
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
@ -419,26 +419,26 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageCreateRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<NuKeeperStorageCreateRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageRemoveRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<NuKeeperStorageRemoveRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageSetRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<NuKeeperStorageSetRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check)
{
concrete_requests.push_back(std::make_shared<TestKeeperStorageCheckRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<NuKeeperStorageCheckRequest>(sub_zk_request));
}
else
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
@ -491,9 +491,9 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
}
}
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
{
TestKeeperStorage::ResponsesForSessions result;
NuKeeperStorage::ResponsesForSessions result;
for (const auto & generic_request : concrete_requests)
{
auto responses = generic_request->processWatches(watches, list_watches);
@ -503,16 +503,16 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageCloseRequest final : public TestKeeperStorageRequest
struct NuKeeperStorageCloseRequest final : public NuKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container &, TestKeeperStorage::Ephemerals &, int64_t, int64_t) const override
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container &, NuKeeperStorage::Ephemerals &, int64_t, int64_t) const override
{
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
}
};
TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const RequestsForSessions & expired_requests)
NuKeeperStorage::ResponsesForSessions NuKeeperStorage::finalize(const RequestsForSessions & expired_requests)
{
if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
@ -559,20 +559,20 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const Reques
}
class TestKeeperWrapperFactory final : private boost::noncopyable
class NuKeeperWrapperFactory final : private boost::noncopyable
{
public:
using Creator = std::function<TestKeeperStorageRequestPtr(const Coordination::ZooKeeperRequestPtr &)>;
using Creator = std::function<NuKeeperStorageRequestPtr(const Coordination::ZooKeeperRequestPtr &)>;
using OpNumToRequest = std::unordered_map<Coordination::OpNum, Creator>;
static TestKeeperWrapperFactory & instance()
static NuKeeperWrapperFactory & instance()
{
static TestKeeperWrapperFactory factory;
static NuKeeperWrapperFactory factory;
return factory;
}
TestKeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
NuKeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
{
auto it = op_num_to_request.find(zk_request->getOpNum());
if (it == op_num_to_request.end())
@ -589,36 +589,36 @@ public:
private:
OpNumToRequest op_num_to_request;
TestKeeperWrapperFactory();
NuKeeperWrapperFactory();
};
template<Coordination::OpNum num, typename RequestT>
void registerTestKeeperRequestWrapper(TestKeeperWrapperFactory & factory)
void registerNuKeeperRequestWrapper(NuKeeperWrapperFactory & factory)
{
factory.registerRequest(num, [] (const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared<RequestT>(zk_request); });
}
TestKeeperWrapperFactory::TestKeeperWrapperFactory()
NuKeeperWrapperFactory::NuKeeperWrapperFactory()
{
registerTestKeeperRequestWrapper<Coordination::OpNum::Heartbeat, TestKeeperStorageHeartbeatRequest>(*this);
//registerTestKeeperRequestWrapper<Coordination::OpNum::Auth, TestKeeperStorageAuthRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Close, TestKeeperStorageCloseRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Create, TestKeeperStorageCreateRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Remove, TestKeeperStorageRemoveRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Exists, TestKeeperStorageExistsRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Get, TestKeeperStorageGetRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Set, TestKeeperStorageSetRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::List, TestKeeperStorageListRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::SimpleList, TestKeeperStorageListRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Check, TestKeeperStorageCheckRequest>(*this);
registerTestKeeperRequestWrapper<Coordination::OpNum::Multi, TestKeeperStorageMultiRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Heartbeat, NuKeeperStorageHeartbeatRequest>(*this);
//registerNuKeeperRequestWrapper<Coordination::OpNum::Auth, NuKeeperStorageAuthRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Close, NuKeeperStorageCloseRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Create, NuKeeperStorageCreateRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Remove, NuKeeperStorageRemoveRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Exists, NuKeeperStorageExistsRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Get, NuKeeperStorageGetRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Set, NuKeeperStorageSetRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::List, NuKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::SimpleList, NuKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Check, NuKeeperStorageCheckRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Multi, NuKeeperStorageMultiRequest>(*this);
}
TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id)
NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id)
{
TestKeeperStorage::ResponsesForSessions results;
NuKeeperStorage::ResponsesForSessions results;
if (zk_request->getOpNum() == Coordination::OpNum::Close)
{
auto it = ephemerals.find(session_id);
@ -643,7 +643,7 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const
else
{
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(zk_request);
NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
if (zk_request->has_watch)
@ -689,7 +689,7 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const
}
void TestKeeperStorage::clearDeadWatches(int64_t session_id)
void NuKeeperStorage::clearDeadWatches(int64_t session_id)
{
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())

View File

@ -12,11 +12,11 @@ namespace DB
{
using namespace DB;
struct TestKeeperStorageRequest;
using TestKeeperStorageRequestPtr = std::shared_ptr<TestKeeperStorageRequest>;
struct NuKeeperStorageRequest;
using NuKeeperStorageRequestPtr = std::shared_ptr<NuKeeperStorageRequest>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
class TestKeeperStorage
class NuKeeperStorage
{
public:
int64_t session_id_counter{0};
@ -72,7 +72,7 @@ public:
}
public:
TestKeeperStorage();
NuKeeperStorage();
int64_t getSessionID()
{

View File

@ -1,4 +1,4 @@
#include <Coordination/TestKeeperStorageDispatcher.h>
#include <Coordination/NuKeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
namespace DB
@ -11,17 +11,17 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}
TestKeeperStorageDispatcher::TestKeeperStorageDispatcher()
: log(&Poco::Logger::get("TestKeeperDispatcher"))
NuKeeperStorageDispatcher::NuKeeperStorageDispatcher()
: log(&Poco::Logger::get("NuKeeperDispatcher"))
{
}
void TestKeeperStorageDispatcher::processingThread()
void NuKeeperStorageDispatcher::processingThread()
{
setThreadName("TestKeeperSProc");
setThreadName("NuKeeperSProc");
while (!shutdown_called)
{
TestKeeperStorage::RequestForSession request;
NuKeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
@ -44,7 +44,7 @@ void TestKeeperStorageDispatcher::processingThread()
}
}
void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_writer = session_to_response_callback.find(session_id);
@ -57,7 +57,7 @@ void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordina
session_to_response_callback.erase(session_writer);
}
bool TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
@ -66,7 +66,7 @@ bool TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperReques
return false;
}
TestKeeperStorage::RequestForSession request_info;
NuKeeperStorage::RequestForSession request_info;
request_info.request = request;
request_info.session_id = session_id;
@ -104,27 +104,27 @@ namespace
}
}
void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("test_keeper_server.server_id");
int myid = config.getInt("nu_keeper_server.server_id");
std::string myhostname;
int myport;
int32_t my_priority = 1;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys("test_keeper_server.raft_configuration", keys);
config.keys("nu_keeper_server.raft_configuration", keys);
bool my_can_become_leader = true;
std::vector<std::tuple<int, std::string, int, bool, int32_t>> server_configs;
std::vector<int32_t> ids;
for (const auto & server_key : keys)
{
int server_id = config.getInt("test_keeper_server.raft_configuration." + server_key + ".id");
std::string hostname = config.getString("test_keeper_server.raft_configuration." + server_key + ".hostname");
int port = config.getInt("test_keeper_server.raft_configuration." + server_key + ".port");
bool can_become_leader = config.getBool("test_keeper_server.raft_configuration." + server_key + ".can_become_leader", true);
int32_t priority = config.getInt("test_keeper_server.raft_configuration." + server_key + ".priority", 1);
int server_id = config.getInt("nu_keeper_server.raft_configuration." + server_key + ".id");
std::string hostname = config.getString("nu_keeper_server.raft_configuration." + server_key + ".hostname");
int port = config.getInt("nu_keeper_server.raft_configuration." + server_key + ".port");
bool can_become_leader = config.getBool("nu_keeper_server.raft_configuration." + server_key + ".can_become_leader", true);
int32_t priority = config.getInt("nu_keeper_server.raft_configuration." + server_key + ".priority", 1);
if (server_id == myid)
{
myhostname = hostname;
@ -175,7 +175,7 @@ void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigura
LOG_DEBUG(log, "Dispatcher initialized");
}
void TestKeeperStorageDispatcher::shutdown()
void NuKeeperStorageDispatcher::shutdown()
{
try
{
@ -194,10 +194,10 @@ void TestKeeperStorageDispatcher::shutdown()
if (server)
{
TestKeeperStorage::RequestsForSessions expired_requests;
TestKeeperStorage::RequestForSession request;
NuKeeperStorage::RequestsForSessions expired_requests;
NuKeeperStorage::RequestForSession request;
while (requests_queue.tryPop(request))
expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
expired_requests.push_back(NuKeeperStorage::RequestForSession{request});
auto expired_responses = server->shutdown(expired_requests);
@ -213,19 +213,19 @@ void TestKeeperStorageDispatcher::shutdown()
LOG_DEBUG(log, "Dispatcher shut down");
}
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
NuKeeperStorageDispatcher::~NuKeeperStorageDispatcher()
{
shutdown();
}
void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
void NuKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
void TestKeeperStorageDispatcher::finishSession(int64_t session_id)
void NuKeeperStorageDispatcher::finishSession(int64_t session_id)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);

View File

@ -21,7 +21,7 @@ namespace DB
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class TestKeeperStorageDispatcher
class NuKeeperStorageDispatcher
{
private:
@ -30,7 +30,7 @@ private:
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<TestKeeperStorage::RequestForSession>;
using RequestsQueue = ConcurrentBoundedQueue<NuKeeperStorage::RequestForSession>;
RequestsQueue requests_queue{1};
std::atomic<bool> shutdown_called{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
@ -49,13 +49,13 @@ private:
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
TestKeeperStorageDispatcher();
NuKeeperStorageDispatcher();
void initialize(const Poco::Util::AbstractConfiguration & config);
void shutdown();
~TestKeeperStorageDispatcher();
~NuKeeperStorageDispatcher();
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);

View File

@ -1,4 +1,4 @@
#include <Coordination/TestKeeperStorageSerializer.h>
#include <Coordination/NuKeeperStorageSerializer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
@ -8,7 +8,7 @@ namespace DB
namespace
{
void writeNode(const TestKeeperStorage::Node & node, WriteBuffer & out)
void writeNode(const NuKeeperStorage::Node & node, WriteBuffer & out)
{
Coordination::write(node.data, out);
Coordination::write(node.acls, out);
@ -18,7 +18,7 @@ namespace
Coordination::write(node.seq_num, out);
}
void readNode(TestKeeperStorage::Node & node, ReadBuffer & in)
void readNode(NuKeeperStorage::Node & node, ReadBuffer & in)
{
Coordination::read(node.data, in);
Coordination::read(node.acls, in);
@ -29,7 +29,7 @@ namespace
}
}
void TestKeeperStorageSerializer::serialize(const TestKeeperStorage & storage, WriteBuffer & out)
void NuKeeperStorageSerializer::serialize(const NuKeeperStorage & storage, WriteBuffer & out)
{
Coordination::write(storage.zxid, out);
Coordination::write(storage.session_id_counter, out);
@ -49,7 +49,7 @@ void TestKeeperStorageSerializer::serialize(const TestKeeperStorage & storage, W
}
}
void TestKeeperStorageSerializer::deserialize(TestKeeperStorage & storage, ReadBuffer & in)
void NuKeeperStorageSerializer::deserialize(NuKeeperStorage & storage, ReadBuffer & in)
{
int64_t session_id_counter, zxid;
Coordination::read(zxid, in);
@ -63,7 +63,7 @@ void TestKeeperStorageSerializer::deserialize(TestKeeperStorage & storage, ReadB
{
std::string path;
Coordination::read(path, in);
TestKeeperStorage::Node node;
NuKeeperStorage::Node node;
readNode(node, in);
storage.container[path] = node;
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Coordination/NuKeeperStorage.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class NuKeeperStorageSerializer
{
public:
static void serialize(const NuKeeperStorage & storage, WriteBuffer & out);
static void deserialize(NuKeeperStorage & storage, ReadBuffer & in);
};
}

View File

@ -1,17 +0,0 @@
#pragma once
#include <Coordination/TestKeeperStorage.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class TestKeeperStorageSerializer
{
public:
static void serialize(const TestKeeperStorage & storage, WriteBuffer & out);
static void deserialize(TestKeeperStorage & storage, ReadBuffer & in);
};
}

View File

@ -9,7 +9,7 @@
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/TestKeeperStorageSerializer.h>
#include <Coordination/NuKeeperStorageSerializer.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/LoggerWrapper.h>
@ -283,9 +283,9 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
return buf.getBuffer();
}
DB::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer, const Coordination::ZooKeeperRequestPtr & request)
DB::NuKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer, const Coordination::ZooKeeperRequestPtr & request)
{
DB::TestKeeperStorage::ResponsesForSessions results;
DB::NuKeeperStorage::ResponsesForSessions results;
DB::ReadBufferFromNuraftBuffer buf(buffer);
while (!buf.eof())
{
@ -303,28 +303,28 @@ DB::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr<nu
Coordination::read(err, buf);
auto response = request->makeResponse();
response->readImpl(buf);
results.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
results.push_back(DB::NuKeeperStorage::ResponseForSession{session_id, response});
}
return results;
}
TEST(CoordinationTest, TestStorageSerialization)
{
DB::TestKeeperStorage storage;
storage.container["/hello"] = DB::TestKeeperStorage::Node{.data="world"};
storage.container["/hello/somepath"] = DB::TestKeeperStorage::Node{.data="somedata"};
DB::NuKeeperStorage storage;
storage.container["/hello"] = DB::NuKeeperStorage::Node{.data="world"};
storage.container["/hello/somepath"] = DB::NuKeeperStorage::Node{.data="somedata"};
storage.session_id_counter = 5;
storage.zxid = 156;
storage.ephemerals[3] = {"/hello", "/"};
storage.ephemerals[1] = {"/hello/somepath"};
DB::WriteBufferFromOwnString buffer;
DB::TestKeeperStorageSerializer serializer;
DB::NuKeeperStorageSerializer serializer;
serializer.serialize(storage, buffer);
std::string serialized = buffer.str();
EXPECT_NE(serialized.size(), 0);
DB::ReadBufferFromString read(serialized);
DB::TestKeeperStorage new_storage;
DB::NuKeeperStorage new_storage;
serializer.deserialize(new_storage, read);
EXPECT_EQ(new_storage.container.size(), 3);

View File

@ -14,10 +14,10 @@ SRCS(
InMemoryStateManager.cpp
NuKeeperServer.cpp
NuKeeperStateMachine.cpp
NuKeeperStorage.cpp
NuKeeperStorageDispatcher.cpp
NuKeeperStorageSerializer.cpp
SummingStateMachine.cpp
TestKeeperStorage.cpp
TestKeeperStorageDispatcher.cpp
TestKeeperStorageSerializer.cpp
WriteBufferFromNuraftBuffer.cpp
)

View File

@ -12,7 +12,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Coordination/TestKeeperStorageDispatcher.h>
#include <Coordination/NuKeeperStorageDispatcher.h>
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -305,8 +305,8 @@ struct ContextShared
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
#if USE_NURAFT
mutable std::mutex test_keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
mutable std::mutex nu_keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<NuKeeperStorageDispatcher> nu_keeper_storage_dispatcher;
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
@ -1582,42 +1582,42 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
}
void Context::initializeTestKeeperStorageDispatcher() const
void Context::initializeNuKeeperStorageDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex);
std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex);
if (shared->test_keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize TestKeeper multiple times");
if (shared->nu_keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize NuKeeper multiple times");
const auto & config = getConfigRef();
if (config.has("test_keeper_server"))
if (config.has("nu_keeper_server"))
{
shared->test_keeper_storage_dispatcher = std::make_shared<TestKeeperStorageDispatcher>();
shared->test_keeper_storage_dispatcher->initialize(config);
shared->nu_keeper_storage_dispatcher = std::make_shared<NuKeeperStorageDispatcher>();
shared->nu_keeper_storage_dispatcher->initialize(config);
}
#endif
}
#if USE_NURAFT
std::shared_ptr<TestKeeperStorageDispatcher> & Context::getTestKeeperStorageDispatcher() const
std::shared_ptr<NuKeeperStorageDispatcher> & Context::getNuKeeperStorageDispatcher() const
{
std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex);
if (!shared->test_keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TestKeeper must be initialized before requests");
std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex);
if (!shared->nu_keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "NuKeeper must be initialized before requests");
return shared->test_keeper_storage_dispatcher;
return shared->nu_keeper_storage_dispatcher;
}
#endif
void Context::shutdownTestKeeperStorageDispatcher() const
void Context::shutdownNuKeeperStorageDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex);
if (shared->test_keeper_storage_dispatcher)
std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex);
if (shared->nu_keeper_storage_dispatcher)
{
shared->test_keeper_storage_dispatcher->shutdown();
shared->test_keeper_storage_dispatcher.reset();
shared->nu_keeper_storage_dispatcher->shutdown();
shared->nu_keeper_storage_dispatcher.reset();
}
#endif
}

View File

@ -106,7 +106,7 @@ using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
class TestKeeperStorageDispatcher;
class NuKeeperStorageDispatcher;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@ -574,10 +574,10 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
#if USE_NURAFT
std::shared_ptr<TestKeeperStorageDispatcher> & getTestKeeperStorageDispatcher() const;
std::shared_ptr<NuKeeperStorageDispatcher> & getNuKeeperStorageDispatcher() const;
#endif
void initializeTestKeeperStorageDispatcher() const;
void shutdownTestKeeperStorageDispatcher() const;
void initializeNuKeeperStorageDispatcher() const;
void shutdownNuKeeperStorageDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);

View File

@ -1,4 +1,4 @@
#include <Server/TestKeeperTCPHandler.h>
#include <Server/NuKeeperTCPHandler.h>
#if USE_NURAFT
@ -224,20 +224,20 @@ struct SocketInterruptablePollWrapper
#endif
};
TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
NuKeeperTCPHandler::NuKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("TestKeeperTCPHandler"))
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, global_context(server.context())
, test_keeper_storage_dispatcher(global_context.getTestKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, nu_keeper_storage_dispatcher(global_context.getNuKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("nu_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("nu_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
, responses(std::make_unique<ThreadSafeResponseQueue>())
{
}
void TestKeeperTCPHandler::sendHandshake(bool has_leader)
void NuKeeperTCPHandler::sendHandshake(bool has_leader)
{
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out);
if (has_leader)
@ -252,12 +252,12 @@ void TestKeeperTCPHandler::sendHandshake(bool has_leader)
out->next();
}
void TestKeeperTCPHandler::run()
void NuKeeperTCPHandler::run()
{
runImpl();
}
void TestKeeperTCPHandler::receiveHandshake()
void NuKeeperTCPHandler::receiveHandshake()
{
int32_t handshake_length;
int32_t protocol_version;
@ -294,7 +294,7 @@ void TestKeeperTCPHandler::receiveHandshake()
}
void TestKeeperTCPHandler::runImpl()
void NuKeeperTCPHandler::runImpl()
{
setThreadName("TstKprHandler");
ThreadStatus thread_status;
@ -324,11 +324,11 @@ void TestKeeperTCPHandler::runImpl()
return;
}
if (test_keeper_storage_dispatcher->hasLeader())
if (nu_keeper_storage_dispatcher->hasLeader())
{
try
{
session_id = test_keeper_storage_dispatcher->getSessionID();
session_id = nu_keeper_storage_dispatcher->getSessionID();
}
catch (const Exception & e)
{
@ -354,7 +354,7 @@ void TestKeeperTCPHandler::runImpl()
UInt8 single_byte = 1;
[[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte));
};
test_keeper_storage_dispatcher->registerSession(session_id, response_callback);
nu_keeper_storage_dispatcher->registerSession(session_id, response_callback);
session_stopwatch.start();
bool close_received = false;
@ -428,18 +428,18 @@ void TestKeeperTCPHandler::runImpl()
}
}
void TestKeeperTCPHandler::finish()
void NuKeeperTCPHandler::finish()
{
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = close_xid;
/// Put close request (so storage will remove all info about session)
test_keeper_storage_dispatcher->putRequest(request, session_id);
nu_keeper_storage_dispatcher->putRequest(request, session_id);
/// We don't need any callbacks because session can be already dead and
/// nobody wait for response
test_keeper_storage_dispatcher->finishSession(session_id);
nu_keeper_storage_dispatcher->finishSession(session_id);
}
std::pair<Coordination::OpNum, Coordination::XID> TestKeeperTCPHandler::receiveRequest()
std::pair<Coordination::OpNum, Coordination::XID> NuKeeperTCPHandler::receiveRequest()
{
int32_t length;
Coordination::read(length, *in);
@ -453,7 +453,7 @@ std::pair<Coordination::OpNum, Coordination::XID> TestKeeperTCPHandler::receiveR
request->xid = xid;
request->readImpl(*in);
if (!test_keeper_storage_dispatcher->putRequest(request, session_id))
if (!nu_keeper_storage_dispatcher->putRequest(request, session_id))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
return std::make_pair(opnum, xid);
}

View File

@ -13,7 +13,7 @@
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Coordination/TestKeeperStorageDispatcher.h>
#include <Coordination/NuKeeperStorageDispatcher.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <unordered_map>
@ -26,16 +26,16 @@ using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePol
class ThreadSafeResponseQueue;
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>;
class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection
class NuKeeperTCPHandler : public Poco::Net::TCPServerConnection
{
public:
TestKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
NuKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
void run() override;
private:
IServer & server;
Poco::Logger * log;
Context global_context;
std::shared_ptr<TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
std::shared_ptr<NuKeeperStorageDispatcher> nu_keeper_storage_dispatcher;
Poco::Timespan operation_timeout;
Poco::Timespan session_timeout;
int64_t session_id;

View File

@ -1,6 +1,6 @@
#pragma once
#include <Server/TestKeeperTCPHandler.h>
#include <Server/NuKeeperTCPHandler.h>
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
@ -9,7 +9,7 @@
namespace DB
{
class TestKeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory
class NuKeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
IServer & server;
@ -21,9 +21,9 @@ private:
void run() override {}
};
public:
TestKeeperTCPHandlerFactory(IServer & server_)
NuKeeperTCPHandlerFactory(IServer & server_)
: server(server_)
, log(&Poco::Logger::get("TestKeeperTCPHandlerFactory"))
, log(&Poco::Logger::get("NuKeeperTCPHandlerFactory"))
{
}
@ -31,8 +31,8 @@ public:
{
try
{
LOG_TRACE(log, "Test keeper request. Address: {}", socket.peerAddress().toString());
return new TestKeeperTCPHandler(server, socket);
LOG_TRACE(log, "NuKeeper request. Address: {}", socket.peerAddress().toString());
return new NuKeeperTCPHandler(server, socket);
}
catch (const Poco::Net::NetException &)
{

View File

@ -17,6 +17,7 @@ SRCS(
MySQLHandler.cpp
MySQLHandlerFactory.cpp
NotFoundHandler.cpp
NuKeeperTCPHandler.cpp
PostgreSQLHandler.cpp
PostgreSQLHandlerFactory.cpp
PrometheusMetricsWriter.cpp
@ -25,7 +26,6 @@ SRCS(
ReplicasStatusHandler.cpp
StaticRequestHandler.cpp
TCPHandler.cpp
TestKeeperTCPHandler.cpp
WebUIRequestHandler.cpp
)

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<nu_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
@ -11,5 +11,5 @@
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</nu_keeper_server>
</yandex>

View File

@ -29,7 +29,7 @@ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/nu_keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<nu_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
@ -11,5 +11,5 @@
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</nu_keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<nu_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
@ -27,5 +27,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</nu_keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<nu_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
@ -27,5 +27,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</nu_keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<nu_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
@ -27,5 +27,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</nu_keeper_server>
</yandex>