Add more perfomance counters. [#CLICKHOUSE-2910]

This commit is contained in:
Vitaliy Lyudvichenko 2018-05-28 22:53:03 +03:00
parent aa40931824
commit d81744fd06
15 changed files with 161 additions and 17 deletions

View File

@ -6,6 +6,8 @@
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>
#include <chrono>
#include "ThreadStatus.h"
namespace CurrentMetrics
{
@ -230,9 +232,11 @@ void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");
MemoryTracker memory_tracker;
memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
current_memory_tracker = &memory_tracker;
if (current_thread)
{
ThreadStatus::setCurrentThreadParentQuery(nullptr);
current_thread->memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
}
while (!shutdown)
{
@ -242,8 +246,6 @@ void BackgroundSchedulePool::threadFunction()
task_notification.execute();
}
}
current_memory_tracker = nullptr;
}

View File

@ -175,7 +175,7 @@ namespace CurrentMemoryTracker
}
}
DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker()
DB::ActionLock getCurrentMemoryTrackerBlocker()
{
return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionBlockerSingleThread::LockHolder(nullptr);
return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionLock();
}

View File

@ -110,7 +110,8 @@ public:
void logPeakMemoryUsage() const;
/// To be able to temporarily stop memory tracker
DB::ActionBlockerSingleThread blocker;
/// TODO: Use more lightweight implementation
DB::ActionBlocker blocker;
};
@ -123,4 +124,4 @@ namespace CurrentMemoryTracker
}
DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker();
DB::ActionLock getCurrentMemoryTrackerBlocker();

View File

@ -39,6 +39,11 @@
M(CreatedReadBufferAIO) \
M(CreatedWriteBufferOrdinary) \
M(CreatedWriteBufferAIO) \
M(DiskReadElapsedMicroseconds) \
M(DiskWriteElapsedMicroseconds) \
M(NetworkReceiveElapsedMicroseconds) \
M(NetworkSendElapsedMicroseconds) \
M(ThrottlerSleepMicroseconds) \
\
M(ReplicatedPartFetches) \
M(ReplicatedPartFailedFetches) \

View File

@ -0,0 +1,14 @@
#include <sys/resource.h>
#include "Stopwatch.h"
StopWatchRusage::Timestamp StopWatchRusage::Timestamp::current()
{
StopWatchRusage::Timestamp res;
::rusage rusage;
::getrusage(RUSAGE_THREAD, &rusage);
res.user_ns = rusage.ru_utime.tv_sec * 1000000000UL + rusage.ru_utime.tv_usec;
res.sys_ns = rusage.ru_stime.tv_sec * 1000000000UL + rusage.ru_stime.tv_usec;
return res;
}

View File

@ -134,3 +134,59 @@ private:
/// Most significant bit is a lock. When it is set, compareAndRestartDeferred method will return false.
UInt64 nanoseconds() const { return StopWatchDetail::nanoseconds(clock_type) & 0x7FFFFFFFFFFFFFFFULL; }
};
/// Like ordinary StopWatch, but uses getrusage() system call
struct StopWatchRusage
{
StopWatchRusage() = default;
void start() { start_ts = Timestamp::current(); is_running = true; }
void stop() { stop_ts = Timestamp::current(); is_running = false; }
void reset() { start_ts = Timestamp(); stop_ts = Timestamp(); is_running = false; }
void restart() { start(); }
UInt64 elapsed(bool count_user = true, bool count_sys = true) const
{
return elapsedNanoseconds(count_user, count_sys);
}
UInt64 elapsedNanoseconds(bool count_user = true, bool count_sys = true) const
{
return (is_running ? Timestamp::current() : stop_ts).nanoseconds(count_user, count_sys) - start_ts.nanoseconds(count_user, count_sys);
}
UInt64 elapsedMicroseconds(bool count_user = true, bool count_sys = true) const
{
return elapsedNanoseconds(count_user, count_sys) / 1000UL;
}
UInt64 elapsedMilliseconds(bool count_user = true, bool count_sys = true) const
{
return elapsedNanoseconds(count_user, count_sys) / 1000000UL;
}
double elapsedSeconds(bool count_user = true, bool count_sys = true) const
{
return static_cast<double>(elapsedNanoseconds(count_user, count_sys)) / 1000000000.0;
}
private:
struct Timestamp
{
UInt64 user_ns = 0;
UInt64 sys_ns = 0;
static Timestamp current();
UInt64 nanoseconds(bool count_user = true, bool count_sys = true) const
{
return (count_user ? user_ns : 0) + (count_sys ? sys_ns : 0);
}
};
Timestamp start_ts;
Timestamp stop_ts;
bool is_running = false;
};

View File

@ -45,7 +45,6 @@ static UInt64 getCurrentTimeMicroseconds(clockid_t clock_type = CLOCK_MONOTONIC)
return ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000UL;
}
struct RusageCounters
{
/// In microseconds
@ -79,16 +78,16 @@ struct RusageCounters
void setCurrent()
{
rusage rusage;
getrusage(RUSAGE_THREAD, &rusage);
::rusage rusage;
::getrusage(RUSAGE_THREAD, &rusage);
set(rusage, getCurrentTimeMicroseconds());
}
void set(const ::rusage & rusage, UInt64 real_time_)
{
real_time = real_time_;
user_time = rusage.ru_utime.tv_sec * 1000000UL + rusage.ru_utime.tv_usec;
sys_time = rusage.ru_utime.tv_sec * 1000000UL + rusage.ru_utime.tv_usec;
user_time = rusage.ru_utime.tv_sec * 1000000UL + rusage.ru_utime.tv_usec / 1000UL;
sys_time = rusage.ru_stime.tv_sec * 1000000UL + rusage.ru_stime.tv_usec / 1000UL;
page_reclaims = static_cast<UInt64>(rusage.ru_minflt);
voluntary_context_switches = static_cast<UInt64>(rusage.ru_nvcsw);
@ -293,6 +292,8 @@ struct ScopeCurrentThread
}
};
ThreadStatusPtr ThreadStatus::getCurrent() const { return current_thread; }
thread_local ThreadStatusPtr current_thread = ThreadStatus::create();
/// Order of current_thread and current_thread_scope matters

View File

@ -41,6 +41,7 @@ public:
static void setCurrentThreadParentQuery(QueryStatus * parent_process);
static void setCurrentThreadFromSibling(const ThreadStatusPtr & sibling_thread);
ThreadStatusPtr getCurrent() const;
~ThreadStatus();

View File

@ -5,9 +5,16 @@
#include <memory>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
{
@ -68,10 +75,14 @@ public:
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
timespec sleep_ts;
::timespec sleep_ts;
sleep_ts.tv_sec = sleep_ns / 1000000000;
sleep_ts.tv_nsec = sleep_ns % 1000000000;
nanosleep(&sleep_ts, nullptr); /// NOTE Returns early in case of a signal. This is considered normal.
/// NOTE: Returns early in case of a signal. This is considered normal.
::nanosleep(&sleep_ts, nullptr);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL);
}
}

View File

@ -1,6 +1,13 @@
#include <Interpreters/Quota.h>
#include <Interpreters/ProcessList.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/ThreadStatus.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
@ -293,6 +300,13 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0)
{
/// Do not count sleeps in throttlers
if (current_thread)
{
UInt64 throttler_sleeps_ms = current_thread->performance_counters[ProfileEvents::ThrottlerSleepMicroseconds];
total_elapsed -= static_cast<double>(throttler_sleeps_ms) / 1000000.0;
}
if (limits.min_execution_speed && progress.rows / total_elapsed < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),

View File

@ -14,6 +14,7 @@ namespace ProfileEvents
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event DiskReadElapsedMicroseconds;
extern const Event Seek;
}
@ -47,6 +48,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
StopWatchRusage watch_ru;
std::optional<Stopwatch> watch;
if (profile_callback)
watch.emplace(clock_type);
@ -68,6 +70,9 @@ bool ReadBufferFromFileDescriptor::nextImpl()
if (res > 0)
bytes_read += res;
watch_ru.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch_ru.elapsedMicroseconds());
if (profile_callback)
{
ProfileInfo info;
@ -114,12 +119,16 @@ off_t ReadBufferFromFileDescriptor::doSeek(off_t offset, int whence)
else
{
ProfileEvents::increment(ProfileEvents::Seek);
StopWatchRusage watch_ru;
pos = working_buffer.end();
off_t res = lseek(fd, new_pos, SEEK_SET);
off_t res = ::lseek(fd, new_pos, SEEK_SET);
if (-1 == res)
throwFromErrno("Cannot seek through file " + getFileName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
pos_in_file = new_pos;
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch_ru.elapsedMicroseconds());
return res;
}
}

View File

@ -4,6 +4,13 @@
#include <IO/ReadBufferFromPocoSocket.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
namespace ProfileEvents
{
extern const Event NetworkReceiveElapsedMicroseconds;
}
namespace DB
@ -20,6 +27,7 @@ namespace ErrorCodes
bool ReadBufferFromPocoSocket::nextImpl()
{
ssize_t bytes_read = 0;
StopWatchRusage watch_ru;
/// Add more details to exceptions.
try
@ -42,6 +50,8 @@ bool ReadBufferFromPocoSocket::nextImpl()
if (bytes_read < 0)
throw NetException("Cannot read from socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_READ_FROM_SOCKET);
ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch_ru.elapsedMicroseconds());
if (bytes_read)
working_buffer.resize(bytes_read);
else

View File

@ -7,6 +7,7 @@
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <Common/Stopwatch.h>
namespace ProfileEvents
@ -14,6 +15,7 @@ namespace ProfileEvents
extern const Event WriteBufferFromFileDescriptorWrite;
extern const Event WriteBufferFromFileDescriptorWriteFailed;
extern const Event WriteBufferFromFileDescriptorWriteBytes;
extern const Event DiskWriteElapsedMicroseconds;
}
namespace CurrentMetrics
@ -38,6 +40,8 @@ void WriteBufferFromFileDescriptor::nextImpl()
if (!offset())
return;
StopWatchRusage watch_ru;
size_t bytes_written = 0;
while (bytes_written != offset())
{
@ -59,6 +63,7 @@ void WriteBufferFromFileDescriptor::nextImpl()
bytes_written += res;
}
ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch_ru.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
}

View File

@ -4,6 +4,13 @@
#include <IO/WriteBufferFromPocoSocket.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
namespace ProfileEvents
{
extern const Event NetworkSendElapsedMicroseconds;
}
namespace DB
@ -22,6 +29,8 @@ void WriteBufferFromPocoSocket::nextImpl()
if (!offset())
return;
StopWatchRusage watch_ru;
size_t bytes_written = 0;
while (bytes_written < offset())
{
@ -47,7 +56,10 @@ void WriteBufferFromPocoSocket::nextImpl()
if (res < 0)
throw NetException("Cannot write to socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_WRITE_TO_SOCKET);
bytes_written += res;
ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch_ru.elapsedMicroseconds());
}
}

View File

@ -115,7 +115,10 @@ void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
if (current_thread)
{
ThreadStatus::setCurrentThreadParentQuery(nullptr);
current_thread->memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
}
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));