mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 12:32:04 +00:00
KeeperDispatcher: remove reductant lock as the ConcurrentBoundedQueue is thread-safe
This commit is contained in:
parent
bcb106e138
commit
f443a00d14
@ -304,8 +304,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
|||||||
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
||||||
request_info.session_id = session_id;
|
request_info.session_id = session_id;
|
||||||
|
|
||||||
std::lock_guard lock(push_request_mutex);
|
|
||||||
|
|
||||||
if (shutdown_called)
|
if (shutdown_called)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@ -393,13 +391,11 @@ void KeeperDispatcher::shutdown()
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard lock(push_request_mutex);
|
bool expected = false;
|
||||||
|
if (!shutdown_called.compare_exchange_strong(expected, true))
|
||||||
if (shutdown_called)
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
LOG_DEBUG(log, "Shutting down storage dispatcher");
|
LOG_DEBUG(log, "Shutting down storage dispatcher");
|
||||||
shutdown_called = true;
|
|
||||||
|
|
||||||
if (session_cleaner_thread.joinable())
|
if (session_cleaner_thread.joinable())
|
||||||
session_cleaner_thread.join();
|
session_cleaner_thread.join();
|
||||||
@ -545,12 +541,9 @@ void KeeperDispatcher::sessionCleanerTask()
|
|||||||
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
||||||
.request = std::move(request),
|
.request = std::move(request),
|
||||||
};
|
};
|
||||||
{
|
|
||||||
std::lock_guard lock(push_request_mutex);
|
|
||||||
if (!requests_queue->push(std::move(request_info)))
|
if (!requests_queue->push(std::move(request_info)))
|
||||||
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
||||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove session from registered sessions
|
/// Remove session from registered sessions
|
||||||
finishSession(dead_session);
|
finishSession(dead_session);
|
||||||
@ -661,12 +654,9 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Push new session request to queue
|
/// Push new session request to queue
|
||||||
{
|
|
||||||
std::lock_guard lock(push_request_mutex);
|
|
||||||
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
|
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
|
||||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||||
}
|
|
||||||
|
|
||||||
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
|
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
|
||||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot receive session id within session timeout");
|
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot receive session id within session timeout");
|
||||||
@ -800,10 +790,7 @@ uint64_t KeeperDispatcher::getSnapDirSize() const
|
|||||||
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
|
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
|
||||||
{
|
{
|
||||||
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
|
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
|
||||||
{
|
|
||||||
std::lock_guard lock(push_request_mutex);
|
|
||||||
result.outstanding_requests_count = requests_queue->size();
|
result.outstanding_requests_count = requests_queue->size();
|
||||||
}
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(session_to_response_callback_mutex);
|
std::lock_guard lock(session_to_response_callback_mutex);
|
||||||
result.alive_connections_count = session_to_response_callback.size();
|
result.alive_connections_count = session_to_response_callback.size();
|
||||||
|
@ -27,8 +27,6 @@ using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeep
|
|||||||
class KeeperDispatcher
|
class KeeperDispatcher
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
mutable std::mutex push_request_mutex;
|
|
||||||
|
|
||||||
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
|
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
|
||||||
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
|
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
|
||||||
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
|
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
|
||||||
|
Loading…
Reference in New Issue
Block a user