Merge pull request #66124 from ClickHouse/keeper-correct-log-long-total

Correctly print long processing requests in Keeper
This commit is contained in:
Antonio Andelic 2024-07-10 11:34:28 +00:00 committed by GitHub
commit 65dc1c8515
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 86 additions and 62 deletions

View File

@ -6,6 +6,7 @@
#include <base/hex.h>
#include "Common/ZooKeeper/IKeeper.h"
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/checkStackSize.h>
@ -320,7 +321,7 @@ void KeeperDispatcher::responseThread()
try
{
setResponse(response_for_session.session_id, response_for_session.response);
setResponse(response_for_session.session_id, response_for_session.response, response_for_session.request);
}
catch (...)
{
@ -355,7 +356,7 @@ void KeeperDispatcher::snapshotThread()
}
}
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response, Coordination::ZooKeeperRequestPtr request)
{
std::lock_guard lock(session_to_response_callback_mutex);
@ -369,7 +370,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe
return;
auto callback = new_session_id_response_callback[session_id_resp.internal_id];
callback(response);
callback(response, request);
new_session_id_response_callback.erase(session_id_resp.internal_id);
}
else /// Normal response, just write to client
@ -380,7 +381,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe
if (session_response_callback == session_to_response_callback.end())
return;
session_response_callback->second(response);
session_response_callback->second(response, request);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
@ -771,21 +772,27 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
{
std::lock_guard lock(session_to_response_callback_mutex);
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
new_session_id_response_callback[request->internal_id]
= [promise, internal_id = request->internal_id](
const Coordination::ZooKeeperResponsePtr & response, Coordination::ZooKeeperRequestPtr /*request*/)
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", response->getOpNum())));
promise->set_exception(std::make_exception_ptr(Exception(
ErrorCodes::LOGICAL_ERROR, "Incorrect response of type {} instead of SessionID response", response->getOpNum())));
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
if (session_id_response.internal_id != internal_id)
{
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
promise->set_exception(std::make_exception_ptr(Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect response with internal id {} instead of {}",
session_id_response.internal_id,
internal_id)));
}
if (response->error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException::fromMessage(response->error, "SessionID request failed with error")));
promise->set_exception(
std::make_exception_ptr(zkutil::KeeperException::fromMessage(response->error, "SessionID request failed with error")));
promise->set_value(session_id_response.session_id);
};

View File

@ -20,7 +20,7 @@
namespace DB
{
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response, Coordination::ZooKeeperRequestPtr request)>;
/// Highlevel wrapper for ClickHouse Keeper.
/// Process user requests via consensus and return responses.
@ -92,7 +92,7 @@ private:
void clusterUpdateWithReconfigDisabledThread();
void clusterUpdateThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response, Coordination::ZooKeeperRequestPtr request = nullptr);
/// Add error responses for requests to responses queue.
/// Clears requests.

View File

@ -407,7 +407,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
if (!keeper_context->localLogsPreprocessed() && !preprocess(*request_for_session))
return nullptr;
auto try_push = [&](const KeeperStorage::ResponseForSession& response)
auto try_push = [&](const KeeperStorage::ResponseForSession & response)
{
if (!responses_queue.push(response))
{
@ -416,17 +416,6 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
"Failed to push response with session id {} to the queue, probably because of shutdown",
response.session_id);
}
using namespace std::chrono;
uint64_t elapsed = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - request_for_session->time;
if (elapsed > keeper_context->getCoordinationSettings()->log_slow_total_threshold_ms)
{
LOG_INFO(
log,
"Total time to process a request took too long ({}ms).\nRequest info: {}",
elapsed,
request_for_session->request->toString(/*short_format=*/true));
}
};
try
@ -443,6 +432,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1;
response_for_session.response = response;
response_for_session.request = request_for_session->request;
LockGuardWithStats lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
@ -462,8 +452,14 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
LockGuardWithStats lock(storage_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
{
if (response_for_session.response->xid != Coordination::WATCH_XID)
response_for_session.request = request_for_session->request;
try_push(response_for_session);
}
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, request_for_session->log_idx, true);
@ -797,9 +793,14 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
LockGuardWithStats lock(storage_and_responses_lock);
auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses)
if (!responses_queue.push(response))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
for (auto & response_for_session : responses)
{
if (response_for_session.response->xid != Coordination::WATCH_XID)
response_for_session.request = request_for_session.request;
if (!responses_queue.push(response_for_session))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response_for_session.session_id);
}
}
void KeeperStateMachine::shutdownStorage()

View File

@ -206,6 +206,7 @@ public:
{
int64_t session_id;
Coordination::ZooKeeperResponsePtr response;
Coordination::ZooKeeperRequestPtr request = nullptr;
};
using ResponsesForSessions = std::vector<ResponseForSession>;

View File

@ -2,31 +2,31 @@
#if USE_NURAFT
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Core/Types.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <Poco/Net/NetException.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <base/defines.h>
#include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <mutex>
#include <Coordination/FourLetterCommand.h>
#include <IO/CompressionMethod.h>
#include <base/hex.h>
# include <mutex>
# include <Coordination/FourLetterCommand.h>
# include <Core/Types.h>
# include <IO/CompressionMethod.h>
# include <IO/ReadBufferFromFileDescriptor.h>
# include <IO/ReadBufferFromPocoSocket.h>
# include <IO/WriteBufferFromPocoSocket.h>
# include <base/defines.h>
# include <base/hex.h>
# include <Poco/Net/NetException.h>
# include <Poco/Util/AbstractConfiguration.h>
# include <Common/CurrentThread.h>
# include <Common/NetException.h>
# include <Common/PipeFDs.h>
# include <Common/Stopwatch.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
# include <Common/logger_useful.h>
# include <Common/setThreadName.h>
#ifdef POCO_HAVE_FD_EPOLL
#include <sys/epoll.h>
#else
#include <poll.h>
#endif
# ifdef POCO_HAVE_FD_EPOLL
# include <sys/epoll.h>
# else
# include <poll.h>
# endif
namespace ProfileEvents
{
@ -400,13 +400,11 @@ void KeeperTCPHandler::runImpl()
}
auto response_fd = poll_wrapper->getResponseFD();
auto response_callback = [responses_ = this->responses, response_fd](const Coordination::ZooKeeperResponsePtr & response)
auto response_callback = [my_responses = this->responses,
response_fd](const Coordination::ZooKeeperResponsePtr & response, Coordination::ZooKeeperRequestPtr request)
{
if (!responses_->push(response))
throw Exception(ErrorCodes::SYSTEM_ERROR,
"Could not push response with xid {} and zxid {}",
response->xid,
response->zxid);
if (!my_responses->push(RequestWithResponse{response, std::move(request)}))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with xid {} and zxid {}", response->xid, response->zxid);
UInt8 single_byte = 1;
[[maybe_unused]] ssize_t result = write(response_fd, &single_byte, sizeof(single_byte));
@ -470,19 +468,20 @@ void KeeperTCPHandler::runImpl()
/// became inconsistent and race condition is possible.
while (result.responses_count != 0)
{
Coordination::ZooKeeperResponsePtr response;
RequestWithResponse request_with_response;
if (!responses->tryPop(response))
if (!responses->tryPop(request_with_response))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");
log_long_operation("Waiting for response to be ready");
auto & response = request_with_response.response;
if (response->xid == close_xid)
{
LOG_DEBUG(log, "Session #{} successfully closed", session_id);
return;
}
updateStats(response);
updateStats(response, request_with_response.request);
packageSent();
response->write(getWriteBuffer());
@ -609,7 +608,7 @@ void KeeperTCPHandler::packageReceived()
keeper_dispatcher->incrementPacketsReceived();
}
void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response)
void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response, const Coordination::ZooKeeperRequestPtr & request)
{
/// update statistics ignoring watch response and heartbeat.
if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat)
@ -617,6 +616,16 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
Int64 elapsed = (Poco::Timestamp() - operations[response->xid]);
ProfileEvents::increment(ProfileEvents::KeeperTotalElapsedMicroseconds, elapsed);
Int64 elapsed_ms = elapsed / 1000;
if (request && elapsed_ms > static_cast<Int64>(keeper_dispatcher->getKeeperContext()->getCoordinationSettings()->log_slow_total_threshold_ms))
{
LOG_INFO(
log,
"Total time to process a request took too long ({}ms).\nRequest info: {}",
elapsed,
request->toString(/*short_format=*/true));
}
conn_stats.updateLatency(elapsed_ms);
operations.erase(response->xid);

View File

@ -26,7 +26,13 @@ namespace DB
struct SocketInterruptablePollWrapper;
using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>;
using ThreadSafeResponseQueue = ConcurrentBoundedQueue<Coordination::ZooKeeperResponsePtr>;
struct RequestWithResponse
{
Coordination::ZooKeeperResponsePtr response;
Coordination::ZooKeeperRequestPtr request; /// it can be nullptr for some responses
};
using ThreadSafeResponseQueue = ConcurrentBoundedQueue<RequestWithResponse>;
using ThreadSafeResponseQueuePtr = std::shared_ptr<ThreadSafeResponseQueue>;
struct LastOp;
@ -104,7 +110,7 @@ private:
void packageSent();
void packageReceived();
void updateStats(Coordination::ZooKeeperResponsePtr & response);
void updateStats(Coordination::ZooKeeperResponsePtr & response, const Coordination::ZooKeeperRequestPtr & request);
Poco::Timestamp established;