C++ify the taskstats getter, add periodic auto-update of profile events. [#CLICKHOUSE-2910]

Vitaliy Lyudvichenko 2018-06-04 17:16:27 +03:00
parent b46f2ad946
commit 89f2107c75
7 changed files with 107 additions and 81 deletions

View File

@@ -86,7 +86,7 @@ namespace ProfileEvents
parent = parent_;
}
/// Reset metrics
/// Set all counters to zero
void resetCounters();
/// Dumps profile events to two column Array(String) and Array(UInt64)
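For context, a minimal sketch of the idea behind these two comments: a fixed table of atomic counters that can be zeroed in place and later dumped as two parallel columns. The names and layout below are hypothetical, not the actual ProfileEvents API.

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct CountersSketch
{
    /// A tiny, hypothetical event list; the real ProfileEvents has many more entries.
    static constexpr const char * names[3] = {"Query", "SelectQuery", "FileOpen"};
    std::array<std::atomic<uint64_t>, 3> values{};

    /// Set all counters to zero (the counters themselves are kept, only their values are reset).
    void resetCounters()
    {
        for (auto & value : values)
            value.store(0, std::memory_order_relaxed);
    }

    /// Dump to two parallel columns, mirroring the Array(String) / Array(UInt64) layout.
    std::pair<std::vector<std::string>, std::vector<uint64_t>> dump() const
    {
        std::pair<std::vector<std::string>, std::vector<uint64_t>> res;
        for (std::size_t i = 0; i < values.size(); ++i)
        {
            res.first.emplace_back(names[i]);
            res.second.push_back(values[i].load(std::memory_order_relaxed));
        }
        return res;
    }
};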

View File

@@ -17,6 +17,7 @@
#include <syscall.h>
/// Based on: https://github.com/Tomas-M/iotop/tree/master/src
/// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt
/*
* Generic macros for dealing with netlink sockets. Might be duplicated
@@ -44,10 +45,10 @@ namespace
static size_t constexpr MAX_MSG_SIZE = 1024;
struct msgtemplate
struct NetlinkMessage
{
struct nlmsghdr n;
struct genlmsghdr g;
::nlmsghdr n;
::genlmsghdr g;
char buf[MAX_MSG_SIZE];
};
@@ -57,14 +58,9 @@ struct msgtemplate
int send_cmd(int sock_fd, __u16 nlmsg_type, __u32 nlmsg_pid,
__u8 genl_cmd, __u16 nla_type,
void *nla_data, int nla_len)
void * nla_data, int nla_len) noexcept
{
struct nlattr *na;
struct sockaddr_nl nladdr;
int r, buflen;
char *buf;
msgtemplate msg;
NetlinkMessage msg;
memset(&msg, 0, sizeof(msg));
msg.n.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN);
@@ -75,20 +71,27 @@ int send_cmd(int sock_fd, __u16 nlmsg_type, __u32 nlmsg_pid,
msg.g.cmd = genl_cmd;
msg.g.version = 0x1;
na = (struct nlattr *) GENLMSG_DATA(&msg);
::nlattr * na = static_cast<::nlattr *>(GENLMSG_DATA(&msg));
na->nla_type = nla_type;
na->nla_len = nla_len + 1 + NLA_HDRLEN;
memcpy(NLA_DATA(na), nla_data, nla_len);
msg.n.nlmsg_len += NLMSG_ALIGN(na->nla_len);
buf = (char *) &msg;
buflen = msg.n.nlmsg_len ;
char * buf = reinterpret_cast<char *>(&msg);
ssize_t buflen = msg.n.nlmsg_len;
::sockaddr_nl nladdr;
memset(&nladdr, 0, sizeof(nladdr));
nladdr.nl_family = AF_NETLINK;
while ((r = sendto(sock_fd, buf, buflen, 0, (struct sockaddr *) &nladdr,
sizeof(nladdr))) < buflen)
while (true)
{
ssize_t r = ::sendto(sock_fd, buf, buflen, 0, reinterpret_cast<::sockaddr *>(&nladdr), sizeof(nladdr));
if (r >= buflen)
break;
if (r > 0)
{
buf += r;
@@ -97,55 +100,52 @@ int send_cmd(int sock_fd, __u16 nlmsg_type, __u32 nlmsg_pid,
else if (errno != EAGAIN)
return -1;
}
return 0;
}
int get_family_id(int nl_sock_fd)
int get_family_id(int nl_sock_fd) noexcept
{
static char name[256];
struct
{
struct nlmsghdr n;
struct genlmsghdr g;
::nlmsghdr n;
::genlmsghdr g;
char buf[256];
} ans;
int id = 0;
struct nlattr *na;
int rep_len;
static char name[] = TASKSTATS_GENL_NAME;
strcpy(name, TASKSTATS_GENL_NAME);
if (send_cmd(nl_sock_fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY,
CTRL_ATTR_FAMILY_NAME, (void *) name,
strlen(TASKSTATS_GENL_NAME) + 1))
return 0;
rep_len = recv(nl_sock_fd, &ans, sizeof(ans), 0);
if (ans.n.nlmsg_type == NLMSG_ERROR
|| (rep_len < 0) || !NLMSG_OK((&ans.n), rep_len))
int id = 0;
ssize_t rep_len = ::recv(nl_sock_fd, &ans, sizeof(ans), 0);
if (ans.n.nlmsg_type == NLMSG_ERROR || (rep_len < 0) || !NLMSG_OK((&ans.n), rep_len))
return 0;
na = (struct nlattr *) GENLMSG_DATA(&ans);
na = (struct nlattr *) ((char *) na + NLA_ALIGN(na->nla_len));
::nlattr * na;
na = static_cast<::nlattr *>(GENLMSG_DATA(&ans));
na = reinterpret_cast<::nlattr *>((char *) na + NLA_ALIGN(na->nla_len));
if (na->nla_type == CTRL_ATTR_FAMILY_ID)
id = *(__u16 *) NLA_DATA(na);
id = *static_cast<__u16 *>(NLA_DATA(na));
return id;
}
bool get_taskstats(int nl_sock_fd, int nl_family_id, pid_t xxxid, struct taskstats & out_stats, Exception * out_exception = nullptr)
bool get_taskstats(int nl_sock_fd, int nl_family_id, pid_t xxxid, ::taskstats & out_stats, Exception * out_exception = nullptr)
{
if (send_cmd(nl_sock_fd, nl_family_id, xxxid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &xxxid, sizeof(pid_t)))
throwFromErrno("Can't send a Netlink command");
msgtemplate msg;
int rv = recv(nl_sock_fd, &msg, sizeof(msg), 0);
NetlinkMessage msg;
int rv = ::recv(nl_sock_fd, &msg, sizeof(msg), 0);
if (msg.n.nlmsg_type == NLMSG_ERROR || !NLMSG_OK((&msg.n), rv))
{
struct nlmsgerr *err = static_cast<struct nlmsgerr *>(NLMSG_DATA(&msg));
::nlmsgerr * err = static_cast<::nlmsgerr *>(NLMSG_DATA(&msg));
Exception e("Can't get Netlink response, error=" + std::to_string(err->error), ErrorCodes::NETLINK_ERROR);
if (out_exception)
@@ -159,32 +159,33 @@ bool get_taskstats(int nl_sock_fd, int nl_family_id, pid_t xxxid, struct tasksta
rv = GENLMSG_PAYLOAD(&msg.n);
struct nlattr *na = (struct nlattr *) GENLMSG_DATA(&msg);
::nlattr * na = static_cast<::nlattr *>(GENLMSG_DATA(&msg));
int len = 0;
while (len < rv)
{
len += NLA_ALIGN(na->nla_len);
if (na->nla_type == TASKSTATS_TYPE_AGGR_TGID
|| na->nla_type == TASKSTATS_TYPE_AGGR_PID)
if (na->nla_type == TASKSTATS_TYPE_AGGR_TGID || na->nla_type == TASKSTATS_TYPE_AGGR_PID)
{
int aggr_len = NLA_PAYLOAD(na->nla_len);
int len2 = 0;
na = (struct nlattr *) NLA_DATA(na);
na = static_cast<::nlattr *>(NLA_DATA(na));
while (len2 < aggr_len)
{
if (na->nla_type == TASKSTATS_TYPE_STATS)
{
struct taskstats *ts = static_cast<struct taskstats *>(NLA_DATA(na));
::taskstats *ts = static_cast<::taskstats *>(NLA_DATA(na));
out_stats = *ts;
}
len2 += NLA_ALIGN(na->nla_len);
na = (struct nlattr *) ((char *) na + len2);
na = reinterpret_cast<::nlattr *>((char *) na + len2);
}
}
na = (struct nlattr *) ((char *) GENLMSG_DATA(&msg) + len);
na = reinterpret_cast<::nlattr *>((char *) GENLMSG_DATA(&msg) + len);
}
return true;
@@ -201,11 +202,11 @@ TaskStatsInfoGetter::TaskStatsInfoGetter()
if (netlink_socket_fd < 0)
throwFromErrno("Can't create PF_NETLINK socket");
struct sockaddr_nl addr;
::sockaddr_nl addr;
memset(&addr, 0, sizeof(addr));
addr.nl_family = AF_NETLINK;
if (bind(netlink_socket_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
if (::bind(netlink_socket_fd, reinterpret_cast<::sockaddr *>(&addr), sizeof(addr)) < 0)
throwFromErrno("Can't bind PF_NETLINK socket");
netlink_family_id = get_family_id(netlink_socket_fd);
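For readers unfamiliar with the taskstats interface used above: the getter talks to the kernel over a generic netlink socket, as described in the linked taskstats.txt. Below is a minimal standalone sketch of the socket setup performed in the constructor, with error handling reduced to perror; apart from the Linux uapi names, everything here is illustrative only.

#include <cstdio>
#include <cstring>
#include <sys/socket.h>
#include <unistd.h>
#include <linux/netlink.h>

int main()
{
    /// NETLINK_GENERIC is the protocol family behind which the taskstats interface lives.
    int fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
    if (fd < 0)
    {
        std::perror("socket");
        return 1;
    }

    ::sockaddr_nl addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.nl_family = AF_NETLINK;    /// nl_pid is left zero, the kernel assigns it on bind

    if (::bind(fd, reinterpret_cast<::sockaddr *>(&addr), sizeof(addr)) < 0)
    {
        std::perror("bind");
        ::close(fd);
        return 1;
    }

    /// The real getter then resolves the taskstats family id (get_family_id)
    /// and exchanges TASKSTATS_CMD_GET messages (send_cmd / get_taskstats).
    std::printf("netlink socket ready, fd = %d\n", fd);
    ::close(fd);
    return 0;
}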

View File

@@ -52,7 +52,8 @@ public:
{
try
{
LOG_DEBUG(current_thread->log, "Thread " << current_thread->thread_number << " is started");
ThreadStatus & thread = *CurrentThread::get();
LOG_DEBUG(thread.log, "Thread " << thread.thread_number << " started");
}
catch (...)
{
@@ -64,8 +65,14 @@ public:
{
try
{
current_thread->detachQuery(true);
LOG_DEBUG(current_thread->log, "Thread " << current_thread->thread_number << " is exited");
ThreadStatus & thread = *CurrentThread::get();
if (thread.getCurrentState() != ThreadStatus::ThreadState::DetachedFromQuery)
thread.detachQuery(true);
else
thread.thread_state = ThreadStatus::ThreadState::Died;
LOG_DEBUG(thread.log, "Thread " << thread.thread_number << " exited");
}
catch (...)
{
@@ -192,30 +199,24 @@ struct TasksStatsCounters
}
};
struct ThreadStatus::Impl
{
RusageCounters last_rusage;
TasksStatsCounters last_taskstats;
TaskStatsInfoGetter info_getter;
};
TasksStatsCounters TasksStatsCounters::current()
{
TasksStatsCounters res;
current_thread->impl->info_getter.getStat(res.stat, current_thread->os_thread_id);
current_thread->taskstats_getter->getStat(res.stat);
return res;
}
ThreadStatus::ThreadStatus()
: thread_number(Poco::ThreadNumber::get()),
performance_counters(ProfileEvents::Level::Thread),
os_thread_id(TaskStatsInfoGetter::getCurrentTID()),
: performance_counters(ProfileEvents::Level::Thread),
log(&Poco::Logger::get("ThreadStatus"))
{
impl = std::make_unique<Impl>();
thread_number = Poco::ThreadNumber::get();
os_thread_id = TaskStatsInfoGetter::getCurrentTID();
last_rusage = std::make_unique<RusageCounters>();
last_taskstats = std::make_unique<TasksStatsCounters>();
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
LOG_DEBUG(log, "Thread " << thread_number << " created");
}
@@ -291,16 +292,19 @@ void ThreadStatus::attachQuery(
query_start_time_nanoseconds = getCurrentTimeNanoseconds();
query_start_time = time(nullptr);
++queries_started;
impl->last_rusage = RusageCounters::current(query_start_time_nanoseconds);
impl->last_taskstats = TasksStatsCounters::current();
/// Clear stats from previous query if a new query is started
/// TODO: make separate query_thread_performance_counters and thread_performance_counters
if (queries_started != 1)
performance_counters.resetCounters();
*last_rusage = RusageCounters::current(query_start_time_nanoseconds);
*last_taskstats = TasksStatsCounters::current();
}
void ThreadStatus::detachQuery(bool thread_exits)
{
if (thread_state == ThreadStatus::DetachedFromQuery)
return;
if (thread_state != ThreadState::AttachedToQuery && thread_state != ThreadState::QueryInitializing)
throw Exception("Unexpected thread state " + std::to_string(getCurrentState()) + __PRETTY_FUNCTION__, ErrorCodes::LOGICAL_ERROR);
@@ -337,8 +341,8 @@ void ThreadStatus::updatePerfomanceCountersImpl()
{
try
{
RusageCounters::updateProfileEvents(impl->last_rusage, performance_counters);
TasksStatsCounters::updateProfileEvents(impl->last_taskstats, performance_counters);
RusageCounters::updateProfileEvents(*last_rusage, performance_counters);
TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters);
}
catch (...)
{
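The pattern above is snapshot-and-delta: a resource usage snapshot is taken when the query attaches (last_rusage / last_taskstats), and on each update the difference since that snapshot is folded into the thread's profile events. A self-contained sketch of the pattern using getrusage(RUSAGE_THREAD) follows; the names are hypothetical and the deltas are merely printed instead of being written into ProfileEvents counters.

#ifndef _GNU_SOURCE
#define _GNU_SOURCE             /// RUSAGE_THREAD is a Linux-specific extension
#endif
#include <cstdint>
#include <cstdio>
#include <sys/resource.h>

struct RusageSnapshot
{
    uint64_t user_time_us = 0;
    uint64_t sys_time_us = 0;

    static RusageSnapshot current()
    {
        ::rusage usage {};
        ::getrusage(RUSAGE_THREAD, &usage);     /// per-thread CPU accounting
        RusageSnapshot res;
        res.user_time_us = usage.ru_utime.tv_sec * 1000000ULL + usage.ru_utime.tv_usec;
        res.sys_time_us = usage.ru_stime.tv_sec * 1000000ULL + usage.ru_stime.tv_usec;
        return res;
    }
};

/// Fold the increment since the previous snapshot into the totals, then remember the new snapshot.
void updateCounters(RusageSnapshot & last, uint64_t & total_user_us, uint64_t & total_sys_us)
{
    RusageSnapshot now = RusageSnapshot::current();
    total_user_us += now.user_time_us - last.user_time_us;
    total_sys_us += now.sys_time_us - last.sys_time_us;
    last = now;
}

int main()
{
    RusageSnapshot last = RusageSnapshot::current();    /// taken when the query attaches
    uint64_t user_us = 0, sys_us = 0;

    for (volatile uint64_t i = 0; i < 10000000; ++i)     /// some work to account for
        ;

    updateCounters(last, user_us, sys_us);               /// one periodic update
    std::printf("user: %llu us, system: %llu us\n",
                static_cast<unsigned long long>(user_us),
                static_cast<unsigned long long>(sys_us));
    return 0;
}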

View File

@@ -19,6 +19,9 @@ class Context;
class QueryStatus;
class ThreadStatus;
class QueryThreadLog;
struct TasksStatsCounters;
struct RusageCounters;
class TaskStatsInfoGetter;
using ThreadStatusPtr = std::shared_ptr<ThreadStatus>;
@@ -31,9 +34,11 @@ public:
/// Poco's thread number (the same number is used in logs)
UInt32 thread_number = 0;
/// Linux's PID (or TGID) (the same id is shown by ps util)
Int32 os_thread_id = -1;
ProfileEvents::Counters performance_counters;
MemoryTracker memory_tracker;
Int32 os_thread_id = -1;
/// Statistics of read and write rows/bytes
Progress progress_in;
@@ -99,14 +104,17 @@ protected:
bool log_to_query_thread_log = true;
bool log_profile_events = true;
size_t queries_started = 0;
Poco::Logger * log = nullptr;
friend class CurrentThread;
friend struct TasksStatsCounters;
struct Impl;
std::unique_ptr<Impl> impl;
/// Use ptr to not add extra dependencies in header
std::unique_ptr<RusageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> last_taskstats;
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
public:
class CurrentThreadScope;
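The unique_ptr members above keep RusageCounters, TasksStatsCounters and TaskStatsInfoGetter out of the header: forward declarations are enough. A minimal sketch of that pattern (hypothetical names) is below; the one subtlety is that the owning class's destructor must be defined in the .cpp, where the member types are complete, or unique_ptr's deleter will not compile.

#include <memory>

// --- widget.h ---
struct HeavyCounters;                   /// forward declaration only

class Widget
{
public:
    Widget();
    ~Widget();                          /// declared here, defined where HeavyCounters is complete

private:
    std::unique_ptr<HeavyCounters> counters;
};

// --- widget.cpp ---
struct HeavyCounters
{
    long values[64] = {};               /// stands in for RusageCounters / TasksStatsCounters
};

Widget::Widget() : counters(std::make_unique<HeavyCounters>()) {}
Widget::~Widget() = default;            /// unique_ptr's deleter is instantiated here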

View File

@@ -69,6 +69,7 @@ namespace Protocol
Totals = 7, /// A block with totals (compressed or not).
Extremes = 8, /// A block with minimums and maximums (compressed or not).
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10 /// System logs of the query execution
};
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@@ -97,6 +98,7 @@ namespace Protocol
Cancel = 3, /// Cancel the query execution.
Ping = 4, /// Check that connection to the server is alive.
TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6 /// Keep the connection alive
};
inline const char * toString(UInt64 packet)
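Purely for illustration, a hypothetical name table for the extended server packet set, with the bounds check hinted at by the NOTE above. The entries before Totals assume the usual ClickHouse server packet numbering, which is not visible in this hunk, and the function is not the actual Protocol.h code.

#include <cstdint>

inline const char * serverPacketToString(uint64_t packet)
{
    static const char * names[] =
    {
        "Hello", "Data", "Exception", "Progress", "Pong",
        "EndOfStream", "ProfileInfo", "Totals", "Extremes",
        "TablesStatusResponse", "Log",          /// Log = 10 is the packet added here
    };
    /// Compare against the raw integer; an out-of-range value maps to a fallback name.
    return packet < sizeof(names) / sizeof(names[0]) ? names[packet] : "Unknown packet";
}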

View File

@@ -294,18 +294,27 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
size_t total_rows = progress.total_rows;
if (limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
{
double total_elapsed = info.total_stopwatch.elapsedSeconds();
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0)
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
if ((limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > limits.timeout_before_checking_execution_speed.totalMicroseconds()))
{
/// Do not count sleeps in throttlers
double throttler_sleeps_seconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds] / 1000000.0;
total_elapsed = std::max(0.0, total_elapsed - throttler_sleeps_seconds);
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
double elapsed_seconds = (throttler_sleep_microseconds > total_elapsed_microseconds)
? 0.0 : (total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;
if (limits.min_execution_speed && progress.rows / total_elapsed < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / total_elapsed)
if (elapsed_seconds > 0)
{
if (limits.min_execution_speed && progress.rows / elapsed_seconds < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
@@ -314,7 +323,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
{
double estimated_execution_time_seconds = total_elapsed * (static_cast<double>(total_rows) / progress.rows);
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.rows);
if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"

View File

@@ -180,6 +180,8 @@ protected:
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// According to total_stopwatch in microseconds
UInt64 last_profile_events_update_time = 0;
/// Additional information that can be generated during the work process.