ClickHouse/src/Common/ThreadStatus.cpp

119 lines
2.8 KiB
C++
Raw Normal View History

2018-08-22 00:24:55 +00:00
#include <sstream>
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/QueryProfiler.h>
2018-08-22 00:24:55 +00:00
#include <Common/ThreadStatus.h>
2018-08-22 00:24:55 +00:00
#include <Poco/Logger.h>
2020-02-02 02:35:47 +00:00
#include <common/getThreadId.h>
namespace DB
{
2018-05-29 18:14:31 +00:00
/// Error codes referenced by this translation unit.
/// Only declared here (`extern`); the definition lives outside this file.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Per-thread pointer to a ThreadStatus object.
/// It is null until a ThreadStatus is constructed on the thread (the constructor stores `this` here)
/// and is cleared again by the ThreadStatus destructor.
thread_local ThreadStatus * current_thread = nullptr;
2019-01-13 18:51:57 +00:00
/// Builds the status object for the calling thread: records the thread id, allocates the
/// resource-usage snapshot, labels the memory tracker, looks up the logger and, as the very
/// last step, publishes the object through the thread-local `current_thread`.
ThreadStatus::ThreadStatus()
    : thread_id{getThreadId()}
{
    last_rusage = std::make_unique<RUsageCounters>();
    memory_tracker.setDescription("(for thread)");

    auto & thread_status_logger = Poco::Logger::get("ThreadStatus");
    log = &thread_status_logger;

    /// Publish the object only after the members above are set up.
    current_thread = this;

    /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
    /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
}
2019-01-13 18:51:57 +00:00
/// Tears the status object down:
///  1. flushes the locally accumulated `untracked_memory` delta into `memory_tracker`;
///  2. invokes the optional `deleter` callback;
///  3. unregisters the object from the thread-local `current_thread`.
ThreadStatus::~ThreadStatus()
{
    try
    {
        /// A positive delta is memory that was allocated but not yet accounted for,
        /// a non-positive one is memory that was released but not yet accounted for.
        if (untracked_memory > 0)
            memory_tracker.alloc(untracked_memory);
        else
            memory_tracker.free(-untracked_memory);
    }
    catch (const DB::Exception &)
    {
        /// It's a minor tracked memory leak here (not the memory itself but its counter).
        /// We've already allocated a little bit more than the limit and cannot track it in the thread memory tracker or its parent.
    }

    if (deleter)
        deleter();

    /// Only reset current_thread if it still refers to this object.
    /// If another ThreadStatus has been registered on this thread in the meantime (or this object is
    /// destroyed on a thread it was not registered on), unconditionally writing nullptr would
    /// unregister a ThreadStatus that is still alive.
    if (current_thread == this)
        current_thread = nullptr;
}
/// Refreshes `performance_counters` from the resource-usage snapshot and, when a `taskstats`
/// source is available, from it as well.
/// Any exception raised while updating is caught and handed to tryLogCurrentException
/// instead of being propagated to the caller.
void ThreadStatus::updatePerformanceCounters()
{
    try
    {
        auto & previous_rusage = *last_rusage;
        RUsageCounters::updateProfileEvents(previous_rusage, performance_counters);

        /// The taskstats source is optional.
        if (!taskstats)
            return;

        taskstats->updateCounters(performance_counters);
    }
    catch (...)
    {
        tryLogCurrentException(log);
    }
}
2020-04-22 05:39:31 +00:00
void ThreadStatus::assertState(const std::initializer_list<int> & permitted_states, const char * description) const
{
for (auto permitted_state : permitted_states)
{
if (getCurrentState() == permitted_state)
return;
}
std::stringstream ss;
ss << "Unexpected thread state " << getCurrentState();
if (description)
ss << ": " << description;
throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR);
}
2019-07-09 10:39:05 +00:00
/// Attaches a queue for internal text logs to this thread and, if the thread belongs to a
/// thread group, to that group as well (together with the client log level).
///
/// @param logs_queue         queue to store in this object and in the thread group.
/// @param client_logs_level  log level stored in the thread group; unused when there is no group.
void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
                                               LogsLevel client_logs_level)
{
    logs_queue_ptr = logs_queue;

    if (thread_group)
    {
        /// The group's fields are updated under the group's mutex.
        std::lock_guard group_lock(thread_group->mutex);
        thread_group->logs_queue_ptr = logs_queue;
        thread_group->client_logs_level = client_logs_level;
    }
}
2020-06-20 11:17:15 +00:00
/// Installs the callback invoked by onFatalError() and, if the thread belongs to a thread group,
/// stores a copy of it in the group too.
///
/// @param callback  callable taken by value and moved into this object.
void ThreadStatus::setFatalErrorCallback(std::function<void()> callback)
{
    fatal_error_callback = std::move(callback);

    if (thread_group)
    {
        /// The group's copy is assigned under the group's mutex.
        std::lock_guard group_lock(thread_group->mutex);
        thread_group->fatal_error_callback = fatal_error_callback;
    }
}
/// Invokes the fatal-error callback if one has been installed; otherwise does nothing.
void ThreadStatus::onFatalError()
{
    if (!fatal_error_callback)
        return;

    fatal_error_callback();
}
}