Merge branch 'master' into fix_0729

This commit is contained in:
Pxl 2022-07-29 16:48:26 +08:00 committed by GitHub
commit b4842860e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 23 additions and 9 deletions

View File

@ -701,6 +701,7 @@ void ZooKeeper::receiveEvent()
RequestInfo request_info;
ZooKeeperResponsePtr response;
UInt64 elapsed_ms = 0;
if (xid == PING_XID)
{
@ -761,8 +762,8 @@ void ZooKeeper::receiveEvent()
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest);
}
auto elapsed_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - request_info.time).count();
ProfileEvents::increment(ProfileEvents::ZooKeeperWaitMicroseconds, elapsed_microseconds);
elapsed_ms = std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - request_info.time).count();
ProfileEvents::increment(ProfileEvents::ZooKeeperWaitMicroseconds, elapsed_ms);
}
try
@ -812,7 +813,7 @@ void ZooKeeper::receiveEvent()
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length), Error::ZMARSHALLINGERROR);
logOperationIfNeeded(request_info.request, response); //-V614
logOperationIfNeeded(request_info.request, response, /* finalize= */ false, elapsed_ms); //-V614
}
catch (...)
{
@ -831,7 +832,7 @@ void ZooKeeper::receiveEvent()
if (request_info.callback)
request_info.callback(*response);
logOperationIfNeeded(request_info.request, response);
logOperationIfNeeded(request_info.request, response, /* finalize= */ false, elapsed_ms);
}
catch (...)
{
@ -919,13 +920,14 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
? Error::ZCONNECTIONLOSS
: Error::ZSESSIONEXPIRED;
response->xid = request_info.request->xid;
UInt64 elapsed_ms = std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - request_info.time).count();
if (request_info.callback)
{
try
{
request_info.callback(*response);
logOperationIfNeeded(request_info.request, response, true);
logOperationIfNeeded(request_info.request, response, true, elapsed_ms);
}
catch (...)
{
@ -985,7 +987,8 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
try
{
info.callback(*response);
logOperationIfNeeded(info.request, response, true);
UInt64 elapsed_ms = std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - info.time).count();
logOperationIfNeeded(info.request, response, true, elapsed_ms);
}
catch (...)
{
@ -1310,7 +1313,7 @@ void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
}
#ifdef ZOOKEEPER_LOG
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize, UInt64 elapsed_ms)
{
auto maybe_zk_log = std::atomic_load(&zk_log);
if (!maybe_zk_log)
@ -1348,6 +1351,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
elem.event_time = event_time;
elem.address = socket_address;
elem.session_id = session_id;
elem.duration_ms = elapsed_ms;
if (request)
{
elem.thread_id = request->thread_id;
@ -1357,7 +1361,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
}
}
#else
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr &, const ZooKeeperResponsePtr &, bool)
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr &, const ZooKeeperResponsePtr &, bool, UInt64)
{}
#endif

View File

@ -276,7 +276,7 @@ private:
template <typename T>
void read(T &);
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false);
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false, UInt64 elapsed_ms = 0);
void initApiVersion();

View File

@ -122,6 +122,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"address", DataTypeFactory::instance().get("IPv6")},
{"port", std::make_shared<DataTypeUInt16>()},
{"session_id", std::make_shared<DataTypeInt64>()},
{"duration_ms", std::make_shared<DataTypeUInt64>()},
{"xid", std::make_shared<DataTypeInt32>()},
{"has_watch", std::make_shared<DataTypeUInt8>()},
@ -172,6 +173,7 @@ void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16);
columns[i++]->insert(address.port());
columns[i++]->insert(session_id);
columns[i++]->insert(duration_ms);
columns[i++]->insert(xid);
columns[i++]->insert(has_watch);

View File

@ -27,6 +27,8 @@ struct ZooKeeperLogElement
Poco::Net::SocketAddress address;
Int64 session_id = 0;
UInt64 duration_ms = 0;
/// Common request info
Int32 xid = 0;
bool has_watch = false;

View File

@ -38,3 +38,5 @@ Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_24957917
Response 0 Error /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
Request 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 \N \N \N 0 0 0 0
Response 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 ZOK \N \N 0 0 9 0
duration_ms
1

View File

@ -31,3 +31,7 @@ where (session_id, xid) in (select session_id, xid from system.zookeeper_log whe
order by xid, type, request_idx;
drop table rmt;
system flush logs;
select 'duration_ms';
select count()>0 from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt%' and duration_ms > 0;