Merge pull request #19284 from ClickHouse/decompose_test_keeper_storage

Split TestKeeperStorage and processing thread
This commit is contained in:
alesapin 2021-01-21 00:19:05 +03:00 committed by GitHub
commit af4afff723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 431 additions and 375 deletions

View File

@ -14,7 +14,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int BAD_ARGUMENTS;
}
@ -39,8 +38,9 @@ static String baseName(const String & path)
return path.substr(rslash_pos + 1);
}
static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type)
static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type)
{
TestKeeperStorage::ResponsesForSessions result;
auto it = watches.find(path);
if (it != watches.end())
{
@ -50,9 +50,8 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches &
watch_response->zxid = -1;
watch_response->type = event_type;
watch_response->state = Coordination::State::CONNECTED;
for (auto & watcher : it->second)
if (watcher.watch_callback)
watcher.watch_callback(watch_response);
for (auto watcher_session : it->second)
result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_response});
watches.erase(it);
}
@ -67,19 +66,17 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches &
watch_list_response->zxid = -1;
watch_list_response->type = Coordination::Event::CHILD;
watch_list_response->state = Coordination::State::CONNECTED;
for (auto & watcher : it->second)
if (watcher.watch_callback)
watcher.watch_callback(watch_list_response);
for (auto watcher_session : it->second)
result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_list_response});
list_watches.erase(it);
}
return result;
}
TestKeeperStorage::TestKeeperStorage()
{
container.emplace("/", Node());
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
}
using Undo = std::function<void()>;
@ -92,7 +89,7 @@ struct TestKeeperStorageRequest
: zk_request(zk_request_)
{}
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0;
virtual void processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const {}
virtual TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual ~TestKeeperStorageRequest() = default;
};
@ -111,9 +108,9 @@ struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{
processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
@ -271,9 +268,9 @@ struct TestKeeperStorageRemoveRequest final : public TestKeeperStorageRequest
return { response_ptr, undo };
}
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{
processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
}
};
@ -344,9 +341,9 @@ struct TestKeeperStorageSetRequest final : public TestKeeperStorageRequest
return { response_ptr, undo };
}
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{
processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
}
};
@ -502,10 +499,15 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest
}
}
void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override
{
TestKeeperStorage::ResponsesForSessions result;
for (const auto & generic_request : concrete_requests)
generic_request->processWatches(watches, list_watches);
{
auto responses = generic_request->processWatches(watches, list_watches);
result.insert(result.end(), responses.begin(), responses.end());
}
return result;
}
};
@ -518,160 +520,49 @@ struct TestKeeperStorageCloseRequest final : public TestKeeperStorageRequest
}
};
void TestKeeperStorage::processingThread()
TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const RequestsForSessions & expired_requests)
{
setThreadName("TestKeeperSProc");
if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
try
finalized = true;
ResponsesForSessions finalize_results;
auto finish_watch = [] (const auto & watch_pair) -> ResponsesForSessions
{
while (!shutdown)
{
RequestInfo info;
ResponsesForSessions results;
std::shared_ptr<Coordination::ZooKeeperWatchResponse> response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
response->type = Coordination::SESSION;
response->state = Coordination::EXPIRED_SESSION;
response->error = Coordination::Error::ZSESSIONEXPIRED;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
for (auto & watcher_session : watch_pair.second)
results.push_back(ResponseForSession{watcher_session, response});
return results;
};
if (requests_queue.tryPop(info, max_wait))
{
if (shutdown)
break;
auto zk_request = info.request->zk_request;
if (zk_request->getOpNum() == Coordination::OpNum::Close)
{
auto it = ephemerals.find(info.session_id);
if (it != ephemerals.end())
{
for (const auto & ephemeral_path : it->second)
{
container.erase(ephemeral_path);
processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
}
ephemerals.erase(it);
}
clearDeadWatches(info.session_id);
/// Finish connection
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = getZXID();
info.response_callback(response);
}
else
{
auto [response, _] = info.request->process(container, ephemerals, zxid, info.session_id);
if (info.watch_callback)
{
if (response->error == Coordination::Error::ZOK)
{
auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback});
sessions_and_watchers[info.session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback});
sessions_and_watchers[info.session_id].emplace(zk_request->getPath());
}
else
{
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_response->path = zk_request->getPath();
watch_response->xid = -1;
watch_response->error = response->error;
watch_response->type = Coordination::Event::NOTWATCHING;
info.watch_callback(watch_response);
}
}
if (response->error == Coordination::Error::ZOK)
info.request->processWatches(watches, list_watches);
response->xid = zk_request->xid;
response->zxid = getZXID();
info.response_callback(response);
}
}
}
}
catch (...)
for (auto & path_watch : watches)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize();
}
}
void TestKeeperStorage::finalize()
{
{
std::lock_guard lock(push_request_mutex);
if (shutdown)
return;
shutdown = true;
if (processing_thread.joinable())
processing_thread.join();
auto watch_responses = finish_watch(path_watch);
finalize_results.insert(finalize_results.end(), watch_responses.begin(), watch_responses.end());
}
try
watches.clear();
for (auto & path_watch : list_watches)
{
{
auto finish_watch = [] (const auto & watch_pair)
{
Coordination::ZooKeeperWatchResponse response;
response.type = Coordination::SESSION;
response.state = Coordination::EXPIRED_SESSION;
response.error = Coordination::Error::ZSESSIONEXPIRED;
auto list_watch_responses = finish_watch(path_watch);
finalize_results.insert(finalize_results.end(), list_watch_responses.begin(), list_watch_responses.end());
}
list_watches.clear();
sessions_and_watchers.clear();
for (auto & watcher : watch_pair.second)
{
if (watcher.watch_callback)
{
try
{
watcher.watch_callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(response));
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
};
for (auto & path_watch : watches)
finish_watch(path_watch);
watches.clear();
for (auto & path_watch : list_watches)
finish_watch(path_watch);
list_watches.clear();
sessions_and_watchers.clear();
}
RequestInfo info;
while (requests_queue.tryPop(info))
{
auto response = info.request->zk_request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
try
{
info.response_callback(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
catch (...)
for (const auto & [session_id, zk_request] : expired_requests)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
auto response = zk_request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
finalize_results.push_back(ResponseForSession{session_id, response});
}
return finalize_results;
}
@ -731,55 +622,80 @@ TestKeeperWrapperFactory::TestKeeperWrapperFactory()
registerTestKeeperRequestWrapper<Coordination::OpNum::Multi, TestKeeperStorageMultiRequest>(*this);
}
void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback)
TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id)
{
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(request);
RequestInfo request_info;
request_info.time = clock::now();
request_info.request = storage_request;
request_info.session_id = session_id;
request_info.response_callback = callback;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback)
{
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(request);
RequestInfo request_info;
request_info.time = clock::now();
request_info.request = storage_request;
request_info.session_id = session_id;
request_info.response_callback = callback;
if (request->has_watch)
request_info.watch_callback = watch_callback;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
TestKeeperStorage::~TestKeeperStorage()
{
try
TestKeeperStorage::ResponsesForSessions results;
if (zk_request->getOpNum() == Coordination::OpNum::Close)
{
finalize();
auto it = ephemerals.find(session_id);
if (it != ephemerals.end())
{
for (const auto & ephemeral_path : it->second)
{
container.erase(ephemeral_path);
auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end());
}
ephemerals.erase(it);
}
clearDeadWatches(session_id);
/// Finish connection
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = getZXID();
results.push_back(ResponseForSession{session_id, response});
}
catch (...)
else
{
tryLogCurrentException(__PRETTY_FUNCTION__);
TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
if (zk_request->has_watch)
{
if (response->error == Coordination::Error::ZOK)
{
auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
else
{
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_response->path = zk_request->getPath();
watch_response->xid = -1;
watch_response->error = response->error;
watch_response->type = Coordination::Event::NOTWATCHING;
results.push_back(ResponseForSession{session_id, watch_response});
}
}
if (response->error == Coordination::Error::ZOK)
{
auto watch_responses = storage_request->processWatches(watches, list_watches);
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
}
response->xid = zk_request->xid;
response->zxid = getZXID();
results.push_back(ResponseForSession{session_id, response});
}
return results;
}
void TestKeeperStorage::clearDeadWatches(int64_t session_id)
{
auto watches_it = sessions_and_watchers.find(session_id);
@ -793,7 +709,7 @@ void TestKeeperStorage::clearDeadWatches(int64_t session_id)
auto & watches_for_path = watch->second;
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
{
if (w_it->session_id == session_id)
if (*w_it == session_id)
w_it = watches_for_path.erase(w_it);
else
++w_it;
@ -808,7 +724,7 @@ void TestKeeperStorage::clearDeadWatches(int64_t session_id)
auto & list_watches_for_path = list_watch->second;
for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();)
{
if (w_it->session_id == session_id)
if (*w_it == session_id)
w_it = list_watches_for_path.erase(w_it);
else
++w_it;

View File

@ -4,9 +4,9 @@
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <future>
#include <unordered_map>
#include <unordered_set>
#include <vector>
namespace zkutil
{
@ -18,10 +18,7 @@ using ResponseCallback = std::function<void(const Coordination::ZooKeeperRespons
class TestKeeperStorage
{
public:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
std::atomic<int64_t> session_id_counter{0};
struct Node
@ -34,71 +31,56 @@ public:
int32_t seq_num = 0;
};
struct Watcher
struct ResponseForSession
{
int64_t session_id;
ResponseCallback watch_callback;
Coordination::ZooKeeperResponsePtr response;
};
using ResponsesForSessions = std::vector<ResponseForSession>;
struct RequestForSession
{
int64_t session_id;
Coordination::ZooKeeperRequestPtr request;
};
using RequestsForSessions = std::vector<RequestForSession>;
using Container = std::map<std::string, Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionIDs = std::vector<int64_t>;
using WatchCallbacks = std::vector<Watcher>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
Container container;
Ephemerals ephemerals;
SessionAndWatcher sessions_and_watchers;
std::atomic<int64_t> zxid{0};
std::atomic<bool> shutdown{false};
std::atomic<bool> finalized{false};
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
using clock = std::chrono::steady_clock;
struct RequestInfo
{
TestKeeperStorageRequestPtr request;
ResponseCallback response_callback;
ResponseCallback watch_callback;
clock::time_point time;
int64_t session_id;
};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
void finalize();
ThreadFromGlobalPool processing_thread;
void processingThread();
void clearDeadWatches(int64_t session_id);
public:
using AsyncResponse = std::future<Coordination::ZooKeeperResponsePtr>;
TestKeeperStorage();
~TestKeeperStorage();
struct ResponsePair
int64_t getZXID()
{
AsyncResponse response;
std::optional<AsyncResponse> watch_response;
};
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback);
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback);
return zxid.fetch_add(1);
}
public:
TestKeeperStorage();
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
ResponsesForSessions finalize(const RequestsForSessions & expired_requests);
int64_t getSessionID()
{
return session_id_counter.fetch_add(1);
}
int64_t getZXID()
{
return zxid.fetch_add(1);
}
};
}

View File

@ -0,0 +1,131 @@
#include <Common/ZooKeeper/TestKeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
}
namespace zkutil
{
void TestKeeperStorageDispatcher::processingThread()
{
setThreadName("TestKeeperSProc");
try
{
while (!shutdown)
{
RequestInfo info;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
if (requests_queue.tryPop(info, max_wait))
{
if (shutdown)
break;
auto responses = storage.processRequest(info.request, info.session_id);
for (const auto & response_for_session : responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize();
}
}
void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id);
session_writer->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
session_to_response_callback.erase(session_writer);
}
void TestKeeperStorageDispatcher::finalize()
{
{
std::lock_guard lock(push_request_mutex);
if (shutdown)
return;
shutdown = true;
if (processing_thread.joinable())
processing_thread.join();
}
RequestInfo info;
TestKeeperStorage::RequestsForSessions expired_requests;
while (requests_queue.tryPop(info))
expired_requests.push_back(TestKeeperStorage::RequestForSession{info.session_id, info.request});
auto expired_responses = storage.finalize(expired_requests);
for (const auto & response_for_session : expired_responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
std::lock_guard lock(session_to_response_callback_mutex);
if (session_to_response_callback.count(session_id) == 0)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id);
}
RequestInfo request_info;
request_info.time = clock::now();
request_info.request = request;
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
TestKeeperStorageDispatcher::TestKeeperStorageDispatcher()
{
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
}
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
}

View File

@ -0,0 +1,58 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <functional>
namespace zkutil
{
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class TestKeeperStorageDispatcher
{
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
using clock = std::chrono::steady_clock;
struct RequestInfo
{
Coordination::ZooKeeperRequestPtr request;
clock::time_point time;
int64_t session_id;
};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
std::atomic<bool> shutdown{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
std::mutex session_to_response_callback_mutex;
SessionToResponseCallback session_to_response_callback;
ThreadFromGlobalPool processing_thread;
TestKeeperStorage storage;
private:
void processingThread();
void finalize();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
TestKeeperStorageDispatcher();
~TestKeeperStorageDispatcher();
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
int64_t getSessionID()
{
return storage.getSessionID();
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
};
}

View File

@ -85,6 +85,7 @@ SRCS(
ZooKeeper/IKeeper.cpp
ZooKeeper/TestKeeper.cpp
ZooKeeper/TestKeeperStorage.cpp
ZooKeeper/TestKeeperStorageDispatcher.cpp
ZooKeeper/ZooKeeper.cpp
ZooKeeper/ZooKeeperCommon.cpp
ZooKeeper/ZooKeeperConstants.cpp

View File

@ -12,7 +12,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <Common/ZooKeeper/TestKeeperStorageDispatcher.h>
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -305,8 +305,8 @@ struct ContextShared
mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
mutable std::mutex test_keeper_storage_mutex;
mutable std::shared_ptr<zkutil::TestKeeperStorage> test_keeper_storage;
mutable std::mutex test_keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<zkutil::TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs
@ -447,7 +447,7 @@ struct ContextShared
/// Stop zookeeper connection
zookeeper.reset();
/// Stop test_keeper storage
test_keeper_storage.reset();
test_keeper_storage_dispatcher.reset();
}
bool hasTraceCollector() const
@ -1531,13 +1531,13 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
std::shared_ptr<zkutil::TestKeeperStorage> & Context::getTestKeeperStorage() const
std::shared_ptr<zkutil::TestKeeperStorageDispatcher> & Context::getTestKeeperStorageDispatcher() const
{
std::lock_guard lock(shared->test_keeper_storage_mutex);
if (!shared->test_keeper_storage)
shared->test_keeper_storage = std::make_shared<zkutil::TestKeeperStorage>();
std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex);
if (!shared->test_keeper_storage_dispatcher)
shared->test_keeper_storage_dispatcher = std::make_shared<zkutil::TestKeeperStorageDispatcher>();
return shared->test_keeper_storage;
return shared->test_keeper_storage_dispatcher;
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const

View File

@ -40,7 +40,7 @@ namespace Poco
namespace zkutil
{
class ZooKeeper;
class TestKeeperStorage;
class TestKeeperStorageDispatcher;
}
@ -513,7 +513,7 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
std::shared_ptr<zkutil::TestKeeperStorage> & getTestKeeperStorage() const;
std::shared_ptr<zkutil::TestKeeperStorageDispatcher> & getTestKeeperStorageDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);

View File

@ -13,6 +13,8 @@
#include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <queue>
#include <mutex>
#ifdef POCO_HAVE_FD_EPOLL
#include <sys/epoll.h>
@ -27,16 +29,45 @@ namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int LOGICAL_ERROR;
}
struct PollResult
{
std::vector<size_t> ready_responses;
bool has_responses;
bool has_requests;
bool error;
};
/// Queue with mutex. As simple as possible.
class ThreadSafeResponseQueue
{
private:
mutable std::mutex queue_mutex;
std::queue<Coordination::ZooKeeperResponsePtr> queue;
public:
void push(const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(queue_mutex);
queue.push(response);
}
bool tryPop(Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(queue_mutex);
if (!queue.empty())
{
response = queue.front();
queue.pop();
return true;
}
return false;
}
size_t size() const
{
std::lock_guard lock(queue_mutex);
return queue.size();
}
};
struct SocketInterruptablePollWrapper
{
int sockfd;
@ -160,12 +191,10 @@ struct SocketInterruptablePollWrapper
result.has_requests = true;
else
{
do
{
size_t response_position;
readIntBinary(response_position, response_in);
result.ready_responses.push_back(response_position);
} while (response_in.available());
/// Skip all of them, we are not interested in exact
/// amount because responses ordered in responses queue.
response_in.ignore();
result.has_responses = true;
}
}
}
@ -186,11 +215,12 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S
, server(server_)
, log(&Poco::Logger::get("TestKeeperTCPHandler"))
, global_context(server.context())
, test_keeper_storage(global_context.getTestKeeperStorage())
, test_keeper_storage_dispatcher(global_context.getTestKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, session_id(test_keeper_storage->getSessionID())
, session_id(test_keeper_storage_dispatcher->getSessionID())
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
, responses(std::make_unique<ThreadSafeResponseQueue>())
{
}
@ -278,6 +308,16 @@ void TestKeeperTCPHandler::runImpl()
}
sendHandshake();
auto response_fd = poll_wrapper->getResponseFD();
auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response)
{
responses->push(response);
UInt8 single_byte = 1;
[[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte));
};
test_keeper_storage_dispatcher->registerSession(session_id, response_callback);
session_stopwatch.start();
bool close_received = false;
try
@ -287,30 +327,17 @@ void TestKeeperTCPHandler::runImpl()
using namespace std::chrono_literals;
PollResult result = poll_wrapper->poll(session_timeout);
if (result.has_requests)
if (result.has_requests && !close_received)
{
do
{
Coordination::OpNum received_op = receiveRequest();
auto [received_op, received_xid] = receiveRequest();
if (received_op == Coordination::OpNum::Close)
{
auto last_response = responses.find(response_id_counter - 1);
if (last_response == responses.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Just inserted response #{} not found in responses", response_id_counter - 1);
LOG_DEBUG(log, "Received close request for session #{}", session_id);
if (last_response->second.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready)
{
LOG_DEBUG(log, "Cannot sent close for session #{}", session_id);
}
else
{
LOG_DEBUG(log, "Sent close for session #{}", session_id);
last_response->second.get()->write(*out);
}
LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id);
close_xid = received_xid;
close_received = true;
break;
}
else if (received_op == Coordination::OpNum::Heartbeat)
@ -322,30 +349,23 @@ void TestKeeperTCPHandler::runImpl()
while (in->available());
}
if (close_received)
break;
for (size_t response_id : result.ready_responses)
if (result.has_responses)
{
auto response_future = responses.find(response_id);
if (response_future == responses.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get unknown response #{}", response_id);
if (response_future->second.wait_for(0s) != std::future_status::ready)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Response #{} was market as ready but corresponding future not ready yet", response_id);
auto response = response_future->second.get();
if (response->error == Coordination::Error::ZOK)
Coordination::ZooKeeperResponsePtr response;
while (responses->tryPop(response))
{
response->write(*out);
}
else
{
/// TODO Get rid of this
if (!dynamic_cast<Coordination::ZooKeeperWatchResponse *>(response.get()))
if (response->xid == close_xid)
{
LOG_DEBUG(log, "Session #{} successfully closed", session_id);
return;
}
if (response->error == Coordination::Error::ZOK)
response->write(*out);
else if (response->xid != Coordination::WATCH_XID)
response->write(*out);
/// skipping bad response for watch
}
responses.erase(response_future);
}
if (result.error)
@ -354,12 +374,7 @@ void TestKeeperTCPHandler::runImpl()
if (session_stopwatch.elapsedMicroseconds() > static_cast<UInt64>(session_timeout.totalMicroseconds()))
{
LOG_DEBUG(log, "Session #{} expired", session_id);
auto response = putCloseRequest();
if (response.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready)
LOG_DEBUG(log, "Cannot sent close for expired session #{}", session_id);
else
response.get()->write(*out);
finish();
break;
}
}
@ -367,29 +382,18 @@ void TestKeeperTCPHandler::runImpl()
catch (const Exception & ex)
{
LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true));
auto response = putCloseRequest();
if (response.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready)
LOG_DEBUG(log, "Cannot sent close for session #{}", session_id);
else
response.get()->write(*out);
finish();
}
}
zkutil::TestKeeperStorage::AsyncResponse TestKeeperTCPHandler::putCloseRequest()
void TestKeeperTCPHandler::finish()
{
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
auto promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
};
test_keeper_storage->putRequest(request, session_id, callback);
return promise->get_future();
request->xid = close_xid;
test_keeper_storage_dispatcher->putRequest(request, session_id);
}
Coordination::OpNum TestKeeperTCPHandler::receiveRequest()
std::pair<Coordination::OpNum, Coordination::XID> TestKeeperTCPHandler::receiveRequest()
{
int32_t length;
Coordination::read(length, *in);
@ -402,47 +406,9 @@ Coordination::OpNum TestKeeperTCPHandler::receiveRequest()
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid;
request->readImpl(*in);
auto promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
if (opnum != Coordination::OpNum::Close)
{
int response_fd = poll_wrapper->getResponseFD();
size_t response_num = response_id_counter++;
zkutil::ResponseCallback callback = [response_fd, promise, response_num] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &response_num, sizeof(response_num));
};
if (request->has_watch)
{
auto watch_promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
size_t watch_response_num = response_id_counter++;
zkutil::ResponseCallback watch_callback = [response_fd, watch_promise, watch_response_num] (const Coordination::ZooKeeperResponsePtr & response)
{
watch_promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &watch_response_num, sizeof(watch_response_num));
};
test_keeper_storage->putRequest(request, session_id, callback, watch_callback);
responses.try_emplace(response_num, promise->get_future());
responses.try_emplace(watch_response_num, watch_promise->get_future());
}
else
{
test_keeper_storage->putRequest(request, session_id, callback);
responses.try_emplace(response_num, promise->get_future());
}
}
else
{
zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
};
test_keeper_storage->putRequest(request, session_id, callback);
responses.try_emplace(response_id_counter++, promise->get_future());
}
return opnum;
test_keeper_storage_dispatcher->putRequest(request, session_id);
return std::make_pair(opnum, xid);
}
}

View File

@ -6,17 +6,18 @@
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <Common/ZooKeeper/TestKeeperStorageDispatcher.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <unordered_map>
#include <future>
namespace DB
{
struct SocketInterruptablePollWrapper;
using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>;
class ThreadSafeResponseQueue;
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>;
class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection
{
@ -27,15 +28,16 @@ private:
IServer & server;
Poco::Logger * log;
Context global_context;
std::shared_ptr<zkutil::TestKeeperStorage> test_keeper_storage;
std::shared_ptr<zkutil::TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
Poco::Timespan operation_timeout;
Poco::Timespan session_timeout;
int64_t session_id;
Stopwatch session_stopwatch;
SocketInterruptablePollWrapperPtr poll_wrapper;
size_t response_id_counter = 0;
std::unordered_map<size_t, zkutil::TestKeeperStorage::AsyncResponse> responses;
ThreadSafeResponseQueuePtr responses;
Coordination::XID close_xid = Coordination::CLOSE_XID;
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBufferFromPocoSocket> in;
@ -46,8 +48,8 @@ private:
void sendHandshake();
void receiveHandshake();
Coordination::OpNum receiveRequest();
zkutil::TestKeeperStorage::AsyncResponse putCloseRequest();
std::pair<Coordination::OpNum, Coordination::XID> receiveRequest();
void finish();
};
}