mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
log long operations in Keeper
This commit is contained in:
parent
4498aecaf6
commit
856502fa81
@ -489,20 +489,20 @@ void ZooKeeperMultiResponse::writeImpl(WriteBuffer & out) const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return std::make_shared<ZooKeeperHeartbeatResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperHeartbeatResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return std::make_shared<ZooKeeperSyncResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSyncResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return std::make_shared<ZooKeeperAuthResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperAuthResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const { return std::make_shared<ZooKeeperCreateResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCreateResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return std::make_shared<ZooKeeperRemoveResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return std::make_shared<ZooKeeperExistsResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return std::make_shared<ZooKeeperGetResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return std::make_shared<ZooKeeperSetResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::make_shared<ZooKeeperListResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared<ZooKeeperCheckResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCheckResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared<ZooKeeperMultiResponse>(requests); }
|
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperMultiResponse>(requests)); }
|
||||||
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCloseResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperSetACLResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetACLResponse>()); }
|
||||||
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperGetACLResponse>(); }
|
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetACLResponse>()); }
|
||||||
|
|
||||||
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
|
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
|
||||||
{
|
{
|
||||||
@ -690,6 +690,40 @@ std::shared_ptr<ZooKeeperRequest> ZooKeeperRequest::read(ReadBuffer & in)
|
|||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ZooKeeperRequest::~ZooKeeperRequest()
|
||||||
|
{
|
||||||
|
if (!request_created_time_ns)
|
||||||
|
return;
|
||||||
|
UInt64 elapsed_ns = clock_gettime_ns() - request_created_time_ns;
|
||||||
|
constexpr UInt64 max_request_time_ns = 1000000000ULL; /// 1 sec
|
||||||
|
if (max_request_time_ns < elapsed_ns)
|
||||||
|
{
|
||||||
|
LOG_TEST(&Poco::Logger::get(__PRETTY_FUNCTION__), "Processing of request xid={} took {} ms", xid, elapsed_ns / 1000000UL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ZooKeeperResponsePtr ZooKeeperRequest::setTime(ZooKeeperResponsePtr response) const
|
||||||
|
{
|
||||||
|
if (request_created_time_ns)
|
||||||
|
{
|
||||||
|
response->response_created_time_ns = clock_gettime_ns();
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
ZooKeeperResponse::~ZooKeeperResponse()
|
||||||
|
{
|
||||||
|
if (!response_created_time_ns)
|
||||||
|
return;
|
||||||
|
UInt64 elapsed_ns = clock_gettime_ns() - response_created_time_ns;
|
||||||
|
constexpr UInt64 max_request_time_ns = 1000000000ULL; /// 1 sec
|
||||||
|
if (max_request_time_ns < elapsed_ns)
|
||||||
|
{
|
||||||
|
LOG_TEST(&Poco::Logger::get(__PRETTY_FUNCTION__), "Processing of response xid={} took {} ms", xid, elapsed_ns / 1000000UL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
ZooKeeperRequestPtr ZooKeeperRequestFactory::get(OpNum op_num) const
|
ZooKeeperRequestPtr ZooKeeperRequestFactory::get(OpNum op_num) const
|
||||||
{
|
{
|
||||||
auto it = op_num_to_request.find(op_num);
|
auto it = op_num_to_request.find(op_num);
|
||||||
@ -708,7 +742,12 @@ ZooKeeperRequestFactory & ZooKeeperRequestFactory::instance()
|
|||||||
template<OpNum num, typename RequestT>
|
template<OpNum num, typename RequestT>
|
||||||
void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
|
void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
|
||||||
{
|
{
|
||||||
factory.registerRequest(num, [] { return std::make_shared<RequestT>(); });
|
factory.registerRequest(num, []
|
||||||
|
{
|
||||||
|
auto res = std::make_shared<RequestT>();
|
||||||
|
res->request_created_time_ns = clock_gettime_ns();
|
||||||
|
return res;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
ZooKeeperRequestFactory::ZooKeeperRequestFactory()
|
ZooKeeperRequestFactory::ZooKeeperRequestFactory()
|
||||||
|
@ -30,9 +30,11 @@ struct ZooKeeperResponse : virtual Response
|
|||||||
XID xid = 0;
|
XID xid = 0;
|
||||||
int64_t zxid = 0;
|
int64_t zxid = 0;
|
||||||
|
|
||||||
|
UInt64 response_created_time_ns = 0;
|
||||||
|
|
||||||
ZooKeeperResponse() = default;
|
ZooKeeperResponse() = default;
|
||||||
ZooKeeperResponse(const ZooKeeperResponse &) = default;
|
ZooKeeperResponse(const ZooKeeperResponse &) = default;
|
||||||
virtual ~ZooKeeperResponse() override = default;
|
~ZooKeeperResponse() override;
|
||||||
virtual void readImpl(ReadBuffer &) = 0;
|
virtual void readImpl(ReadBuffer &) = 0;
|
||||||
virtual void writeImpl(WriteBuffer &) const = 0;
|
virtual void writeImpl(WriteBuffer &) const = 0;
|
||||||
virtual void write(WriteBuffer & out) const;
|
virtual void write(WriteBuffer & out) const;
|
||||||
@ -54,9 +56,11 @@ struct ZooKeeperRequest : virtual Request
|
|||||||
|
|
||||||
bool restored_from_zookeeper_log = false;
|
bool restored_from_zookeeper_log = false;
|
||||||
|
|
||||||
|
UInt64 request_created_time_ns = 0;
|
||||||
|
|
||||||
ZooKeeperRequest() = default;
|
ZooKeeperRequest() = default;
|
||||||
ZooKeeperRequest(const ZooKeeperRequest &) = default;
|
ZooKeeperRequest(const ZooKeeperRequest &) = default;
|
||||||
virtual ~ZooKeeperRequest() override = default;
|
~ZooKeeperRequest() override;
|
||||||
|
|
||||||
virtual OpNum getOpNum() const = 0;
|
virtual OpNum getOpNum() const = 0;
|
||||||
|
|
||||||
@ -69,6 +73,7 @@ struct ZooKeeperRequest : virtual Request
|
|||||||
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
|
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
|
||||||
|
|
||||||
virtual ZooKeeperResponsePtr makeResponse() const = 0;
|
virtual ZooKeeperResponsePtr makeResponse() const = 0;
|
||||||
|
ZooKeeperResponsePtr setTime(ZooKeeperResponsePtr response) const;
|
||||||
virtual bool isReadRequest() const = 0;
|
virtual bool isReadRequest() const = 0;
|
||||||
|
|
||||||
virtual void createLogElements(LogElements & elems) const;
|
virtual void createLogElements(LogElements & elems) const;
|
||||||
|
@ -206,7 +206,10 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe
|
|||||||
|
|
||||||
/// Session was disconnected, just skip this response
|
/// Session was disconnected, just skip this response
|
||||||
if (session_response_callback == session_to_response_callback.end())
|
if (session_response_callback == session_to_response_callback.end())
|
||||||
|
{
|
||||||
|
LOG_TEST(log, "Cannot write response xid={}, op={}, session {} disconnected", response->xid, response->getOpNum(), session_id);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
session_response_callback->second(response);
|
session_response_callback->second(response);
|
||||||
|
|
||||||
|
@ -333,6 +333,16 @@ void KeeperTCPHandler::runImpl()
|
|||||||
};
|
};
|
||||||
keeper_dispatcher->registerSession(session_id, response_callback);
|
keeper_dispatcher->registerSession(session_id, response_callback);
|
||||||
|
|
||||||
|
Stopwatch logging_stopwatch;
|
||||||
|
auto log_long_operation = [&](const String & operation)
|
||||||
|
{
|
||||||
|
constexpr UInt64 operation_max_ms = 500;
|
||||||
|
auto elapsed_ms = logging_stopwatch.elapsedMilliseconds();
|
||||||
|
if (operation_max_ms < elapsed_ms)
|
||||||
|
LOG_TEST(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms);
|
||||||
|
logging_stopwatch.restart();
|
||||||
|
};
|
||||||
|
|
||||||
session_stopwatch.start();
|
session_stopwatch.start();
|
||||||
bool close_received = false;
|
bool close_received = false;
|
||||||
try
|
try
|
||||||
@ -342,9 +352,11 @@ void KeeperTCPHandler::runImpl()
|
|||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
PollResult result = poll_wrapper->poll(session_timeout, in);
|
PollResult result = poll_wrapper->poll(session_timeout, in);
|
||||||
|
log_long_operation("Polling socket");
|
||||||
if (result.has_requests && !close_received)
|
if (result.has_requests && !close_received)
|
||||||
{
|
{
|
||||||
auto [received_op, received_xid] = receiveRequest();
|
auto [received_op, received_xid] = receiveRequest();
|
||||||
|
log_long_operation("Receiving request");
|
||||||
|
|
||||||
if (received_op == Coordination::OpNum::Close)
|
if (received_op == Coordination::OpNum::Close)
|
||||||
{
|
{
|
||||||
@ -370,6 +382,7 @@ void KeeperTCPHandler::runImpl()
|
|||||||
|
|
||||||
if (!responses->tryPop(response))
|
if (!responses->tryPop(response))
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");
|
||||||
|
log_long_operation("Waiting for response to be ready");
|
||||||
|
|
||||||
if (response->xid == close_xid)
|
if (response->xid == close_xid)
|
||||||
{
|
{
|
||||||
@ -378,6 +391,7 @@ void KeeperTCPHandler::runImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
response->write(*out);
|
response->write(*out);
|
||||||
|
log_long_operation("Sending response");
|
||||||
if (response->error == Coordination::Error::ZSESSIONEXPIRED)
|
if (response->error == Coordination::Error::ZSESSIONEXPIRED)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Session #{} expired because server shutting down or quorum is not alive", session_id);
|
LOG_DEBUG(log, "Session #{} expired because server shutting down or quorum is not alive", session_id);
|
||||||
@ -401,6 +415,8 @@ void KeeperTCPHandler::runImpl()
|
|||||||
}
|
}
|
||||||
catch (const Exception & ex)
|
catch (const Exception & ex)
|
||||||
{
|
{
|
||||||
|
log_long_operation("Unknown operation");
|
||||||
|
LOG_TRACE(log, "Has {} responses in the queue", responses->size());
|
||||||
LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true));
|
LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true));
|
||||||
keeper_dispatcher->finishSession(session_id);
|
keeper_dispatcher->finishSession(session_id);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user