ClickHouse/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h

59 lines
1.5 KiB
C++
Raw Normal View History

2021-01-19 14:22:28 +00:00
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <functional>
namespace zkutil
{
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class TestKeeperStorageDispatcher
{
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
using clock = std::chrono::steady_clock;
struct RequestInfo
{
Coordination::ZooKeeperRequestPtr request;
clock::time_point time;
int64_t session_id;
};
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
std::atomic<bool> shutdown{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
std::mutex session_to_response_callback_mutex;
SessionToResponseCallback session_to_response_callback;
ThreadFromGlobalPool processing_thread;
TestKeeperStorage storage;
private:
void processingThread();
void finalize();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
TestKeeperStorageDispatcher();
~TestKeeperStorageDispatcher();
void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
int64_t getSessionID()
{
return storage.getSessionID();
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
};
}