ClickHouse/src/Coordination/TestKeeperStorageDispatcher.h

67 lines
1.8 KiB
C++
Raw Normal View History

2021-01-19 14:22:28 +00:00
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Coordination/NuKeeperServer.h>
#include <Poco/Util/AbstractConfiguration.h>

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>

namespace DB
2021-01-19 14:22:28 +00:00
{
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class TestKeeperStorageDispatcher
{
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
std::mutex push_request_mutex;
2021-01-22 16:04:57 +00:00
using RequestsQueue = ConcurrentBoundedQueue<TestKeeperStorage::RequestForSession>;
2021-01-19 14:22:28 +00:00
RequestsQueue requests_queue{1};
2021-01-26 14:08:31 +00:00
std::atomic<bool> shutdown_called{false};
2021-01-19 14:22:28 +00:00
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
std::mutex session_to_response_callback_mutex;
SessionToResponseCallback session_to_response_callback;
ThreadFromGlobalPool processing_thread;
2021-01-25 12:29:12 +00:00
std::unique_ptr<NuKeeperServer> server;
2021-01-21 14:34:34 +00:00
std::mutex session_id_mutex;
2021-01-19 14:22:28 +00:00
private:
void processingThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
2021-01-25 12:29:12 +00:00
TestKeeperStorageDispatcher() = default;
void initialize(const Poco::Util::AbstractConfiguration & config);
2021-01-22 16:04:57 +00:00
2021-01-26 14:08:31 +00:00
void shutdown();
2021-01-19 14:22:28 +00:00
~TestKeeperStorageDispatcher();
2021-01-25 12:29:12 +00:00
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
2021-01-21 13:53:10 +00:00
2021-01-27 17:54:25 +00:00
bool isLeader() const
{
return server->isLeader();
}
2021-01-19 14:22:28 +00:00
int64_t getSessionID()
{
2021-01-21 14:34:34 +00:00
std::lock_guard lock(session_id_mutex);
2021-01-25 12:29:12 +00:00
return server->getSessionID();
2021-01-19 14:22:28 +00:00
}
2021-01-21 13:53:10 +00:00
2021-01-19 14:22:28 +00:00
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call if we don't need any responses for this session no more (session was expired)
void finishSession(int64_t session_id);
2021-01-19 14:22:28 +00:00
};
}