More optimal

This commit is contained in:
alesapin 2021-04-16 16:50:09 +03:00
parent 28577127a7
commit f168bfae6d
15 changed files with 363 additions and 145 deletions

View File

@ -455,6 +455,35 @@ ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::m
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared<ZooKeeperMultiResponse>(requests); }
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(internal_id, out);
Coordination::write(session_timeout_ms, out);
}
void ZooKeeperSessionIDRequest::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
Coordination::read(session_timeout_ms, in);
}
Coordination::ZooKeeperResponsePtr ZooKeeperSessionIDRequest::makeResponse() const
{
return std::make_shared<ZooKeeperSessionIDResponse>();
}
void ZooKeeperSessionIDResponse::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
Coordination::read(session_id, in);
}
void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(internal_id, out);
Coordination::write(session_id, out);
}
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
{
if (!op_num_to_request.try_emplace(op_num, creator).second)
@ -511,6 +540,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
}
}

View File

@ -390,6 +390,36 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
/// Fake internal coordination (keeper) response. Never received from client
/// and never send to client.
struct ZooKeeperSessionIDRequest final : ZooKeeperRequest
{
int64_t internal_id;
int64_t session_timeout_ms;
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
String getPath() const override { return {}; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
Coordination::ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
};
/// Fake internal coordination (keeper) response. Never received from client
/// and never send to client.
struct ZooKeeperSessionIDResponse final : ZooKeeperResponse
{
int64_t internal_id;
int64_t session_id;
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
};
class ZooKeeperRequestFactory final : private boost::noncopyable
{

View File

@ -21,6 +21,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::Check),
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::Auth),
static_cast<int32_t>(OpNum::SessionID),
};
std::string toString(OpNum op_num)
@ -55,6 +56,8 @@ std::string toString(OpNum op_num)
return "Heartbeat";
case OpNum::Auth:
return "Auth";
case OpNum::SessionID:
return "SessionID";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -30,6 +30,7 @@ enum class OpNum : int32_t
Check = 13,
Multi = 14,
Auth = 100,
SessionID = 997, /// Special internal request
};
std::string toString(OpNum op_num);

View File

@ -80,7 +80,7 @@ public:
{}
off_t appendRecord(ChangelogRecord && record, bool sync)
off_t appendRecord(ChangelogRecord && record)
{
off_t result = plain_buf.count();
writeIntBinary(computeRecordChecksum(record), plain_buf);
@ -96,22 +96,20 @@ public:
entries_written++;
if (sync)
plain_buf.sync();
else
plain_buf.next();
return result;
}
void truncateToLength(off_t new_length)
{
flush();
plain_buf.next();
plain_buf.truncate(new_length);
plain_buf.seek(new_length, SEEK_SET);
}
void flush()
void flush(bool force_fsync)
{
plain_buf.next();
if (force_fsync)
plain_buf.sync();
}
@ -247,9 +245,14 @@ private:
ReadBufferFromFile read_buf;
};
Changelog::Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_)
Changelog::Changelog(
const std::string & changelogs_dir_,
uint64_t rotate_interval_,
bool force_sync_,
Poco::Logger * log_)
: changelogs_dir(changelogs_dir_)
, rotate_interval(rotate_interval_)
, force_sync(force_sync_)
, log(log_)
{
namespace fs = std::filesystem;
@ -357,6 +360,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
void Changelog::rotate(uint64_t new_start_log_index)
{
/// Flush previous log
flush();
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
new_description.from_log_index = new_start_log_index;
@ -387,7 +393,7 @@ ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_e
return record;
}
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
{
if (!current_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
@ -398,14 +404,14 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool
if (current_writer->getEntriesWritten() == rotate_interval)
rotate(index);
auto offset = current_writer->appendRecord(buildRecord(index, log_entry), force_sync);
auto offset = current_writer->appendRecord(buildRecord(index, log_entry));
if (!index_to_start_pos.try_emplace(index, offset).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index);
logs[index] = makeClone(log_entry);
}
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
{
if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
@ -451,7 +457,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool forc
current_writer->setEntriesWritten(entries_written);
appendEntry(index, log_entry, force_sync);
appendEntry(index, log_entry);
}
void Changelog::compact(uint64_t up_to_log_index)
@ -540,7 +546,7 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
return buf_out;
}
void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync)
void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
{
buffer.pos(0);
int num_logs = buffer.get_int();
@ -555,23 +561,23 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer,
LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local);
if (i == 0 && logs.count(cur_index))
writeAt(cur_index, log_entry, force_sync);
writeAt(cur_index, log_entry);
else
appendEntry(cur_index, log_entry, force_sync);
appendEntry(cur_index, log_entry);
}
}
void Changelog::flush()
{
current_writer->flush();
if (current_writer)
current_writer->flush(force_sync);
}
Changelog::~Changelog()
{
try
{
if (current_writer)
current_writer->flush();
flush();
}
catch (...)
{

View File

@ -63,17 +63,17 @@ class Changelog
{
public:
Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_);
Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, bool force_sync_, Poco::Logger * log_);
/// Read changelog from files on changelogs_dir_ skipping all entries before from_log_index
/// Truncate broken entries, remove files after broken entries.
void readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep);
/// Add entry to log with index. Call fsync if force_sync true.
void appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Add entry to log with index.
void appendEntry(uint64_t index, const LogEntryPtr & log_entry);
/// Write entry at index and truncate all subsequent entries.
void writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
void writeAt(uint64_t index, const LogEntryPtr & log_entry);
/// Remove log files with to_log_index <= up_to_log_index.
void compact(uint64_t up_to_log_index);
@ -101,9 +101,9 @@ public:
BufferPtr serializeEntriesToBuffer(uint64_t index, int32_t count);
/// Apply entries from buffer overriding existing entries
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync);
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer);
/// Fsync log to disk
/// Fsync latest log to disk and flush buffer
void flush();
uint64_t size() const
@ -124,6 +124,7 @@ private:
private:
const std::string changelogs_dir;
const uint64_t rotate_interval;
const bool force_sync;
Poco::Logger * log;
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;

View File

@ -32,6 +32,7 @@ struct Settings;
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch in requests count before it will be sent to RAFT", 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0)

View File

@ -5,9 +5,12 @@ namespace DB
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(changelogs_path, rotate_interval_, log)
, force_sync(force_sync_)
, changelog(changelogs_path, rotate_interval_, force_sync_, log)
{
if (force_sync_)
LOG_INFO(log, "force_sync enabled");
else
LOG_INFO(log, "force_sync disabled");
}
uint64_t KeeperLogStore::start_index() const
@ -38,7 +41,7 @@ uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
uint64_t idx = changelog.getNextEntryIndex();
changelog.appendEntry(idx, entry, force_sync);
changelog.appendEntry(idx, entry);
return idx;
}
@ -46,7 +49,7 @@ uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
void KeeperLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
changelog.writeAt(index, entry, force_sync);
changelog.writeAt(index, entry);
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(uint64_t start, uint64_t end)
@ -93,7 +96,7 @@ bool KeeperLogStore::flush()
void KeeperLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
{
std::lock_guard lock(changelog_lock);
changelog.applyEntriesFromBuffer(index, pack, force_sync);
changelog.applyEntriesFromBuffer(index, pack);
}
uint64_t KeeperLogStore::size() const
@ -102,4 +105,10 @@ uint64_t KeeperLogStore::size() const
return changelog.size();
}
void KeeperLogStore::end_of_append_batch(uint64_t /*start_index*/, uint64_t /*count*/)
{
std::lock_guard lock(changelog_lock);
changelog.flush();
}
}

View File

@ -42,11 +42,12 @@ public:
uint64_t size() const;
void end_of_append_batch(uint64_t start_index, uint64_t count) override;
private:
mutable std::mutex changelog_lock;
Poco::Logger * log;
Changelog changelog;
bool force_sync;
};
}

View File

@ -73,7 +73,6 @@ KeeperServer::KeeperServer(
config.getString("keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"),
coordination_settings))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
, log(&Poco::Logger::get("KeeperServer"))
{
if (coordination_settings->quorum_reads)
@ -111,7 +110,7 @@ void KeeperServer::startup()
params.auto_forwarding_ = coordination_settings->auto_forwarding;
params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
params.return_method_ = nuraft::raft_params::blocking;
params.return_method_ = nuraft::raft_params::async_handler;
nuraft::asio_service::options asio_opts{};
if (state_manager->isSecure())
@ -222,75 +221,23 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
}
void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_for_session)
{
auto [session_id, request] = request_for_session;
if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest())
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
state_machine->processReadRequest(request_for_session);
}
else
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (const auto & [session_id, request] : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(session_id, request));
{
std::lock_guard lock(append_entries_mutex);
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
return raft_instance->append_entries(entries);
}
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str());
}
}
int64_t KeeperServer::getSessionID(int64_t session_timeout_ms)
{
/// Just some sanity check. We don't want to make a lot of clients wait with lock.
if (active_session_id_requests > 10)
throw Exception(ErrorCodes::RAFT_ERROR, "Too many concurrent SessionID requests already in flight");
++active_session_id_requests;
SCOPE_EXIT({ --active_session_id_requests; });
auto entry = nuraft::buffer::alloc(sizeof(int64_t));
/// Just special session request
nuraft::buffer_serializer bs(entry);
bs.put_i64(session_timeout_ms);
std::lock_guard lock(append_entries_mutex);
auto result = raft_instance->append_entries({entry});
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send session_id request to RAFT");
if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "session_id request failed to RAFT");
auto resp = result->get();
if (resp == nullptr)
throw Exception(ErrorCodes::RAFT_ERROR, "Received nullptr as session_id");
nuraft::buffer_serializer bs_resp(resp);
return bs_resp.get_i64();
}
bool KeeperServer::isLeader() const

View File

@ -12,6 +12,8 @@
namespace DB
{
using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
class KeeperServer
{
private:
@ -29,13 +31,10 @@ private:
std::mutex append_entries_mutex;
ResponsesQueue & responses_queue;
std::mutex initialized_mutex;
std::atomic<bool> initialized_flag = false;
std::condition_variable initialized_cv;
std::atomic<bool> initial_batch_committed = false;
std::atomic<size_t> active_session_id_requests = 0;
Poco::Logger * log;
@ -60,9 +59,9 @@ public:
void startup();
void putRequest(const KeeperStorage::RequestForSession & request);
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
int64_t getSessionID(int64_t session_timeout_ms);
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
std::unordered_set<int64_t> getDeadSessions();

View File

@ -90,25 +90,28 @@ void KeeperStateMachine::init()
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
if (data.size() == sizeof(int64_t))
auto request_for_session = parseRequest(data);
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
{
nuraft::buffer_serializer timeout_data(data);
int64_t session_timeout_ms = timeout_data.get_i64();
auto response = nuraft::buffer::alloc(sizeof(int64_t));
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
int64_t session_id;
nuraft::buffer_serializer bs(response);
{
std::lock_guard lock(storage_lock);
session_id = storage->getSessionID(session_timeout_ms);
bs.put_i64(session_id);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
}
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_timeout_ms);
last_committed_idx = log_idx;
return response;
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
response->internal_id = session_id_request.internal_id;
response->session_id = session_id;
KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1;
response_for_session.response = response;
responses_queue.push(response_for_session);
}
else
{
auto request_for_session = parseRequest(data);
KeeperStorage::ResponsesForSessions responses_for_sessions;
{
std::lock_guard lock(storage_lock);
@ -116,11 +119,12 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
for (auto & response_for_session : responses_for_sessions)
responses_queue.push(response_for_session);
}
}
last_committed_idx = log_idx;
return nullptr;
}
}
bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{

View File

@ -8,7 +8,6 @@
#include <Coordination/SnapshotableHashTable.h>
#include <unordered_map>
#include <unordered_set>
#include <set>
#include <vector>
namespace DB
@ -18,7 +17,7 @@ using namespace DB;
struct KeeperStorageRequest;
using KeeperStorageRequestPtr = std::shared_ptr<KeeperStorageRequest>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
using ChildrenSet = std::set<std::string>;
using ChildrenSet = std::unordered_set<std::string>;
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
struct KeeperStorageSnapshot;

View File

@ -1,5 +1,9 @@
#include <Coordination/KeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
#include <chrono>
namespace DB
{
@ -17,23 +21,101 @@ KeeperStorageDispatcher::KeeperStorageDispatcher()
{
}
void KeeperStorageDispatcher::requestThread()
{
setThreadName("KeeperReqT");
/// Result of requests batch from previous iteration
RaftAppendResult prev_result = nullptr;
/// Requests from previous iteration. We store them to be able
/// to send errors to the client.
KeeperStorage::RequestsForSessions prev_batch;
while (!shutdown_called)
{
KeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
if (requests_queue.tryPop(request, max_wait))
/// The code bellow do a very simple thing: batch all write (quorum) requests into vector until
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
/// the ability to process read requests without quorum (from local state). So when we are collecting
/// requests into a batch we must check that the new request is not read request. Otherwise we have to
/// process all already accumulated write requests, wait them synchronously and only after that process
/// read request. So reads are some kind of "separator" for writes.
try
{
if (requests_queue->tryPop(request, max_wait))
{
if (shutdown_called)
break;
try
KeeperStorage::RequestsForSessions current_batch;
bool has_read_request = false;
/// If new request is not read request or we must to process it through quorum.
/// Otherwise we will process it locally.
if (coordination_settings->quorum_reads || !request.request->isReadRequest() || !server->isLeaderAlive())
{
server->putRequest(request);
current_batch.emplace_back(request);
/// Waiting until previous append will be successful, or batch is big enough
while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
{
/// Trying to get batch requests as fast as possible
if (requests_queue->tryPop(request, 1))
{
/// Don't append read request into batch, we have to process them separately
if (!coordination_settings->quorum_reads && request.request->isReadRequest() && server->isLeaderAlive())
{
has_read_request = true;
break;
}
current_batch.emplace_back(request);
}
if (shutdown_called)
break;
}
}
else
has_read_request = true;
if (shutdown_called)
break;
/// Forcefully process all previous pending requests
if (prev_result)
forceWaitAndProcessResult(std::move(prev_result), std::move(prev_batch));
/// Process collected write requests batch
if (!current_batch.empty())
{
prev_result = server->putRequestBatch(current_batch);
if (prev_result)
{
if (has_read_request) /// If we will execute read request next, than we have to process result now
forceWaitAndProcessResult(std::move(prev_result), std::move(current_batch));
}
else
{
addErrorResponses(std::move(current_batch), Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
prev_batch = current_batch;
}
/// Read request always goes after write batch (last request)
/// We don't check leader aliveness, because it was alive when this request happen
/// so our state is consistent.
if (has_read_request)
server->putLocalReadRequest(request);
}
}
catch (...)
{
@ -41,7 +123,6 @@ void KeeperStorageDispatcher::requestThread()
}
}
}
}
void KeeperStorageDispatcher::responseThread()
{
@ -94,6 +175,19 @@ void KeeperStorageDispatcher::snapshotThread()
void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
/// Nobody waits for this session id
if (!new_session_id_response_callback.count(session_id_resp.internal_id))
return;
auto callback = new_session_id_response_callback[session_id_resp.internal_id];
callback(response);
new_session_id_response_callback.erase(session_id_resp.internal_id);
}
else
{
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
return;
@ -101,8 +195,11 @@ void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination
session_writer->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
session_to_response_callback.erase(session_writer);
}
}
}
bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
@ -119,8 +216,8 @@ bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
requests_queue->push(std::move(request_info));
else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
@ -131,6 +228,7 @@ void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration
int myid = config.getInt("keeper_server.server_id");
coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
requests_queue = std::make_unique<RequestsQueue>(coordination_settings->max_requests_batch_size);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
@ -175,7 +273,7 @@ void KeeperStorageDispatcher::shutdown()
session_cleaner_thread.join();
/// FIXME not the best way to notify
requests_queue.push({});
requests_queue->push({});
if (request_thread.joinable())
request_thread.join();
@ -192,7 +290,7 @@ void KeeperStorageDispatcher::shutdown()
server->shutdown();
KeeperStorage::RequestForSession request_for_session;
while (requests_queue.tryPop(request_for_session))
while (requests_queue->tryPop(request_for_session))
{
if (request_for_session.request)
{
@ -249,7 +347,7 @@ void KeeperStorageDispatcher::sessionCleanerTask()
request_info.session_id = dead_session;
{
std::lock_guard lock(push_request_mutex);
requests_queue.push(std::move(request_info));
requests_queue->push(std::move(request_info));
}
finishSession(dead_session);
LOG_INFO(log, "Dead session close request pushed");
@ -273,4 +371,70 @@ void KeeperStorageDispatcher::finishSession(int64_t session_id)
session_to_response_callback.erase(session_it);
}
void KeeperStorageDispatcher::addErrorResponses(KeeperStorage::RequestsForSessions && requests_for_sessions, Coordination::Error error)
{
for (const auto & [session_id, request] : requests_for_sessions)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = error;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
requests_for_sessions.clear();
}
void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult && result, KeeperStorage::RequestsForSessions && requests_for_sessions)
{
if (!result->has_result())
result->get();
/// If we get some errors, than send them to clients
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZOPERATIONTIMEOUT);
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZRUNTIMEINCONSISTENCY);
result = nullptr;
}
int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms)
{
KeeperStorage::RequestForSession request_info;
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
request->internal_id = internal_session_id_counter.fetch_add(1);
request->session_timeout_ms = session_timeout_ms;
request_info.request = request;
request_info.session_id = -1;
auto promise = std::make_shared<std::promise<int64_t>>();
auto future = promise->get_future();
{
std::lock_guard lock(session_to_response_callback_mutex);
new_session_id_response_callback[request->internal_id] = [promise] (const Coordination::ZooKeeperResponsePtr & response)
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
if (response->error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
promise->set_value(dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response).session_id);
};
}
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
return future.get();
}
}

View File

@ -32,24 +32,42 @@ private:
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
RequestsQueue requests_queue{1};
/// Size depends on coordination settings
std::unique_ptr<RequestsQueue> requests_queue;
ResponsesQueue responses_queue;
SnapshotsQueue snapshots_queue{1};
std::atomic<bool> shutdown_called{false};
std::mutex session_to_response_callback_mutex;
/// These two maps looks similar, but serves different purposes.
/// The first map is subscription map for normal responses like
/// (get, set, list, etc.). Dispatcher determines callback for each response
/// using session id from this map.
SessionToResponseCallback session_to_response_callback;
/// But when client connects to the server for the first time it doesn't
/// have session_id. It request it from server. We give temporary
/// internal id for such requests just to much client with its response.
SessionToResponseCallback new_session_id_response_callback;
/// Reading and batching new requests from client handlers
ThreadFromGlobalPool request_thread;
/// Pushing responses to clients client handlers
/// using session_id.
ThreadFromGlobalPool responses_thread;
/// Cleaning old dead sessions
ThreadFromGlobalPool session_cleaner_thread;
/// Dumping new snapshots to disk
ThreadFromGlobalPool snapshot_thread;
/// RAFT wrapper. Most important class.
std::unique_ptr<KeeperServer> server;
Poco::Logger * log;
/// Counter for new session_id requests.
std::atomic<int64_t> internal_session_id_counter{0};
private:
void requestThread();
void responseThread();
@ -57,6 +75,14 @@ private:
void snapshotThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
/// Add error responses for requests to responses queue.
/// Clears requests.
void addErrorResponses(KeeperStorage::RequestsForSessions && requests_for_sessions, Coordination::Error error);
/// Forcefully wait for result and sets errors if something when wrong.
/// Clears both arguments
void forceWaitAndProcessResult(RaftAppendResult && result, KeeperStorage::RequestsForSessions && requests_for_sessions);
public:
KeeperStorageDispatcher();
@ -78,10 +104,7 @@ public:
return server->isLeaderAlive();
}
int64_t getSessionID(long session_timeout_ms)
{
return server->getSessionID(session_timeout_ms);
}
int64_t getSessionID(long session_timeout_ms);
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call if we don't need any responses for this session no more (session was expired)