Add background session lifetime control

This commit is contained in:
alesapin 2021-02-03 23:32:15 +03:00
parent 0c3ef018bb
commit 1ff87ac6f9
17 changed files with 261 additions and 52 deletions

View File

@ -72,7 +72,7 @@ struct ZooKeeperHeartbeatRequest final : ZooKeeperRequest
void writeImpl(WriteBuffer &) const override {}
void readImpl(ReadBuffer &) override {}
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
bool isReadRequest() const override { return false; }
};
struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse

View File

@ -24,7 +24,7 @@ NuKeeperServer::NuKeeperServer(int server_id_, const std::string & hostname_, in
, hostname(hostname_)
, port(port_)
, endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<NuKeeperStateMachine>())
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(500 /* FIXME */))
, state_manager(nuraft::cs_new<InMemoryStateManager>(server_id, endpoint))
{
}
@ -214,12 +214,12 @@ NuKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const NuKeeper
}
}
int64_t NuKeeperServer::getSessionID()
int64_t NuKeeperServer::getSessionID(long session_timeout_ms)
{
auto entry = nuraft::buffer::alloc(sizeof(int64_t));
auto entry = nuraft::buffer::alloc(sizeof(long));
/// Just special session request
nuraft::buffer_serializer bs(entry);
bs.put_i64(0);
bs.put_i64(session_timeout_ms);
std::lock_guard lock(append_entries_mutex);
@ -275,4 +275,9 @@ void NuKeeperServer::waitForCatchUp() const
}
}
/// Returns the ids of sessions that the state machine currently reports as dead.
/// Pure forwarder: all work is delegated to state_machine->getDeadSessions().
std::unordered_set<int64_t> NuKeeperServer::getDeadSessions()
{
return state_machine->getDeadSessions();
}
}

View File

@ -46,7 +46,9 @@ public:
NuKeeperStorage::ResponsesForSessions putRequests(const NuKeeperStorage::RequestsForSessions & requests);
int64_t getSessionID();
int64_t getSessionID(long session_timeout_ms);
std::unordered_set<int64_t> getDeadSessions();
void addServer(int server_id_, const std::string & server_uri, bool can_become_leader_, int32_t priority);

View File

@ -43,8 +43,9 @@ nuraft::ptr<nuraft::buffer> writeResponses(NuKeeperStorage::ResponsesForSessions
}
NuKeeperStateMachine::NuKeeperStateMachine()
: last_committed_idx(0)
NuKeeperStateMachine::NuKeeperStateMachine(long tick_time)
: storage(tick_time)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuRaftStateMachine"))
{
LOG_DEBUG(log, "Created nukeeper state machine");
@ -52,15 +53,19 @@ NuKeeperStateMachine::NuKeeperStateMachine()
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
{
if (data.size() == sizeof(size_t))
if (data.size() == sizeof(long))
{
LOG_DEBUG(log, "Session ID response {}", log_idx);
nuraft::buffer_serializer timeout_data(data);
long session_timeout_ms = timeout_data.get_i64();
auto response = nuraft::buffer::alloc(sizeof(size_t));
int64_t session_id;
nuraft::buffer_serializer bs(response);
{
std::lock_guard lock(storage_lock);
bs.put_i64(storage.getSessionID());
session_id = storage.getSessionID(session_timeout_ms);
bs.put_i64(session_id);
}
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_timeout_ms);
last_committed_idx = log_idx;
return response;
}
@ -121,7 +126,7 @@ NuKeeperStateMachine::StorageSnapshotPtr NuKeeperStateMachine::readSnapshot(nura
NuKeeperStorageSerializer serializer;
ReadBufferFromNuraftBuffer reader(in);
NuKeeperStorage new_storage;
NuKeeperStorage new_storage(500 /*FIXME*/);
serializer.deserialize(new_storage, reader);
return std::make_shared<StorageSnapshot>(ss, new_storage);
}
@ -229,4 +234,10 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStateMachine::processReadRequest(c
return storage.processRequest(request_for_session.request, request_for_session.session_id);
}
/// Returns the ids of sessions that the underlying storage reports as dead.
/// The storage is queried while holding storage_lock.
std::unordered_set<int64_t> NuKeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_lock);
return storage.getDeadSessions();
}
}

View File

@ -10,7 +10,7 @@ namespace DB
class NuKeeperStateMachine : public nuraft::state_machine
{
public:
NuKeeperStateMachine();
NuKeeperStateMachine(long tick_time);
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
@ -49,6 +49,8 @@ public:
NuKeeperStorage::ResponsesForSessions processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session);
std::unordered_set<int64_t> getDeadSessions();
private:
struct StorageSnapshot
{

View File

@ -67,7 +67,8 @@ static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & p
return result;
}
NuKeeperStorage::NuKeeperStorage()
NuKeeperStorage::NuKeeperStorage(long tick_time_ms)
: session_expiry_queue(tick_time_ms)
{
container.emplace("/", Node());
}
@ -638,6 +639,18 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = getZXID();
session_expiry_queue.remove(session_id);
session_and_timeout.erase(session_id);
results.push_back(ResponseForSession{session_id, response});
}
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat)
{
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
response->xid = zk_request->xid;
response->zxid = getZXID();
results.push_back(ResponseForSession{session_id, response});
}
else

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Coordination/SessionExpiryQueue.h>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@ -50,6 +51,7 @@ public:
using Container = std::map<std::string, Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionAndTimeout = std::unordered_map<int64_t, long>;
using SessionIDs = std::vector<int64_t>;
using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
@ -57,6 +59,8 @@ public:
Container container;
Ephemerals ephemerals;
SessionAndWatcher sessions_and_watchers;
SessionExpiryQueue session_expiry_queue;
SessionAndTimeout session_and_timeout;
int64_t zxid{0};
bool finalized{false};
@ -72,15 +76,23 @@ public:
}
public:
NuKeeperStorage();
NuKeeperStorage(long tick_time_ms);
int64_t getSessionID()
int64_t getSessionID(long session_timeout_ms)
{
return session_id_counter++;
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.update(result, session_timeout_ms);
return result;
}
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
ResponsesForSessions finalize(const RequestsForSessions & expired_requests);
/// Returns the ids of sessions whose expiry bucket has come due, as reported by
/// session_expiry_queue.getExpiredSessions().
/// Note: this is not a pure query - the expiry queue hands out each due bucket once
/// and advances its internal expiration cursor.
std::unordered_set<int64_t> getDeadSessions()
{
return session_expiry_queue.getExpiredSessions();
}
};
}

View File

@ -59,7 +59,6 @@ void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordinati
bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
std::lock_guard lock(session_to_response_callback_mutex);
if (session_to_response_callback.count(session_id) == 0)
@ -171,6 +170,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
}
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
LOG_DEBUG(log, "Dispatcher initialized");
}
@ -188,6 +188,9 @@ void NuKeeperStorageDispatcher::shutdown()
LOG_DEBUG(log, "Shutting down storage dispatcher");
shutdown_called = true;
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();
if (processing_thread.joinable())
processing_thread.join();
}
@ -225,6 +228,43 @@ void NuKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperRes
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
void NuKeeperStorageDispatcher::sessionCleanerTask()
{
while (true)
{
if (shutdown_called)
return;
try
{
if (isLeader())
{
auto dead_sessions = server->getDeadSessions();
for (int64_t dead_session : dead_sessions)
{
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
putRequest(request, dead_session);
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(dead_session);
if (session_it != session_to_response_callback.end())
session_to_response_callback.erase(session_it);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/*FIXME*/
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
void NuKeeperStorageDispatcher::finishSession(int64_t session_id)
{
std::lock_guard lock(session_to_response_callback_mutex);

View File

@ -27,7 +27,6 @@ class NuKeeperStorageDispatcher
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<NuKeeperStorage::RequestForSession>;
@ -40,12 +39,15 @@ private:
ThreadFromGlobalPool processing_thread;
ThreadFromGlobalPool session_cleaner_thread;
std::unique_ptr<NuKeeperServer> server;
Poco::Logger * log;
private:
void processingThread();
void sessionCleanerTask();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
@ -69,15 +71,14 @@ public:
return server->isLeaderAlive();
}
int64_t getSessionID()
int64_t getSessionID(long session_timeout_ms)
{
return server->getSessionID();
return server->getSessionID(session_timeout_ms);
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call when responses for this session are no longer needed (e.g. the session has expired)
void finishSession(int64_t session_id);
};
}

View File

@ -0,0 +1,77 @@
#include <Coordination/SessionExpiryQueue.h>
#include <common/logger_useful.h>
namespace DB
{
/// Stops tracking a session.
///
/// Removes the session both from the expiry bucket it currently belongs to and from
/// the session -> expiry-time index. Previously the index entry was left behind, so a
/// removed session leaked memory, and a later update() for the same id that landed on
/// the stale expiry value returned early without re-inserting the session into any
/// bucket.
///
/// @param session_id  Session to forget.
/// @return true if the session was tracked by the queue, false otherwise.
bool SessionExpiryQueue::remove(int64_t session_id)
{
    auto session_it = session_to_timeout.find(session_id);
    if (session_it == session_to_timeout.end())
        return false;

    auto set_it = expiry_to_sessions.find(session_it->second);
    if (set_it != expiry_to_sessions.end())
    {
        set_it->second.erase(session_id);

        /// No more sessions in this bucket: do not keep an empty set around.
        if (set_it->second.empty())
            expiry_to_sessions.erase(set_it);
    }

    session_to_timeout.erase(session_it);
    return true;
}
/// Registers a session or pushes its expiry forward.
///
/// The new expiry time is (now + timeout_ms) rounded up to the next multiple of the
/// expiration interval; sessions are grouped into buckets by that value.
///
/// @param session_id  Session to add or refresh.
/// @param timeout_ms  Session timeout in milliseconds, counted from now.
/// @return true only if an already tracked session was moved to a different bucket;
///         false if the session was newly added or its bucket did not change.
bool SessionExpiryQueue::update(int64_t session_id, long timeout_ms)
{
    auto tracked = session_to_timeout.find(session_id);
    const long deadline = roundToNextInterval(getNowMilliseconds() + timeout_ms);

    /// Unknown session: start tracking it.
    if (tracked == session_to_timeout.end())
    {
        session_to_timeout[session_id] = deadline;
        expiry_to_sessions[deadline].insert(session_id);
        return false;
    }

    /// Still falls into the same bucket: nothing to move.
    const long previous_deadline = tracked->second;
    if (previous_deadline == deadline)
        return false;

    /// Put the session into its new bucket first, then take it out of the old one.
    expiry_to_sessions[deadline].insert(session_id);

    auto previous_bucket = expiry_to_sessions.find(previous_deadline);
    if (previous_bucket != expiry_to_sessions.end())
        previous_bucket->second.erase(session_id);

    tracked->second = deadline;
    return true;
}
/// Returns every session whose expiry bucket has come due and removes those buckets.
///
/// Buckets are keyed by multiples of expiration_interval; next_expiration_time is the
/// cursor pointing at the next bucket to hand out. Previously only one bucket was
/// drained per call, so a caller polling less often than the interval (or starting to
/// poll after a pause) fell behind the clock and never caught up. Now all buckets up to
/// the current time are drained in one call.
///
/// Returned sessions stay in the session -> expiry-time index until remove() is called
/// for them.
///
/// @return ids of the sessions found in the due buckets; empty if nothing is due yet.
std::unordered_set<int64_t> SessionExpiryQueue::getExpiredSessions()
{
    const long now = getNowMilliseconds();
    if (now < next_expiration_time)
        return {};

    std::unordered_set<int64_t> result;
    do
    {
        auto set_it = expiry_to_sessions.find(next_expiration_time);
        if (set_it != expiry_to_sessions.end())
        {
            result.insert(set_it->second.begin(), set_it->second.end());
            expiry_to_sessions.erase(set_it);
        }

        next_expiration_time += expiration_interval;
    }
    /// The interval check keeps the loop finite (single step, as before) should the
    /// queue ever be configured with a non-positive interval.
    while (expiration_interval > 0 && next_expiration_time <= now);

    return result;
}
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <unordered_map>
#include <unordered_set>
#include <chrono>
namespace DB
{
/// Tracks session expiration in coarse time buckets.
/// Expiry times are rounded up to multiples of the expiration interval, so all
/// sessions that expire within the same interval share one bucket.
class SessionExpiryQueue
{
private:
    /// Session id -> expiry time (bucket key) the session is currently assigned to.
    std::unordered_map<int64_t, long> session_to_timeout;

    /// Expiry time (bucket key) -> ids of the sessions expiring at that time.
    std::unordered_map<long, std::unordered_set<int64_t>> expiry_to_sessions;

    /// Bucket width in milliseconds.
    long expiration_interval;

    /// Key of the next bucket getExpiredSessions() will look at.
    /// Must stay declared after expiration_interval: its initializer uses it.
    long next_expiration_time;

    /// Milliseconds elapsed since the system_clock epoch.
    static long getNowMilliseconds()
    {
        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
        return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    }

    /// Rounds `time` up to the next multiple of expiration_interval
    /// (an exact multiple is moved to the following one).
    long roundToNextInterval(long time) const
    {
        const long whole_intervals = time / expiration_interval;
        return (whole_intervals + 1) * expiration_interval;
    }

public:
    /// @param expiration_interval_  Bucket width in milliseconds.
    explicit SessionExpiryQueue(long expiration_interval_)
        : expiration_interval(expiration_interval_)
        , next_expiration_time(roundToNextInterval(getNowMilliseconds()))
    {
    }

    /// Stop tracking the session; returns whether it was tracked.
    bool remove(int64_t session_id);

    /// Add the session or move it to the bucket for (now + timeout_ms).
    bool update(int64_t session_id, long timeout_ms);

    /// Hand out the sessions of the bucket(s) that have come due.
    std::unordered_set<int64_t> getExpiredSessions();
};
}

View File

@ -0,0 +1,12 @@
OWNER(g:clickhouse)
LIBRARY()
PEERDIR(
clickhouse/src/Common
)
SRCS(
)
END()

View File

@ -230,8 +230,8 @@ NuKeeperTCPHandler::NuKeeperTCPHandler(IServer & server_, const Poco::Net::Strea
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, global_context(server.context())
, nu_keeper_storage_dispatcher(global_context.getNuKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("nu_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("nu_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
, responses(std::make_unique<ThreadSafeResponseQueue>())
{
@ -245,7 +245,7 @@ void NuKeeperTCPHandler::sendHandshake(bool has_leader)
else /// Specially ignore connections if we are not leader, client will throw exception
Coordination::write(42, *out);
Coordination::write(Coordination::DEFAULT_SESSION_TIMEOUT_MS, *out);
Coordination::write(static_cast<int32_t>(session_timeout.totalMilliseconds()), *out);
Coordination::write(session_id, *out);
std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
Coordination::write(passwd, *out);
@ -257,15 +257,14 @@ void NuKeeperTCPHandler::run()
runImpl();
}
void NuKeeperTCPHandler::receiveHandshake()
Poco::Timespan NuKeeperTCPHandler::receiveHandshake()
{
int32_t handshake_length;
int32_t protocol_version;
int64_t last_zxid_seen;
int32_t timeout;
int32_t timeout_ms;
int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
std::array<char, Coordination::PASSWORD_LENGTH> passwd {};
Coordination::read(handshake_length, *in);
if (handshake_length != Coordination::CLIENT_HANDSHAKE_LENGTH && handshake_length != Coordination::CLIENT_HANDSHAKE_LENGTH_WITH_READONLY)
throw Exception("Unexpected handshake length received: " + toString(handshake_length), ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
@ -280,7 +279,7 @@ void NuKeeperTCPHandler::receiveHandshake()
if (last_zxid_seen != 0)
throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(timeout, *in);
Coordination::read(timeout_ms, *in);
Coordination::read(previous_session_id, *in);
if (previous_session_id != 0)
@ -291,6 +290,8 @@ void NuKeeperTCPHandler::receiveHandshake()
int8_t readonly;
if (handshake_length == Coordination::CLIENT_HANDSHAKE_LENGTH_WITH_READONLY)
Coordination::read(readonly, *in);
return Poco::Timespan(0, timeout_ms * 1000);
}
@ -316,7 +317,9 @@ void NuKeeperTCPHandler::runImpl()
try
{
receiveHandshake();
auto client_timeout = receiveHandshake();
if (client_timeout != 0)
session_timeout = std::min(client_timeout, session_timeout);
}
catch (const Exception & e) /// Typical for an incorrect username, password, or address.
{
@ -328,7 +331,7 @@ void NuKeeperTCPHandler::runImpl()
{
try
{
session_id = nu_keeper_storage_dispatcher->getSessionID();
session_id = nu_keeper_storage_dispatcher->getSessionID(session_timeout.totalMilliseconds());
}
catch (const Exception & e)
{
@ -416,7 +419,7 @@ void NuKeeperTCPHandler::runImpl()
if (session_stopwatch.elapsedMicroseconds() > static_cast<UInt64>(session_timeout.totalMicroseconds()))
{
LOG_DEBUG(log, "Session #{} expired", session_id);
finish();
nu_keeper_storage_dispatcher->finishSession(session_id);
break;
}
}
@ -424,21 +427,10 @@ void NuKeeperTCPHandler::runImpl()
catch (const Exception & ex)
{
LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true));
finish();
nu_keeper_storage_dispatcher->finishSession(session_id);
}
}
void NuKeeperTCPHandler::finish()
{
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = close_xid;
/// Put close request (so storage will remove all info about session)
nu_keeper_storage_dispatcher->putRequest(request, session_id);
/// We don't need any callbacks because session can be already dead and
/// nobody wait for response
nu_keeper_storage_dispatcher->finishSession(session_id);
}
std::pair<Coordination::OpNum, Coordination::XID> NuKeeperTCPHandler::receiveRequest()
{
int32_t length;

View File

@ -53,10 +53,9 @@ private:
void runImpl();
void sendHandshake(bool has_leader);
void receiveHandshake();
Poco::Timespan receiveHandshake();
std::pair<Coordination::OpNum, Coordination::XID> receiveRequest();
void finish();
};
}

View File

@ -1,8 +1,8 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<server_id>1</server_id>
<raft_configuration>
<server>

View File

@ -1,8 +1,8 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<server_id>2</server_id>
<raft_configuration>
<server>

View File

@ -1,8 +1,8 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<server_id>3</server_id>
<raft_configuration>
<server>