Removing garbage, part 1

This commit is contained in:
Alexey Milovidov 2019-01-13 21:51:57 +03:00
parent a6bf5a7e28
commit 00a4b2cf8a
10 changed files with 61 additions and 111 deletions

View File

@ -2,6 +2,7 @@
#include "CurrentThread.h" #include "CurrentThread.h"
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <common/likely.h>
#include <Common/ThreadStatus.h> #include <Common/ThreadStatus.h>
#include <Common/TaskStatsInfoGetter.h> #include <Common/TaskStatsInfoGetter.h>
#include <Interpreters/ProcessList.h> #include <Interpreters/ProcessList.h>
@ -10,11 +11,6 @@
#include <Poco/Logger.h> #include <Poco/Logger.h>
#if defined(ARCADIA_ROOT)
# include <util/thread/singleton.h>
#endif
namespace DB namespace DB
{ {
@ -23,91 +19,59 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
// Smoker's implementation to avoid thread_local usage: error: undefined symbol: __cxa_thread_atexit
#if defined(ARCADIA_ROOT)
struct ThreadStatusPtrHolder : ThreadStatusPtr
{
ThreadStatusPtrHolder() { ThreadStatusPtr::operator=(ThreadStatus::create()); }
};
struct ThreadScopePtrHolder : CurrentThread::ThreadScopePtr
{
ThreadScopePtrHolder() { CurrentThread::ThreadScopePtr::operator=(std::make_shared<CurrentThread::ThreadScope>()); }
};
# define current_thread (*FastTlsSingleton<ThreadStatusPtrHolder>())
# define current_thread_scope (*FastTlsSingleton<ThreadScopePtrHolder>())
#else
/// Order of current_thread and current_thread_scope matters
thread_local ThreadStatusPtr _current_thread = ThreadStatus::create();
thread_local CurrentThread::ThreadScopePtr _current_thread_scope = std::make_shared<CurrentThread::ThreadScope>();
# define current_thread _current_thread
# define current_thread_scope _current_thread_scope
#endif
void CurrentThread::updatePerformanceCounters() void CurrentThread::updatePerformanceCounters()
{ {
get()->updatePerformanceCounters(); get().updatePerformanceCounters();
} }
ThreadStatusPtr CurrentThread::get() ThreadStatus & CurrentThread::get()
{ {
#ifndef NDEBUG if (unlikely(!current_thread))
if (!current_thread || current_thread.use_count() <= 0)
throw Exception("Thread #" + std::to_string(Poco::ThreadNumber::get()) + " status was not initialized", ErrorCodes::LOGICAL_ERROR); throw Exception("Thread #" + std::to_string(Poco::ThreadNumber::get()) + " status was not initialized", ErrorCodes::LOGICAL_ERROR);
if (Poco::ThreadNumber::get() != current_thread->thread_number) return *current_thread;
throw Exception("Current thread has different thread number", ErrorCodes::LOGICAL_ERROR);
#endif
return current_thread;
}
CurrentThread::ThreadScopePtr CurrentThread::getScope()
{
return current_thread_scope;
} }
ProfileEvents::Counters & CurrentThread::getProfileEvents() ProfileEvents::Counters & CurrentThread::getProfileEvents()
{ {
return current_thread->performance_counters; return get().performance_counters;
} }
MemoryTracker & CurrentThread::getMemoryTracker() MemoryTracker & CurrentThread::getMemoryTracker()
{ {
return current_thread->memory_tracker; return get().memory_tracker;
} }
void CurrentThread::updateProgressIn(const Progress & value) void CurrentThread::updateProgressIn(const Progress & value)
{ {
current_thread->progress_in.incrementPiecewiseAtomically(value); get().progress_in.incrementPiecewiseAtomically(value);
} }
void CurrentThread::updateProgressOut(const Progress & value) void CurrentThread::updateProgressOut(const Progress & value)
{ {
current_thread->progress_out.incrementPiecewiseAtomically(value); get().progress_out.incrementPiecewiseAtomically(value);
} }
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue) void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
{ {
get()->attachInternalTextLogsQueue(logs_queue); get().attachInternalTextLogsQueue(logs_queue);
} }
std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue() std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
{ {
/// NOTE: this method could be called at early server startup stage /// NOTE: this method could be called at early server startup stage
/// NOTE: this method could be called in ThreadStatus destructor, therefore we make use_count() check just in case if (!current_thread)
if (!current_thread || current_thread.use_count() <= 0)
return nullptr; return nullptr;
if (current_thread->getCurrentState() == ThreadStatus::ThreadState::Died) if (get().getCurrentState() == ThreadStatus::ThreadState::Died)
return nullptr; return nullptr;
return current_thread->getInternalTextLogsQueue(); return get().getInternalTextLogsQueue();
} }
ThreadGroupStatusPtr CurrentThread::getGroup() ThreadGroupStatusPtr CurrentThread::getGroup()
{ {
return get()->getThreadGroup(); return get().getThreadGroup();
} }
} }

View File

@ -32,7 +32,7 @@ class CurrentThread
{ {
public: public:
/// Handler to current thread /// Handler to current thread
static ThreadStatusPtr get(); static ThreadStatus & get();
/// Group to which belongs current thread /// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup(); static ThreadGroupStatusPtr getGroup();
@ -85,25 +85,6 @@ public:
bool log_peak_memory_usage_in_destructor = true; bool log_peak_memory_usage_in_destructor = true;
}; };
/// Implicitly finalizes current thread in the destructor
class ThreadScope
{
public:
void (*deleter)() = nullptr;
ThreadScope() = default;
~ThreadScope()
{
if (deleter)
deleter();
/// std::terminate on exception: this is Ok.
}
};
using ThreadScopePtr = std::shared_ptr<ThreadScope>;
static ThreadScopePtr getScope();
private: private:
static void defaultThreadDeleter(); static void defaultThreadDeleter();
}; };

View File

@ -21,10 +21,13 @@ namespace ErrorCodes
} }
thread_local ThreadStatusPtr current_thread = nullptr;
TasksStatsCounters TasksStatsCounters::current() TasksStatsCounters TasksStatsCounters::current()
{ {
TasksStatsCounters res; TasksStatsCounters res;
CurrentThread::get()->taskstats_getter->getStat(res.stat, CurrentThread::get()->os_thread_id); CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id);
return res; return res;
} }
@ -39,17 +42,19 @@ ThreadStatus::ThreadStatus()
memory_tracker.setDescription("(for thread)"); memory_tracker.setDescription("(for thread)");
log = &Poco::Logger::get("ThreadStatus"); log = &Poco::Logger::get("ThreadStatus");
current_thread = this;
/// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
/// Otherwise it could lead to SIGSEGV due to current_thread dereferencing /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
} }
ThreadStatusPtr ThreadStatus::create() ThreadStatus::~ThreadStatus()
{ {
return ThreadStatusPtr(new ThreadStatus); if (deleter)
deleter();
current_thread = nullptr;
} }
ThreadStatus::~ThreadStatus() = default;
void ThreadStatus::initPerformanceCounters() void ThreadStatus::initPerformanceCounters()
{ {
performance_counters_finalized = false; performance_counters_finalized = false;

View File

@ -9,6 +9,8 @@
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <functional>
#include <boost/noncopyable.hpp>
namespace Poco namespace Poco
@ -23,7 +25,7 @@ namespace DB
class Context; class Context;
class QueryStatus; class QueryStatus;
class ThreadStatus; class ThreadStatus;
using ThreadStatusPtr = std::shared_ptr<ThreadStatus>; using ThreadStatusPtr = ThreadStatus*;
class QueryThreadLog; class QueryThreadLog;
struct TasksStatsCounters; struct TasksStatsCounters;
struct RUsageCounters; struct RUsageCounters;
@ -67,14 +69,20 @@ public:
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>; using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
extern thread_local ThreadStatusPtr current_thread;
/** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.). /** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.).
* Used inside thread-local variable. See variables in CurrentThread.cpp * The object must be created in thread function and destroyed in the same thread before the exit.
* It is accessed through thread-local pointer.
* *
* This object should be used only via "CurrentThread", see CurrentThread.h * This object should be used only via "CurrentThread", see CurrentThread.h
*/ */
class ThreadStatus : public std::enable_shared_from_this<ThreadStatus> class ThreadStatus : public boost::noncopyable
{ {
public: public:
ThreadStatus();
~ThreadStatus();
/// Poco's thread number (the same number is used in logs) /// Poco's thread number (the same number is used in logs)
UInt32 thread_number = 0; UInt32 thread_number = 0;
/// Linux's PID (or TGID) (the same id is shown by ps util) /// Linux's PID (or TGID) (the same id is shown by ps util)
@ -88,8 +96,8 @@ public:
Progress progress_in; Progress progress_in;
Progress progress_out; Progress progress_out;
public: using Deleter = std::function<void()>;
static ThreadStatusPtr create(); Deleter deleter;
ThreadGroupStatusPtr getThreadGroup() const ThreadGroupStatusPtr getThreadGroup() const
{ {
@ -136,11 +144,7 @@ public:
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false); void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
~ThreadStatus();
protected: protected:
ThreadStatus();
void initPerformanceCounters(); void initPerformanceCounters();
void logToQueryThreadLog(QueryThreadLog & thread_log); void logToQueryThreadLog(QueryThreadLog & thread_log);

View File

@ -23,7 +23,7 @@ void thread2(MV & x, const char * result)
} }
int main(int argc, char ** argv) int main(int, char **)
{ {
try try
{ {

View File

@ -1,6 +1,6 @@
#include <atomic> #include <atomic>
#include <iostream> #include <iostream>
#include <common/ThreadPool.h> #include <Common/ThreadPool.h>
int main(int, char **) int main(int, char **)

View File

@ -36,7 +36,7 @@ String ThreadStatus::getQueryID()
void CurrentThread::defaultThreadDeleter() void CurrentThread::defaultThreadDeleter()
{ {
ThreadStatus & thread = *CurrentThread::get(); ThreadStatus & thread = CurrentThread::get();
LOG_TRACE(thread.log, "Thread " << thread.thread_number << " exited"); LOG_TRACE(thread.log, "Thread " << thread.thread_number << " exited");
thread.detachQuery(true, true); thread.detachQuery(true, true);
} }
@ -51,8 +51,8 @@ void ThreadStatus::initializeQuery()
memory_tracker.setParent(&thread_group->memory_tracker); memory_tracker.setParent(&thread_group->memory_tracker);
thread_group->memory_tracker.setDescription("(for query)"); thread_group->memory_tracker.setDescription("(for query)");
thread_group->master_thread = shared_from_this(); thread_group->master_thread = this;
thread_group->thread_statuses.emplace(thread_number, shared_from_this()); thread_group->thread_statuses.emplace(thread_number, this);
initPerformanceCounters(); initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery; thread_state = ThreadState::AttachedToQuery;
@ -87,7 +87,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
if (!global_context) if (!global_context)
global_context = thread_group->global_context; global_context = thread_group->global_context;
if (!thread_group->thread_statuses.emplace(thread_number, shared_from_this()).second) if (!thread_group->thread_statuses.emplace(thread_number, this).second)
throw Exception("Thread " + std::to_string(thread_number) + " is attached twice", ErrorCodes::LOGICAL_ERROR); throw Exception("Thread " + std::to_string(thread_number) + " is attached twice", ErrorCodes::LOGICAL_ERROR);
} }
@ -193,48 +193,47 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
void CurrentThread::initializeQuery() void CurrentThread::initializeQuery()
{ {
get()->initializeQuery(); get().initializeQuery();
getScope()->deleter = CurrentThread::defaultThreadDeleter; get().deleter = CurrentThread::defaultThreadDeleter;
} }
void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group) void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
{ {
get()->attachQuery(thread_group, true); get().attachQuery(thread_group, true);
getScope()->deleter = CurrentThread::defaultThreadDeleter; get().deleter = CurrentThread::defaultThreadDeleter;
} }
void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group) void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group)
{ {
get()->attachQuery(thread_group, false); get().attachQuery(thread_group, false);
getScope()->deleter = CurrentThread::defaultThreadDeleter; get().deleter = CurrentThread::defaultThreadDeleter;
} }
std::string CurrentThread::getCurrentQueryID() std::string CurrentThread::getCurrentQueryID()
{ {
if (!get() || get().use_count() <= 0) if (!current_thread)
return {}; return {};
return get().getQueryID();
return get()->getQueryID();
} }
void CurrentThread::attachQueryContext(Context & query_context) void CurrentThread::attachQueryContext(Context & query_context)
{ {
return get()->attachQueryContext(query_context); return get().attachQueryContext(query_context);
} }
void CurrentThread::finalizePerformanceCounters() void CurrentThread::finalizePerformanceCounters()
{ {
get()->finalizePerformanceCounters(); get().finalizePerformanceCounters();
} }
void CurrentThread::detachQuery() void CurrentThread::detachQuery()
{ {
get()->detachQuery(false); get().detachQuery(false);
} }
void CurrentThread::detachQueryIfNotDetached() void CurrentThread::detachQueryIfNotDetached()
{ {
get()->detachQuery(true); get().detachQuery(true);
} }

View File

@ -11,7 +11,7 @@
#include <Poco/Exception.h> #include <Poco/Exception.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <cstdlib> #include <cstdlib>

View File

@ -1,5 +1,5 @@
#if __APPLE__ || __FreeBSD__ #if __APPLE__ || __FreeBSD__
int main(int argc, char ** argv) { return 0; } int main(int, char **) { return 0; }
#else #else
#include <fcntl.h> #include <fcntl.h>
@ -11,7 +11,7 @@ int main(int argc, char ** argv) { return 0; }
#include <vector> #include <vector>
#include <Poco/Exception.h> #include <Poco/Exception.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -22,10 +22,7 @@ int main(int argc, char ** argv) { return 0; }
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <IO/AIO.h> #include <IO/AIO.h>
#include <malloc.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include <sys/syscall.h> #include <sys/syscall.h>

View File

@ -16,7 +16,7 @@
#include <Poco/Exception.h> #include <Poco/Exception.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <port/clock.h> #include <port/clock.h>