ClickHouse/src/Coordination/KeeperDispatcher.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

223 lines
6.7 KiB
C++
Raw Normal View History

2021-01-19 14:22:28 +00:00
#pragma once
#include "config.h"
2021-02-01 13:18:17 +00:00
#if USE_NURAFT
2021-01-19 14:22:28 +00:00
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
2021-01-22 16:04:57 +00:00
#include <Poco/Util/AbstractConfiguration.h>
2021-02-01 13:18:17 +00:00
#include <Common/Exception.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2021-02-01 13:18:17 +00:00
#include <functional>
2021-03-29 08:24:56 +00:00
#include <Coordination/KeeperServer.h>
2021-02-09 14:47:18 +00:00
#include <Coordination/CoordinationSettings.h>
2021-11-19 09:30:58 +00:00
#include <Coordination/Keeper4LWInfo.h>
2021-11-18 20:17:22 +00:00
#include <Coordination/KeeperConnectionStats.h>
2022-09-21 11:53:54 +00:00
#include <Coordination/KeeperSnapshotManagerS3.h>
2021-01-19 14:22:28 +00:00
namespace DB
2021-01-19 14:22:28 +00:00
{
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
/// Highlevel wrapper for ClickHouse Keeper.
/// Process user requests via consensus and return responses.
2021-11-18 20:17:22 +00:00
class KeeperDispatcher
2021-01-19 14:22:28 +00:00
{
private:
2021-10-27 12:26:42 +00:00
mutable std::mutex push_request_mutex;
2021-01-19 14:22:28 +00:00
2021-03-29 08:24:56 +00:00
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
2021-03-05 10:40:24 +00:00
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
2021-10-19 12:00:26 +00:00
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
2021-03-05 10:40:24 +00:00
2021-04-16 13:50:09 +00:00
/// Size depends on coordination settings
std::unique_ptr<RequestsQueue> requests_queue;
ResponsesQueue responses_queue;
2021-03-05 10:40:24 +00:00
SnapshotsQueue snapshots_queue{1};
2021-10-19 13:11:29 +00:00
/// More than 1k updates is definitely misconfiguration.
2021-10-19 12:00:26 +00:00
UpdateConfigurationQueue update_configuration_queue{1000};
2021-03-05 10:40:24 +00:00
2021-01-26 14:08:31 +00:00
std::atomic<bool> shutdown_called{false};
2021-01-19 14:22:28 +00:00
2021-10-27 12:26:42 +00:00
mutable std::mutex session_to_response_callback_mutex;
2021-04-16 13:50:09 +00:00
/// These two maps looks similar, but serves different purposes.
/// The first map is subscription map for normal responses like
/// (get, set, list, etc.). Dispatcher determines callback for each response
/// using session id from this map.
2021-01-19 14:22:28 +00:00
SessionToResponseCallback session_to_response_callback;
2021-04-16 13:50:09 +00:00
/// But when client connects to the server for the first time it doesn't
/// have session_id. It request it from server. We give temporary
/// internal id for such requests just to much client with its response.
SessionToResponseCallback new_session_id_response_callback;
2021-01-19 14:22:28 +00:00
2021-04-16 13:50:09 +00:00
/// Reading and batching new requests from client handlers
ThreadFromGlobalPool request_thread;
2021-04-16 13:50:09 +00:00
/// Pushing responses to clients client handlers
/// using session_id.
ThreadFromGlobalPool responses_thread;
2021-04-16 13:50:09 +00:00
/// Cleaning old dead sessions
ThreadFromGlobalPool session_cleaner_thread;
2021-04-16 13:50:09 +00:00
/// Dumping new snapshots to disk
2021-03-05 10:40:24 +00:00
ThreadFromGlobalPool snapshot_thread;
2021-10-19 13:11:29 +00:00
/// Apply or wait for configuration changes
2021-10-19 12:00:26 +00:00
ThreadFromGlobalPool update_configuration_thread;
/// RAFT wrapper.
2021-03-29 08:24:56 +00:00
std::unique_ptr<KeeperServer> server;
2021-11-18 20:17:22 +00:00
KeeperConnectionStats keeper_stats;
2021-10-27 12:26:42 +00:00
2021-11-18 20:17:22 +00:00
KeeperConfigurationAndSettingsPtr configuration_and_settings;
2021-10-27 12:26:42 +00:00
Poco::Logger * log;
2021-01-19 14:22:28 +00:00
2021-04-16 13:50:09 +00:00
/// Counter for new session_id requests.
std::atomic<int64_t> internal_session_id_counter{0};
2022-09-21 11:53:54 +00:00
KeeperSnapshotManagerS3 snapshot_s3;
/// Thread put requests to raft
void requestThread();
/// Thread put responses for subscribed sessions
void responseThread();
/// Thread clean disconnected sessions from memory
void sessionCleanerTask();
/// Thread create snapshots in the background
2021-03-05 10:40:24 +00:00
void snapshotThread();
2021-10-19 13:11:29 +00:00
/// Thread apply or wait configuration changes from leader
2021-10-19 12:00:26 +00:00
void updateConfigurationThread();
2021-01-19 14:22:28 +00:00
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
2021-04-16 13:50:09 +00:00
/// Add error responses for requests to responses queue.
/// Clears requests.
2021-04-17 14:06:49 +00:00
void addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error);
2021-04-16 13:50:09 +00:00
/// Forcefully wait for result and sets errors if something when wrong.
/// Clears both arguments
2021-04-17 14:06:49 +00:00
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
2021-04-16 13:50:09 +00:00
2021-01-19 14:22:28 +00:00
public:
/// Just allocate some objects, real initialization is done by `intialize method`
KeeperDispatcher();
/// Call shutdown
2021-11-18 20:17:22 +00:00
~KeeperDispatcher();
2021-01-25 12:29:12 +00:00
/// Initialization from config.
/// standalone_keeper -- we are standalone keeper application (not inside clickhouse server)
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async);
void startServer();
bool checkInit() const
{
return server && server->checkInit();
}
2021-01-22 16:04:57 +00:00
2022-04-26 07:32:02 +00:00
/// Is server accepting requests, i.e. connected to the cluster
/// and achieved quorum
2022-04-14 12:00:47 +00:00
bool isServerActive() const;
2021-10-19 13:49:36 +00:00
/// Registered in ConfigReloader callback. Add new configuration changes to
2021-10-19 13:11:29 +00:00
/// update_configuration_queue. Keeper Dispatcher apply them asynchronously.
2021-10-18 15:27:51 +00:00
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
2021-01-26 14:08:31 +00:00
void shutdown();
void forceRecovery();
/// Put request to ClickHouse Keeper
2021-01-25 12:29:12 +00:00
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
2021-01-21 13:53:10 +00:00
2021-10-27 12:26:42 +00:00
/// Get new session ID
int64_t getSessionID(int64_t session_timeout_ms);
/// Register session and subscribe for responses with callback
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call if we don't need any responses for this session no more (session was expired)
void finishSession(int64_t session_id);
/// Invoked when a request completes.
2021-11-18 20:17:22 +00:00
void updateKeeperStatLatency(uint64_t process_time_ms);
2021-10-27 12:26:42 +00:00
/// Are we leader
2021-11-18 20:17:22 +00:00
bool isLeader() const
2021-01-27 17:54:25 +00:00
{
return server->isLeader();
}
2022-06-30 22:07:22 +00:00
bool isFollower() const
{
return server->isFollower();
}
2021-11-18 20:17:22 +00:00
bool hasLeader() const
{
return server->isLeaderAlive();
}
2021-11-18 20:17:22 +00:00
bool isObserver() const
{
return server->isObserver();
}
2021-01-21 13:53:10 +00:00
2021-11-18 20:17:22 +00:00
uint64_t getLogDirSize() const;
2021-11-18 20:17:22 +00:00
uint64_t getSnapDirSize() const;
2021-11-05 10:21:34 +00:00
2021-10-27 12:26:42 +00:00
/// Request statistics such as qps, latency etc.
2022-03-03 06:30:22 +00:00
KeeperConnectionStats & getKeeperConnectionStats()
2021-10-27 12:26:42 +00:00
{
return keeper_stats;
}
2021-11-19 09:30:58 +00:00
Keeper4LWInfo getKeeper4LWInfo() const;
2021-10-27 12:26:42 +00:00
2021-11-18 20:17:22 +00:00
const KeeperStateMachine & getStateMachine() const
2021-10-27 12:26:42 +00:00
{
2021-11-18 20:17:22 +00:00
return *server->getKeeperStateMachine();
2021-10-27 12:26:42 +00:00
}
2021-11-18 20:17:22 +00:00
const KeeperConfigurationAndSettingsPtr & getKeeperConfigurationAndSettings() const
2021-10-27 12:26:42 +00:00
{
2021-11-18 20:17:22 +00:00
return configuration_and_settings;
2021-10-27 12:26:42 +00:00
}
2021-11-18 20:17:22 +00:00
void incrementPacketsSent()
2021-10-28 14:22:56 +00:00
{
2021-11-18 20:17:22 +00:00
keeper_stats.incrementPacketsSent();
2021-10-28 14:22:56 +00:00
}
2021-11-18 20:17:22 +00:00
void incrementPacketsReceived()
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
keeper_stats.incrementPacketsReceived();
2021-11-05 10:21:34 +00:00
}
2021-11-18 20:17:22 +00:00
void resetConnectionStats()
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
keeper_stats.reset();
2021-11-05 10:21:34 +00:00
}
2022-09-26 10:29:15 +00:00
2022-10-24 12:08:58 +00:00
/// Create snapshot manually, return the last committed log index in the snapshot
uint64_t createSnapshot()
2022-09-26 10:29:15 +00:00
{
return server->createSnapshot();
}
2022-10-25 09:46:24 +00:00
/// Get Raft information
2022-10-24 12:08:58 +00:00
KeeperLogInfo getKeeperLogInfo()
{
2022-10-24 12:08:58 +00:00
return server->getKeeperLogInfo();
}
2021-01-19 14:22:28 +00:00
};
}
2021-02-01 13:18:17 +00:00
#endif