Implementation of new system metrics provider (Procfs) (#10544)

* New metrics provider (Procfs) + Refactored TasksStatsCounters

* Trivial statless test that ProcFS is provided

* Trivial perf test for ProcfsMetricsProvider

Co-authored-by: alexey-milovidov <milovidov@yandex-team.ru>
This commit is contained in:
Alexander Kazakov 2020-05-01 21:47:41 +03:00 committed by GitHub
parent 5594bd7042
commit e9baaa439b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 473 additions and 70 deletions

View File

@ -29,7 +29,7 @@
#include <Common/getMultipleKeysFromConfig.h> #include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h> #include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getExecutablePath.h> #include <Common/getExecutablePath.h>
#include <Common/TaskStatsInfoGetter.h> #include <Common/ThreadProfileEvents.h>
#include <Common/ThreadStatus.h> #include <Common/ThreadStatus.h>
#include <IO/HTTPCommon.h> #include <IO/HTTPCommon.h>
#include <IO/UseSSL.h> #include <IO/UseSSL.h>
@ -674,11 +674,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
} }
#if defined(OS_LINUX) #if defined(OS_LINUX)
if (!TaskStatsInfoGetter::checkPermissions()) if (!TasksStatsCounters::checkIfAvailable())
{ {
LOG_INFO(log, "It looks like the process has no CAP_NET_ADMIN capability, 'taskstats' performance statistics will be disabled." LOG_INFO(log, "It looks like this system does not have procfs mounted at /proc location,"
" neither clickhouse-server process has CAP_NET_ADMIN capability."
" 'taskstats' performance statistics will be disabled."
" It could happen due to incorrect ClickHouse package installation." " It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep " << executable_path << "'." " You can try to resolve the problem manually with 'sudo setcap cap_net_admin=+ep " << executable_path << "'."
" Note that it will not work on 'nosuid' mounted filesystems." " Note that it will not work on 'nosuid' mounted filesystems."
" It also doesn't work if you run clickhouse-server inside network namespace as it happens in some containers."); " It also doesn't work if you run clickhouse-server inside network namespace as it happens in some containers.");
} }

View File

@ -0,0 +1,182 @@
#include "ProcfsMetricsProvider.h"
#if defined(__linux__)
#include <Common/Exception.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <common/find_symbols.h>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <linux/taskstats.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
static constexpr auto thread_schedstat = "/proc/thread-self/schedstat";
static constexpr auto thread_stat = "/proc/thread-self/stat";
static constexpr auto thread_io = "/proc/thread-self/io";
namespace
{
[[noreturn]] inline void throwWithFailedToOpenFile(const std::string & filename)
{
throwFromErrno(
"Cannot open file " + filename,
errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
ssize_t readFromFD(const int fd, const char * filename, char * buf, size_t buf_size)
{
ssize_t res = 0;
do
{
res = ::pread(fd, buf, buf_size, 0);
if (-1 == res)
{
if (errno == EINTR)
continue;
throwFromErrno(
"Cannot read from file " + std::string(filename),
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
assert(res >= 0);
break;
} while (true);
return res;
}
}
bool ProcfsMetricsProvider::isAvailable() noexcept
{
struct stat sb;
int res = ::stat(thread_schedstat, &sb);
/// Verify that procfs is mounted, one of the stats file exists and is a regular file
return res != -1 && (sb.st_mode & S_IFMT) == S_IFREG;
}
ProcfsMetricsProvider::ProcfsMetricsProvider(const pid_t /*tid*/)
{
thread_schedstat_fd = ::open(thread_schedstat, O_RDONLY | O_CLOEXEC);
if (-1 == thread_schedstat_fd)
{
throwWithFailedToOpenFile(thread_schedstat);
}
thread_stat_fd = ::open(thread_stat, O_RDONLY | O_CLOEXEC);
if (-1 == thread_stat_fd)
{
::close(thread_schedstat_fd);
throwWithFailedToOpenFile(thread_stat);
}
thread_io_fd = ::open(thread_io, O_RDONLY | O_CLOEXEC);
if (-1 != thread_io_fd)
{
stats_version = 3;
}
}
ProcfsMetricsProvider::~ProcfsMetricsProvider()
{
if (stats_version >= 3 && 0 != ::close(thread_io_fd))
tryLogCurrentException(__PRETTY_FUNCTION__);
if (0 != ::close(thread_stat_fd))
tryLogCurrentException(__PRETTY_FUNCTION__);
if (0 != ::close(thread_schedstat_fd))
tryLogCurrentException(__PRETTY_FUNCTION__);
}
void ProcfsMetricsProvider::getTaskStats(::taskstats & out_stats) const
{
constexpr size_t buf_size = 1024;
char buf[buf_size];
out_stats.version = stats_version;
readParseAndSetThreadCPUStat(out_stats, buf, buf_size);
readParseAndSetThreadBlkIOStat(out_stats, buf, buf_size);
if (stats_version >= 3)
{
readParseAndSetThreadIOStat(out_stats, buf, buf_size);
}
}
void ProcfsMetricsProvider::readParseAndSetThreadCPUStat(::taskstats & out_stats, char * buf, size_t buf_size) const
{
ssize_t res = readFromFD(thread_schedstat_fd, thread_schedstat, buf, buf_size);
ReadBufferFromMemory in_schedstat(buf, res);
readIntText(out_stats.cpu_run_virtual_total, in_schedstat);
skipWhitespaceIfAny(in_schedstat);
readIntText(out_stats.cpu_delay_total, in_schedstat);
}
void ProcfsMetricsProvider::readParseAndSetThreadBlkIOStat(::taskstats & out_stats, char * buf, size_t buf_size) const
{
ssize_t res = readFromFD(thread_stat_fd, thread_stat, buf, buf_size - 1);
ReadBufferFromMemory in_stat(buf, res);
/// We need to skip the first 41 fields of the string read from /proc/thread-self/stat.
for (int i = 0; i < 41; ++i)
{
in_stat.position() = find_first_symbols<' ', '\t'>(in_stat.position(), in_stat.buffer().end());
skipWhitespaceIfAny(in_stat);
}
/// Read field #42 - Aggregated block I/O delays, measured in clock ticks (centiseconds)
readIntText(out_stats.blkio_delay_total, in_stat);
out_stats.blkio_delay_total *= 10000000ul; /// We need to return time in nanoseconds
}
void ProcfsMetricsProvider::readParseAndSetThreadIOStat(::taskstats & out_stats, char * buf, size_t buf_size) const
{
ssize_t res = readFromFD(thread_io_fd, thread_io, buf, buf_size);
ReadBufferFromMemory in_thread_io(buf, res);
assertString("rchar:", in_thread_io);
skipWhitespaceIfAny(in_thread_io);
readIntText(out_stats.read_char, in_thread_io);
skipWhitespaceIfAny(in_thread_io);
assertString("wchar:", in_thread_io);
skipWhitespaceIfAny(in_thread_io);
readIntText(out_stats.write_char, in_thread_io);
skipWhitespaceIfAny(in_thread_io);
skipToNextLineOrEOF(in_thread_io);
skipToNextLineOrEOF(in_thread_io);
assertString("read_bytes:", in_thread_io);
skipWhitespaceIfAny(in_thread_io);
readIntText(out_stats.read_bytes, in_thread_io);
skipWhitespaceIfAny(in_thread_io);
assertString("write_bytes:", in_thread_io);
skipWhitespaceIfAny(in_thread_io);
readIntText(out_stats.write_bytes, in_thread_io);
}
}
#endif

View File

@ -0,0 +1,44 @@
#pragma once
#include <sys/types.h>
#include <boost/noncopyable.hpp>
#if defined(__linux__)
struct taskstats;
namespace DB
{
/// Provides several essential per-task metrics by reading data from Procfs (when available).
class ProcfsMetricsProvider : private boost::noncopyable
{
public:
ProcfsMetricsProvider(const pid_t /*tid*/);
~ProcfsMetricsProvider();
/// Updates only a part of taskstats struct's fields:
/// - cpu_run_virtual_total, cpu_delay_total (when /proc/thread-self/schedstat is available)
/// - blkio_delay_total (when /proc/thread-self/stat is available)
/// - rchar, wchar, read_bytes, write_bytes (when /prod/thread-self/io is available)
/// See: man procfs
void getTaskStats(::taskstats & out_stats) const;
/// Tells whether this metrics (via Procfs) is provided on the current platform
static bool isAvailable() noexcept;
private:
void readParseAndSetThreadCPUStat(::taskstats & out_stats, char *, size_t) const;
void readParseAndSetThreadBlkIOStat(::taskstats & out_stats, char *, size_t) const;
void readParseAndSetThreadIOStat(::taskstats & out_stats, char *, size_t) const;
private:
int thread_schedstat_fd = -1;
int thread_stat_fd = -1;
int thread_io_fd = -1;
/// This field is used for compatibility with TasksStatsCounters::incrementProfileEvents()
unsigned short stats_version = 1;
};
}
#endif

View File

@ -0,0 +1,109 @@
#include "ThreadProfileEvents.h"
#if defined(__linux__)
#include "TaskStatsInfoGetter.h"
#include "ProcfsMetricsProvider.h"
#include <optional>
namespace DB
{
bool TasksStatsCounters::checkIfAvailable()
{
return findBestAvailableProvider() != MetricsProvider::None;
}
std::unique_ptr<TasksStatsCounters> TasksStatsCounters::create(const UInt64 tid)
{
std::unique_ptr<TasksStatsCounters> instance;
if (checkIfAvailable())
instance.reset(new TasksStatsCounters(tid, findBestAvailableProvider()));
return instance;
}
TasksStatsCounters::MetricsProvider TasksStatsCounters::findBestAvailableProvider()
{
/// This initialization is thread-safe and executed once since C++11
static std::optional<MetricsProvider> provider =
[]() -> MetricsProvider
{
if (TaskStatsInfoGetter::checkPermissions())
{
return MetricsProvider::Netlink;
}
else if (ProcfsMetricsProvider::isAvailable())
{
return MetricsProvider::Procfs;
}
return MetricsProvider::None;
}();
return *provider;
}
TasksStatsCounters::TasksStatsCounters(const UInt64 tid, const MetricsProvider provider)
{
switch (provider)
{
case MetricsProvider::Netlink:
stats_getter = [metrics_provider = std::make_shared<TaskStatsInfoGetter>(), tid]()
{
::taskstats result;
metrics_provider->getStat(result, tid);
return result;
};
break;
case MetricsProvider::Procfs:
stats_getter = [metrics_provider = std::make_shared<ProcfsMetricsProvider>(tid)]()
{
::taskstats result;
metrics_provider->getTaskStats(result);
return result;
};
break;
case MetricsProvider::None:
;
}
}
void TasksStatsCounters::reset()
{
if (stats_getter)
stats = stats_getter();
}
void TasksStatsCounters::updateCounters(ProfileEvents::Counters & profile_events)
{
if (!stats_getter)
return;
const auto new_stats = stats_getter();
incrementProfileEvents(stats, new_stats, profile_events);
stats = new_stats;
}
void TasksStatsCounters::incrementProfileEvents(const ::taskstats & prev, const ::taskstats & curr, ProfileEvents::Counters & profile_events)
{
profile_events.increment(ProfileEvents::OSCPUWaitMicroseconds,
safeDiff(prev.cpu_delay_total, curr.cpu_delay_total) / 1000U);
profile_events.increment(ProfileEvents::OSIOWaitMicroseconds,
safeDiff(prev.blkio_delay_total, curr.blkio_delay_total) / 1000U);
profile_events.increment(ProfileEvents::OSCPUVirtualTimeMicroseconds,
safeDiff(prev.cpu_run_virtual_total, curr.cpu_run_virtual_total) / 1000U);
/// Since TASKSTATS_VERSION = 3 extended accounting and IO accounting is available.
if (curr.version < 3)
return;
profile_events.increment(ProfileEvents::OSReadChars, safeDiff(prev.read_char, curr.read_char));
profile_events.increment(ProfileEvents::OSWriteChars, safeDiff(prev.write_char, curr.write_char));
profile_events.increment(ProfileEvents::OSReadBytes, safeDiff(prev.read_bytes, curr.read_bytes));
profile_events.increment(ProfileEvents::OSWriteBytes, safeDiff(prev.write_bytes, curr.write_bytes));
}
}
#endif

View File

@ -6,13 +6,13 @@
#include <sys/resource.h> #include <sys/resource.h>
#include <pthread.h> #include <pthread.h>
#if defined(__linux__) #if defined(__linux__)
#include <linux/taskstats.h> #include <linux/taskstats.h>
#else #else
struct taskstats {}; struct taskstats {};
#endif #endif
/** Implement ProfileEvents with statistics about resource consumption of the current thread. /** Implement ProfileEvents with statistics about resource consumption of the current thread.
*/ */
@ -37,7 +37,6 @@ namespace ProfileEvents
#endif #endif
} }
namespace DB namespace DB
{ {
@ -117,53 +116,48 @@ struct RUsageCounters
} }
}; };
#if defined(__linux__) #if defined(__linux__)
struct TasksStatsCounters class TasksStatsCounters
{ {
::taskstats stat; public:
static bool checkIfAvailable();
static std::unique_ptr<TasksStatsCounters> create(const UInt64 tid);
TasksStatsCounters() = default; void reset();
void updateCounters(ProfileEvents::Counters & profile_events);
static TasksStatsCounters current(); private:
::taskstats stats; //-V730_NOINIT
std::function<::taskstats()> stats_getter;
static void incrementProfileEvents(const TasksStatsCounters & prev, const TasksStatsCounters & curr, ProfileEvents::Counters & profile_events) enum class MetricsProvider
{ {
profile_events.increment(ProfileEvents::OSCPUWaitMicroseconds, None,
safeDiff(prev.stat.cpu_delay_total, curr.stat.cpu_delay_total) / 1000U); Procfs,
profile_events.increment(ProfileEvents::OSIOWaitMicroseconds, Netlink
safeDiff(prev.stat.blkio_delay_total, curr.stat.blkio_delay_total) / 1000U); };
profile_events.increment(ProfileEvents::OSCPUVirtualTimeMicroseconds,
safeDiff(prev.stat.cpu_run_virtual_total, curr.stat.cpu_run_virtual_total) / 1000U);
/// Since TASKSTATS_VERSION = 3 extended accounting and IO accounting is available. private:
if (curr.stat.version < 3) explicit TasksStatsCounters(const UInt64 tid, const MetricsProvider provider);
return;
profile_events.increment(ProfileEvents::OSReadChars, safeDiff(prev.stat.read_char, curr.stat.read_char)); static MetricsProvider findBestAvailableProvider();
profile_events.increment(ProfileEvents::OSWriteChars, safeDiff(prev.stat.write_char, curr.stat.write_char)); static void incrementProfileEvents(const ::taskstats & prev, const ::taskstats & curr, ProfileEvents::Counters & profile_events);
profile_events.increment(ProfileEvents::OSReadBytes, safeDiff(prev.stat.read_bytes, curr.stat.read_bytes));
profile_events.increment(ProfileEvents::OSWriteBytes, safeDiff(prev.stat.write_bytes, curr.stat.write_bytes));
}
static void updateProfileEvents(TasksStatsCounters & last_counters, ProfileEvents::Counters & profile_events)
{
auto current_counters = current();
incrementProfileEvents(last_counters, current_counters, profile_events);
last_counters = current_counters;
}
}; };
#else #else
struct TasksStatsCounters class TasksStatsCounters
{ {
::taskstats stat; public:
static bool checkIfAvailable() { return false; }
static std::unique_ptr<TasksStatsCounters> create(const UInt64 /*tid*/) { return {}; }
static TasksStatsCounters current(); void reset() {}
static void incrementProfileEvents(const TasksStatsCounters &, const TasksStatsCounters &, ProfileEvents::Counters &) {} void updateCounters(ProfileEvents::Counters &) {}
static void updateProfileEvents(TasksStatsCounters &, ProfileEvents::Counters &) {}
private:
TasksStatsCounters(const UInt64 /*tid*/) {}
}; };
#endif #endif

View File

@ -3,7 +3,6 @@
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h> #include <Common/ThreadProfileEvents.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/QueryProfiler.h> #include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h> #include <Common/ThreadStatus.h>
@ -24,19 +23,10 @@ namespace ErrorCodes
thread_local ThreadStatus * current_thread = nullptr; thread_local ThreadStatus * current_thread = nullptr;
TasksStatsCounters TasksStatsCounters::current()
{
TasksStatsCounters res;
CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().thread_id);
return res;
}
ThreadStatus::ThreadStatus() ThreadStatus::ThreadStatus()
: thread_id{getThreadId()}
{ {
thread_id = getThreadId();
last_rusage = std::make_unique<RUsageCounters>(); last_rusage = std::make_unique<RUsageCounters>();
last_taskstats = std::make_unique<TasksStatsCounters>();
memory_tracker.setDescription("(for thread)"); memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus"); log = &Poco::Logger::get("ThreadStatus");
@ -82,22 +72,19 @@ void ThreadStatus::initPerformanceCounters()
++queries_started; ++queries_started;
*last_rusage = RUsageCounters::current(query_start_time_nanoseconds); *last_rusage = RUsageCounters::current(query_start_time_nanoseconds);
if (!taskstats)
try
{ {
if (TaskStatsInfoGetter::checkPermissions()) try
{ {
if (!taskstats_getter) taskstats = TasksStatsCounters::create(thread_id);
taskstats_getter = std::make_unique<TaskStatsInfoGetter>(); }
catch (...)
*last_taskstats = TasksStatsCounters::current(); {
tryLogCurrentException(log);
} }
} }
catch (...) if (taskstats)
{ taskstats->reset();
taskstats_getter.reset();
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
void ThreadStatus::updatePerformanceCounters() void ThreadStatus::updatePerformanceCounters()
@ -105,8 +92,8 @@ void ThreadStatus::updatePerformanceCounters()
try try
{ {
RUsageCounters::updateProfileEvents(*last_rusage, performance_counters); RUsageCounters::updateProfileEvents(*last_rusage, performance_counters);
if (taskstats_getter) if (taskstats)
TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters); taskstats->updateCounters(performance_counters);
} }
catch (...) catch (...)
{ {

View File

@ -31,7 +31,7 @@ class ThreadStatus;
class QueryProfilerReal; class QueryProfilerReal;
class QueryProfilerCpu; class QueryProfilerCpu;
class QueryThreadLog; class QueryThreadLog;
struct TasksStatsCounters; class TasksStatsCounters;
struct RUsageCounters; struct RUsageCounters;
class TaskStatsInfoGetter; class TaskStatsInfoGetter;
class InternalTextLogsQueue; class InternalTextLogsQueue;
@ -88,7 +88,7 @@ public:
~ThreadStatus(); ~ThreadStatus();
/// Linux's PID (or TGID) (the same id is shown by ps util) /// Linux's PID (or TGID) (the same id is shown by ps util)
UInt64 thread_id = 0; const UInt64 thread_id = 0;
/// Also called "nice" value. If it was changed to non-zero (when attaching query) - will be reset to zero when query is detached. /// Also called "nice" value. If it was changed to non-zero (when attaching query) - will be reset to zero when query is detached.
Int32 os_thread_priority = 0; Int32 os_thread_priority = 0;
@ -191,14 +191,10 @@ protected:
Poco::Logger * log = nullptr; Poco::Logger * log = nullptr;
friend class CurrentThread; friend class CurrentThread;
friend struct TasksStatsCounters;
/// Use ptr not to add extra dependencies in the header /// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage; std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> last_taskstats; std::unique_ptr<TasksStatsCounters> taskstats;
/// Set to non-nullptr only if we have enough capabilities.
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
private: private:
void setupState(const ThreadGroupStatusPtr & thread_group_); void setupState(const ThreadGroupStatusPtr & thread_group_);

View File

@ -71,3 +71,6 @@ target_link_libraries (chaos_sanitizer PRIVATE clickhouse_common_io)
add_executable (memory_statistics_os_perf memory_statistics_os_perf.cpp) add_executable (memory_statistics_os_perf memory_statistics_os_perf.cpp)
target_link_libraries (memory_statistics_os_perf PRIVATE clickhouse_common_io) target_link_libraries (memory_statistics_os_perf PRIVATE clickhouse_common_io)
add_executable (procfs_metrics_provider_perf procfs_metrics_provider_perf.cpp)
target_link_libraries (procfs_metrics_provider_perf PRIVATE clickhouse_common_io)

View File

@ -0,0 +1,41 @@
#if defined(__linux__)
#include <Common/ProcfsMetricsProvider.h>
#include <iostream>
#include <linux/taskstats.h>
#endif
#if defined(__linux__)
int main(int argc, char ** argv)
{
using namespace DB;
size_t num_iterations = argc >= 2 ? std::stoull(argv[1]) : 1000000;
if (!ProcfsMetricsProvider::isAvailable())
{
std::cerr << "Procfs statistics is not available on this system" << std::endl;
return -1;
}
ProcfsMetricsProvider stats_provider(0);
::taskstats stats;
stats_provider.getTaskStats(stats);
const auto start_cpu_time = stats.cpu_run_virtual_total;
for (size_t i = 0; i < num_iterations; ++i)
{
stats_provider.getTaskStats(stats);
}
if (num_iterations)
std::cerr << stats.cpu_run_virtual_total - start_cpu_time << '\n';
return 0;
}
#else
int main()
{
}
#endif

View File

@ -73,6 +73,7 @@ SRCS(
parseRemoteDescription.cpp parseRemoteDescription.cpp
PipeFDs.cpp PipeFDs.cpp
PODArray.cpp PODArray.cpp
ProcfsMetricsProvider.cpp
ProfileEvents.cpp ProfileEvents.cpp
QueryProfiler.cpp QueryProfiler.cpp
quoteString.cpp quoteString.cpp
@ -95,6 +96,7 @@ SRCS(
TerminalSize.cpp TerminalSize.cpp
thread_local_rng.cpp thread_local_rng.cpp
ThreadFuzzer.cpp ThreadFuzzer.cpp
ThreadProfileEvents.cpp
ThreadPool.cpp ThreadPool.cpp
ThreadStatus.cpp ThreadStatus.cpp
TraceCollector.cpp TraceCollector.cpp

View File

@ -0,0 +1,2 @@
1
Test OK

View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
# Sandbox does not provide CAP_NET_ADMIN capability but does have ProcFS mounted at /proc
# This ensures that OS metrics can be collected
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
function read_numbers_func()
{
$CLICKHOUSE_CLIENT -q "
SELECT * FROM numbers(600000000) FORMAT Null SETTINGS max_threads = 1
";
}
function show_processes_func()
{
sleep 0.1;
# These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc
$CLICKHOUSE_CLIENT -q "
SELECT count() > 0 FROM system.processes\
WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\
SETTINGS max_threads = 1
";
}
export -f read_numbers_func;
export -f show_processes_func;
TIMEOUT=3
timeout $TIMEOUT bash -c read_numbers_func &
timeout $TIMEOUT bash -c show_processes_func &
wait
echo "Test OK"