Merge branch 'decompose_test_keeper_storage' into in_memory_raft

This commit is contained in:
alesapin 2021-01-20 15:51:54 +03:00
commit a241793a14
9 changed files with 431 additions and 375 deletions

View File

@ -0,0 +1,131 @@
#include <Common/ZooKeeper/TestKeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
}
namespace zkutil
{
void TestKeeperStorageDispatcher::processingThread()
{
setThreadName("TestKeeperSProc");
try
{
while (!shutdown)
{
RequestInfo info;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
if (requests_queue.tryPop(info, max_wait))
{
if (shutdown)
break;
auto responses = storage.processRequest(info.request, info.session_id);
for (const auto & response_for_session : responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize();
}
}
void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id);
session_writer->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
session_to_response_callback.erase(session_writer);
}
void TestKeeperStorageDispatcher::finalize()
{
{
std::lock_guard lock(push_request_mutex);
if (shutdown)
return;
shutdown = true;
if (processing_thread.joinable())
processing_thread.join();
}
RequestInfo info;
TestKeeperStorage::RequestsForSessions expired_requests;
while (requests_queue.tryPop(info))
expired_requests.push_back(TestKeeperStorage::RequestForSession{info.session_id, info.request});
auto expired_responses = storage.finalize(expired_requests);
for (const auto & response_for_session : expired_responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
std::lock_guard lock(session_to_response_callback_mutex);
if (session_to_response_callback.count(session_id) == 0)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id);
}
RequestInfo request_info;
request_info.time = clock::now();
request_info.request = request;
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
TestKeeperStorageDispatcher::TestKeeperStorageDispatcher()
{
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
}
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
}

View File

@ -0,0 +1,58 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <functional>
namespace zkutil
{
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class TestKeeperStorageDispatcher
{
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
using clock = std::chrono::steady_clock;
struct RequestInfo
{
Coordination::ZooKeeperRequestPtr request;
clock::time_point time;
int64_t session_id;
};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
std::atomic<bool> shutdown{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
std::mutex session_to_response_callback_mutex;
SessionToResponseCallback session_to_response_callback;
ThreadFromGlobalPool processing_thread;
TestKeeperStorage storage;
private:
void processingThread();
void finalize();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
TestKeeperStorageDispatcher();
~TestKeeperStorageDispatcher();
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
int64_t getSessionID()
{
return storage.getSessionID();
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
};
}

View File

@ -85,6 +85,7 @@ SRCS(
ZooKeeper/IKeeper.cpp ZooKeeper/IKeeper.cpp
ZooKeeper/TestKeeper.cpp ZooKeeper/TestKeeper.cpp
ZooKeeper/TestKeeperStorage.cpp ZooKeeper/TestKeeperStorage.cpp
ZooKeeper/TestKeeperStorageDispatcher.cpp
ZooKeeper/ZooKeeper.cpp ZooKeeper/ZooKeeper.cpp
ZooKeeper/ZooKeeperCommon.cpp ZooKeeper/ZooKeeperCommon.cpp
ZooKeeper/ZooKeeperConstants.cpp ZooKeeper/ZooKeeperConstants.cpp

View File

@ -14,7 +14,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
@ -39,8 +38,9 @@ static String baseName(const String & path)
return path.substr(rslash_pos + 1); return path.substr(rslash_pos + 1);
} }
static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type) static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type)
{ {
TestKeeperStorage::ResponsesForSessions result;
auto it = watches.find(path); auto it = watches.find(path);
if (it != watches.end()) if (it != watches.end())
{ {
@ -50,9 +50,8 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches &
watch_response->zxid = -1; watch_response->zxid = -1;
watch_response->type = event_type; watch_response->type = event_type;
watch_response->state = Coordination::State::CONNECTED; watch_response->state = Coordination::State::CONNECTED;
for (auto & watcher : it->second) for (auto watcher_session : it->second)
if (watcher.watch_callback) result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_response});
watcher.watch_callback(watch_response);
watches.erase(it); watches.erase(it);
} }
@ -67,19 +66,17 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches &
watch_list_response->zxid = -1; watch_list_response->zxid = -1;
watch_list_response->type = Coordination::Event::CHILD; watch_list_response->type = Coordination::Event::CHILD;
watch_list_response->state = Coordination::State::CONNECTED; watch_list_response->state = Coordination::State::CONNECTED;
for (auto & watcher : it->second) for (auto watcher_session : it->second)
if (watcher.watch_callback) result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_list_response});
watcher.watch_callback(watch_list_response);
list_watches.erase(it); list_watches.erase(it);
} }
return result;
} }
TestKeeperStorage::TestKeeperStorage() TestKeeperStorage::TestKeeperStorage()
{ {
container.emplace("/", Node()); container.emplace("/", Node());
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
} }
using Undo = std::function<void()>; using Undo = std::function<void()>;
@ -92,7 +89,7 @@ struct TestKeeperStorageRequest
: zk_request(zk_request_) : zk_request(zk_request_)
{} {}
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0; virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0;
virtual void processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const {} virtual TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual ~TestKeeperStorageRequest() = default; virtual ~TestKeeperStorageRequest() = default;
}; };
@ -111,9 +108,9 @@ struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest
{ {
using TestKeeperStorageRequest::TestKeeperStorageRequest; using TestKeeperStorageRequest::TestKeeperStorageRequest;
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{ {
processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
} }
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
@ -271,9 +268,9 @@ struct TestKeeperStorageRemoveRequest final : public TestKeeperStorageRequest
return { response_ptr, undo }; return { response_ptr, undo };
} }
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{ {
processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
} }
}; };
@ -344,9 +341,9 @@ struct TestKeeperStorageSetRequest final : public TestKeeperStorageRequest
return { response_ptr, undo }; return { response_ptr, undo };
} }
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{ {
processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
} }
}; };
@ -502,10 +499,15 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
} }
} }
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{ {
TestKeeperStorage::ResponsesForSessions result;
for (const auto & generic_request : concrete_requests) for (const auto & generic_request : concrete_requests)
generic_request->processWatches(watches, list_watches); {
auto responses = generic_request->processWatches(watches, list_watches);
result.insert(result.end(), responses.begin(), responses.end());
}
return result;
} }
}; };
@ -518,160 +520,49 @@ struct TestKeeperStorageCloseRequest final : public TestKeeperStorageRequest
} }
}; };
void TestKeeperStorage::processingThread() TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const RequestsForSessions & expired_requests)
{ {
setThreadName("TestKeeperSProc"); if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
try finalized = true;
ResponsesForSessions finalize_results;
auto finish_watch = [] (const auto & watch_pair) -> ResponsesForSessions
{ {
while (!shutdown) ResponsesForSessions results;
{ std::shared_ptr<Coordination::ZooKeeperWatchResponse> response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
RequestInfo info; response->type = Coordination::SESSION;
response->state = Coordination::EXPIRED_SESSION;
response->error = Coordination::Error::ZSESSIONEXPIRED;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); for (auto & watcher_session : watch_pair.second)
results.push_back(ResponseForSession{watcher_session, response});
return results;
};
if (requests_queue.tryPop(info, max_wait)) for (auto & path_watch : watches)
{
if (shutdown)
break;
auto zk_request = info.request->zk_request;
if (zk_request->getOpNum() == Coordination::OpNum::Close)
{
auto it = ephemerals.find(info.session_id);
if (it != ephemerals.end())
{
for (const auto & ephemeral_path : it->second)
{
container.erase(ephemeral_path);
processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
}
ephemerals.erase(it);
}
clearDeadWatches(info.session_id);
/// Finish connection
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = getZXID();
info.response_callback(response);
}
else
{
auto [response, _] = info.request->process(container, ephemerals, zxid, info.session_id);
if (info.watch_callback)
{
if (response->error == Coordination::Error::ZOK)
{
auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback});
sessions_and_watchers[info.session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback});
sessions_and_watchers[info.session_id].emplace(zk_request->getPath());
}
else
{
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_response->path = zk_request->getPath();
watch_response->xid = -1;
watch_response->error = response->error;
watch_response->type = Coordination::Event::NOTWATCHING;
info.watch_callback(watch_response);
}
}
if (response->error == Coordination::Error::ZOK)
info.request->processWatches(watches, list_watches);
response->xid = zk_request->xid;
response->zxid = getZXID();
info.response_callback(response);
}
}
}
}
catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); auto watch_responses = finish_watch(path_watch);
finalize(); finalize_results.insert(finalize_results.end(), watch_responses.begin(), watch_responses.end());
}
}
void TestKeeperStorage::finalize()
{
{
std::lock_guard lock(push_request_mutex);
if (shutdown)
return;
shutdown = true;
if (processing_thread.joinable())
processing_thread.join();
} }
try watches.clear();
for (auto & path_watch : list_watches)
{ {
{ auto list_watch_responses = finish_watch(path_watch);
auto finish_watch = [] (const auto & watch_pair) finalize_results.insert(finalize_results.end(), list_watch_responses.begin(), list_watch_responses.end());
{ }
Coordination::ZooKeeperWatchResponse response; list_watches.clear();
response.type = Coordination::SESSION; sessions_and_watchers.clear();
response.state = Coordination::EXPIRED_SESSION;
response.error = Coordination::Error::ZSESSIONEXPIRED;
for (auto & watcher : watch_pair.second) for (const auto & [session_id, zk_request] : expired_requests)
{
if (watcher.watch_callback)
{
try
{
watcher.watch_callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(response));
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
};
for (auto & path_watch : watches)
finish_watch(path_watch);
watches.clear();
for (auto & path_watch : list_watches)
finish_watch(path_watch);
list_watches.clear();
sessions_and_watchers.clear();
}
RequestInfo info;
while (requests_queue.tryPop(info))
{
auto response = info.request->zk_request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
try
{
info.response_callback(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); auto response = zk_request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
finalize_results.push_back(ResponseForSession{session_id, response});
} }
return finalize_results;
} }
@ -731,55 +622,80 @@ TestKeeperWrapperFactory::TestKeeperWrapperFactory()
registerTestKeeperRequestWrapper<Coordination::OpNum::Multi, TestKeeperStorageMultiRequest>(*this); registerTestKeeperRequestWrapper<Coordination::OpNum::Multi, TestKeeperStorageMultiRequest>(*this);
} }
void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback)
TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id)
{ {
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(request); TestKeeperStorage::ResponsesForSessions results;
RequestInfo request_info; if (zk_request->getOpNum() == Coordination::OpNum::Close)
request_info.time = clock::now();
request_info.request = storage_request;
request_info.session_id = session_id;
request_info.response_callback = callback;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback)
{
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(request);
RequestInfo request_info;
request_info.time = clock::now();
request_info.request = storage_request;
request_info.session_id = session_id;
request_info.response_callback = callback;
if (request->has_watch)
request_info.watch_callback = watch_callback;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
TestKeeperStorage::~TestKeeperStorage()
{
try
{ {
finalize(); auto it = ephemerals.find(session_id);
if (it != ephemerals.end())
{
for (const auto & ephemeral_path : it->second)
{
container.erase(ephemeral_path);
auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end());
}
ephemerals.erase(it);
}
clearDeadWatches(session_id);
/// Finish connection
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = getZXID();
results.push_back(ResponseForSession{session_id, response});
} }
catch (...) else
{ {
tryLogCurrentException(__PRETTY_FUNCTION__);
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
if (zk_request->has_watch)
{
if (response->error == Coordination::Error::ZOK)
{
auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
else
{
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_response->path = zk_request->getPath();
watch_response->xid = -1;
watch_response->error = response->error;
watch_response->type = Coordination::Event::NOTWATCHING;
results.push_back(ResponseForSession{session_id, watch_response});
}
}
if (response->error == Coordination::Error::ZOK)
{
auto watch_responses = storage_request->processWatches(watches, list_watches);
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
}
response->xid = zk_request->xid;
response->zxid = getZXID();
results.push_back(ResponseForSession{session_id, response});
} }
return results;
} }
void TestKeeperStorage::clearDeadWatches(int64_t session_id) void TestKeeperStorage::clearDeadWatches(int64_t session_id)
{ {
auto watches_it = sessions_and_watchers.find(session_id); auto watches_it = sessions_and_watchers.find(session_id);
@ -793,7 +709,7 @@ void TestKeeperStorage::clearDeadWatches(int64_t session_id)
auto & watches_for_path = watch->second; auto & watches_for_path = watch->second;
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();) for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
{ {
if (w_it->session_id == session_id) if (*w_it == session_id)
w_it = watches_for_path.erase(w_it); w_it = watches_for_path.erase(w_it);
else else
++w_it; ++w_it;
@ -808,7 +724,7 @@ void TestKeeperStorage::clearDeadWatches(int64_t session_id)
auto & list_watches_for_path = list_watch->second; auto & list_watches_for_path = list_watch->second;
for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();) for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();)
{ {
if (w_it->session_id == session_id) if (*w_it == session_id)
w_it = list_watches_for_path.erase(w_it); w_it = list_watches_for_path.erase(w_it);
else else
++w_it; ++w_it;

View File

@ -4,9 +4,9 @@
#include <Common/ZooKeeper/IKeeper.h> #include <Common/ZooKeeper/IKeeper.h>
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h> #include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <future>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <vector>
namespace zkutil namespace zkutil
{ {
@ -18,10 +18,7 @@ using ResponseCallback = std::function<void(const Coordination::ZooKeeperRespons
class TestKeeperStorage class TestKeeperStorage
{ {
public: public:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
std::atomic<int64_t> session_id_counter{0}; std::atomic<int64_t> session_id_counter{0};
struct Node struct Node
@ -34,71 +31,56 @@ public:
int32_t seq_num = 0; int32_t seq_num = 0;
}; };
struct Watcher struct ResponseForSession
{ {
int64_t session_id; int64_t session_id;
ResponseCallback watch_callback; Coordination::ZooKeeperResponsePtr response;
}; };
using ResponsesForSessions = std::vector<ResponseForSession>;
struct RequestForSession
{
int64_t session_id;
Coordination::ZooKeeperRequestPtr request;
};
using RequestsForSessions = std::vector<RequestForSession>;
using Container = std::map<std::string, Node>; using Container = std::map<std::string, Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>; using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>; using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionIDs = std::vector<int64_t>;
using WatchCallbacks = std::vector<Watcher>; using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
Container container; Container container;
Ephemerals ephemerals; Ephemerals ephemerals;
SessionAndWatcher sessions_and_watchers; SessionAndWatcher sessions_and_watchers;
std::atomic<int64_t> zxid{0}; std::atomic<int64_t> zxid{0};
std::atomic<bool> shutdown{false}; std::atomic<bool> finalized{false};
Watches watches; Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children). Watches list_watches; /// Watches for 'list' request (watches on children).
using clock = std::chrono::steady_clock;
struct RequestInfo
{
TestKeeperStorageRequestPtr request;
ResponseCallback response_callback;
ResponseCallback watch_callback;
clock::time_point time;
int64_t session_id;
};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
void finalize();
ThreadFromGlobalPool processing_thread;
void processingThread();
void clearDeadWatches(int64_t session_id); void clearDeadWatches(int64_t session_id);
public: int64_t getZXID()
using AsyncResponse = std::future<Coordination::ZooKeeperResponsePtr>;
TestKeeperStorage();
~TestKeeperStorage();
struct ResponsePair
{ {
AsyncResponse response; return zxid.fetch_add(1);
std::optional<AsyncResponse> watch_response; }
};
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback); public:
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback); TestKeeperStorage();
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
ResponsesForSessions finalize(const RequestsForSessions & expired_requests);
int64_t getSessionID() int64_t getSessionID()
{ {
return session_id_counter.fetch_add(1); return session_id_counter.fetch_add(1);
} }
int64_t getZXID()
{
return zxid.fetch_add(1);
}
}; };
} }

View File

@ -12,7 +12,7 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/thread_local_rng.h> #include <Common/thread_local_rng.h>
#include <Coordination/TestKeeperStorage.h> #include <Coordination/TestKeeperStorageDispatcher.h>
#include <Compression/ICompressionCodec.h> #include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
@ -305,8 +305,8 @@ struct ContextShared
mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper. mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
mutable std::mutex test_keeper_storage_mutex; mutable std::mutex test_keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<zkutil::TestKeeperStorage> test_keeper_storage; mutable std::shared_ptr<zkutil::TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
mutable std::mutex auxiliary_zookeepers_mutex; mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients. mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs
@ -447,7 +447,7 @@ struct ContextShared
/// Stop zookeeper connection /// Stop zookeeper connection
zookeeper.reset(); zookeeper.reset();
/// Stop test_keeper storage /// Stop test_keeper storage
test_keeper_storage.reset(); test_keeper_storage_dispatcher.reset();
} }
bool hasTraceCollector() const bool hasTraceCollector() const
@ -1531,13 +1531,13 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper; return shared->zookeeper;
} }
std::shared_ptr<zkutil::TestKeeperStorage> & Context::getTestKeeperStorage() const std::shared_ptr<zkutil::TestKeeperStorageDispatcher> & Context::getTestKeeperStorageDispatcher() const
{ {
std::lock_guard lock(shared->test_keeper_storage_mutex); std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex);
if (!shared->test_keeper_storage) if (!shared->test_keeper_storage_dispatcher)
shared->test_keeper_storage = std::make_shared<zkutil::TestKeeperStorage>(); shared->test_keeper_storage_dispatcher = std::make_shared<zkutil::TestKeeperStorageDispatcher>();
return shared->test_keeper_storage; return shared->test_keeper_storage_dispatcher;
} }
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const

View File

@ -40,7 +40,7 @@ namespace Poco
namespace zkutil namespace zkutil
{ {
class ZooKeeper; class ZooKeeper;
class TestKeeperStorage; class TestKeeperStorageDispatcher;
} }
@ -513,7 +513,7 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const; std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
std::shared_ptr<zkutil::TestKeeperStorage> & getTestKeeperStorage() const; std::shared_ptr<zkutil::TestKeeperStorageDispatcher> & getTestKeeperStorageDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading. /// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config); void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);

View File

@ -13,6 +13,8 @@
#include <Common/PipeFDs.h> #include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBufferFromFileDescriptor.h> #include <IO/ReadBufferFromFileDescriptor.h>
#include <queue>
#include <mutex>
#ifdef POCO_HAVE_FD_EPOLL #ifdef POCO_HAVE_FD_EPOLL
#include <sys/epoll.h> #include <sys/epoll.h>
@ -27,16 +29,45 @@ namespace ErrorCodes
{ {
extern const int SYSTEM_ERROR; extern const int SYSTEM_ERROR;
extern const int UNEXPECTED_PACKET_FROM_CLIENT; extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int LOGICAL_ERROR;
} }
struct PollResult struct PollResult
{ {
std::vector<size_t> ready_responses; bool has_responses;
bool has_requests; bool has_requests;
bool error; bool error;
}; };
/// Queue with mutex. As simple as possible.
class ThreadSafeResponseQueue
{
private:
mutable std::mutex queue_mutex;
std::queue<Coordination::ZooKeeperResponsePtr> queue;
public:
void push(const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(queue_mutex);
queue.push(response);
}
bool tryPop(Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(queue_mutex);
if (!queue.empty())
{
response = queue.front();
queue.pop();
return true;
}
return false;
}
size_t size() const
{
std::lock_guard lock(queue_mutex);
return queue.size();
}
};
struct SocketInterruptablePollWrapper struct SocketInterruptablePollWrapper
{ {
int sockfd; int sockfd;
@ -160,12 +191,10 @@ struct SocketInterruptablePollWrapper
result.has_requests = true; result.has_requests = true;
else else
{ {
do /// Skip all of them, we are not interested in exact
{ /// amount because responses ordered in responses queue.
size_t response_position; response_in.ignore();
readIntBinary(response_position, response_in); result.has_responses = true;
result.ready_responses.push_back(response_position);
} while (response_in.available());
} }
} }
} }
@ -186,11 +215,12 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S
, server(server_) , server(server_)
, log(&Poco::Logger::get("TestKeeperTCPHandler")) , log(&Poco::Logger::get("TestKeeperTCPHandler"))
, global_context(server.context()) , global_context(server.context())
, test_keeper_storage(global_context.getTestKeeperStorage()) , test_keeper_storage_dispatcher(global_context.getTestKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) , session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, session_id(test_keeper_storage->getSessionID()) , session_id(test_keeper_storage_dispatcher->getSessionID())
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_)) , poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
, responses(std::make_unique<ThreadSafeResponseQueue>())
{ {
} }
@ -278,6 +308,16 @@ void TestKeeperTCPHandler::runImpl()
} }
sendHandshake(); sendHandshake();
auto response_fd = poll_wrapper->getResponseFD();
auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response)
{
responses->push(response);
UInt8 single_byte = 1;
[[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte));
};
test_keeper_storage_dispatcher->registerSession(session_id, response_callback);
session_stopwatch.start(); session_stopwatch.start();
bool close_received = false; bool close_received = false;
try try
@ -287,30 +327,17 @@ void TestKeeperTCPHandler::runImpl()
using namespace std::chrono_literals; using namespace std::chrono_literals;
PollResult result = poll_wrapper->poll(session_timeout); PollResult result = poll_wrapper->poll(session_timeout);
if (result.has_requests) if (result.has_requests && !close_received)
{ {
do do
{ {
Coordination::OpNum received_op = receiveRequest(); auto [received_op, received_xid] = receiveRequest();
if (received_op == Coordination::OpNum::Close) if (received_op == Coordination::OpNum::Close)
{ {
auto last_response = responses.find(response_id_counter - 1); LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id);
if (last_response == responses.end()) close_xid = received_xid;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Just inserted response #{} not found in responses", response_id_counter - 1);
LOG_DEBUG(log, "Received close request for session #{}", session_id);
if (last_response->second.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready)
{
LOG_DEBUG(log, "Cannot sent close for session #{}", session_id);
}
else
{
LOG_DEBUG(log, "Sent close for session #{}", session_id);
last_response->second.get()->write(*out);
}
close_received = true; close_received = true;
break; break;
} }
else if (received_op == Coordination::OpNum::Heartbeat) else if (received_op == Coordination::OpNum::Heartbeat)
@ -322,30 +349,23 @@ void TestKeeperTCPHandler::runImpl()
while (in->available()); while (in->available());
} }
if (close_received) if (result.has_responses)
break;
for (size_t response_id : result.ready_responses)
{ {
auto response_future = responses.find(response_id); Coordination::ZooKeeperResponsePtr response;
if (response_future == responses.end()) while (responses->tryPop(response))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get unknown response #{}", response_id);
if (response_future->second.wait_for(0s) != std::future_status::ready)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Response #{} was market as ready but corresponding future not ready yet", response_id);
auto response = response_future->second.get();
if (response->error == Coordination::Error::ZOK)
{ {
response->write(*out); if (response->xid == close_xid)
} {
else LOG_DEBUG(log, "Session #{} successfully closed", session_id);
{ return;
/// TODO Get rid of this }
if (!dynamic_cast<Coordination::ZooKeeperWatchResponse *>(response.get()))
if (response->error == Coordination::Error::ZOK)
response->write(*out); response->write(*out);
else if (response->xid != Coordination::WATCH_XID)
response->write(*out);
/// skipping bad response for watch
} }
responses.erase(response_future);
} }
if (result.error) if (result.error)
@ -354,12 +374,7 @@ void TestKeeperTCPHandler::runImpl()
if (session_stopwatch.elapsedMicroseconds() > static_cast<UInt64>(session_timeout.totalMicroseconds())) if (session_stopwatch.elapsedMicroseconds() > static_cast<UInt64>(session_timeout.totalMicroseconds()))
{ {
LOG_DEBUG(log, "Session #{} expired", session_id); LOG_DEBUG(log, "Session #{} expired", session_id);
auto response = putCloseRequest(); finish();
if (response.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready)
LOG_DEBUG(log, "Cannot sent close for expired session #{}", session_id);
else
response.get()->write(*out);
break; break;
} }
} }
@ -367,29 +382,18 @@ void TestKeeperTCPHandler::runImpl()
catch (const Exception & ex) catch (const Exception & ex)
{ {
LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true)); LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true));
auto response = putCloseRequest(); finish();
if (response.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready)
LOG_DEBUG(log, "Cannot sent close for session #{}", session_id);
else
response.get()->write(*out);
} }
} }
zkutil::TestKeeperStorage::AsyncResponse TestKeeperTCPHandler::putCloseRequest() void TestKeeperTCPHandler::finish()
{ {
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID; request->xid = close_xid;
auto promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>(); test_keeper_storage_dispatcher->putRequest(request, session_id);
zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
};
test_keeper_storage->putRequest(request, session_id, callback);
return promise->get_future();
} }
Coordination::OpNum TestKeeperTCPHandler::receiveRequest() std::pair<Coordination::OpNum, Coordination::XID> TestKeeperTCPHandler::receiveRequest()
{ {
int32_t length; int32_t length;
Coordination::read(length, *in); Coordination::read(length, *in);
@ -402,47 +406,9 @@ Coordination::OpNum TestKeeperTCPHandler::receiveRequest()
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid; request->xid = xid;
request->readImpl(*in); request->readImpl(*in);
auto promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
if (opnum != Coordination::OpNum::Close)
{
int response_fd = poll_wrapper->getResponseFD();
size_t response_num = response_id_counter++;
zkutil::ResponseCallback callback = [response_fd, promise, response_num] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &response_num, sizeof(response_num));
};
if (request->has_watch) test_keeper_storage_dispatcher->putRequest(request, session_id);
{ return std::make_pair(opnum, xid);
auto watch_promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
size_t watch_response_num = response_id_counter++;
zkutil::ResponseCallback watch_callback = [response_fd, watch_promise, watch_response_num] (const Coordination::ZooKeeperResponsePtr & response)
{
watch_promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &watch_response_num, sizeof(watch_response_num));
};
test_keeper_storage->putRequest(request, session_id, callback, watch_callback);
responses.try_emplace(response_num, promise->get_future());
responses.try_emplace(watch_response_num, watch_promise->get_future());
}
else
{
test_keeper_storage->putRequest(request, session_id, callback);
responses.try_emplace(response_num, promise->get_future());
}
}
else
{
zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
};
test_keeper_storage->putRequest(request, session_id, callback);
responses.try_emplace(response_id_counter++, promise->get_future());
}
return opnum;
} }
} }

View File

@ -6,17 +6,18 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h> #include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h> #include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Coordination/TestKeeperStorage.h> #include <Coordination/TestKeeperStorageDispatcher.h>
#include <IO/WriteBufferFromPocoSocket.h> #include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h> #include <IO/ReadBufferFromPocoSocket.h>
#include <unordered_map> #include <unordered_map>
#include <future>
namespace DB namespace DB
{ {
struct SocketInterruptablePollWrapper; struct SocketInterruptablePollWrapper;
using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>; using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>;
class ThreadSafeResponseQueue;
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>;
class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection
{ {
@ -27,15 +28,16 @@ private:
IServer & server; IServer & server;
Poco::Logger * log; Poco::Logger * log;
Context global_context; Context global_context;
std::shared_ptr<zkutil::TestKeeperStorage> test_keeper_storage; std::shared_ptr<zkutil::TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
Poco::Timespan operation_timeout; Poco::Timespan operation_timeout;
Poco::Timespan session_timeout; Poco::Timespan session_timeout;
int64_t session_id; int64_t session_id;
Stopwatch session_stopwatch; Stopwatch session_stopwatch;
SocketInterruptablePollWrapperPtr poll_wrapper; SocketInterruptablePollWrapperPtr poll_wrapper;
size_t response_id_counter = 0; ThreadSafeResponseQueuePtr responses;
std::unordered_map<size_t, zkutil::TestKeeperStorage::AsyncResponse> responses;
Coordination::XID close_xid = Coordination::CLOSE_XID;
/// Streams for reading/writing from/to client connection socket. /// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBufferFromPocoSocket> in; std::shared_ptr<ReadBufferFromPocoSocket> in;
@ -46,8 +48,8 @@ private:
void sendHandshake(); void sendHandshake();
void receiveHandshake(); void receiveHandshake();
Coordination::OpNum receiveRequest(); std::pair<Coordination::OpNum, Coordination::XID> receiveRequest();
zkutil::TestKeeperStorage::AsyncResponse putCloseRequest(); void finish();
}; };
} }