Rewritten ThreadStatus via adding shared thread state. [#CLICKHOUSE-2910]

Fixed race condition in SystemLog.
This commit is contained in:
Vitaliy Lyudvichenko 2018-06-19 23:30:35 +03:00
parent 8dc1bebf00
commit e13ba09004
21 changed files with 399 additions and 303 deletions

View File

@ -209,7 +209,13 @@ void HTTPHandler::processQuery(
Poco::Net::HTTPServerResponse & response,
Output & used_output)
{
Context context = server.context();
context.setGlobalContext(server.context());
CurrentThread::initializeQuery();
/// It will forcibly detach query even if unexpected error ocurred and detachQuery() was not called
/// Normal detaching is happen in BlockIO callbacks
SCOPE_EXIT({CurrentThread::detachQueryIfNotDetached();});
LOG_TRACE(log, "Request URI: " << request.getURI());
@ -260,14 +266,9 @@ void HTTPHandler::processQuery(
}
std::string query_id = params.get("query_id", "");
const auto & config = server.config();
Context context = server.context();
context.setGlobalContext(server.context());
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
CurrentThread::attachQueryContext(context);
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
@ -276,6 +277,7 @@ void HTTPHandler::processQuery(
String session_id;
std::chrono::steady_clock::duration session_timeout;
bool session_is_set = params.has("session_id");
const auto & config = server.config();
if (session_is_set)
{

View File

@ -160,7 +160,7 @@ void TCPHandler::runImpl()
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context.getSettingsRef().server_logs_level.value);
CurrentThread::attachSystemLogsQueue(state.logs_queue);
CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
}
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context) {
@ -264,6 +264,9 @@ void TCPHandler::runImpl()
try
{
/// It will forcibly detach query even if unexpected error ocсurred and detachQuery() was not called
CurrentThread::detachQueryIfNotDetached();
state.reset();
}
catch (...)

View File

@ -7,6 +7,7 @@
#include <Common/CurrentThread.h>
#include <common/logger_useful.h>
#include <chrono>
#include <ext/scope_guard.h>
namespace CurrentMetrics
@ -141,6 +142,12 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
{
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
/// Put all threads of both thread pools to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
CurrentThread::detachQuery();
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -213,7 +220,10 @@ void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");
CurrentThread::attachQuery(nullptr);
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
while (!shutdown)
@ -231,6 +241,10 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
{
setThreadName("BckSchPoolDelay");
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)
{
TaskInfoPtr task;

View File

@ -12,6 +12,7 @@
#include <functional>
#include <boost/noncopyable.hpp>
#include <Common/ZooKeeper/Types.h>
#include <Common/CurrentThread.h>
namespace DB
{
@ -135,6 +136,9 @@ private:
std::thread delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;
/// Thread group used for profiling purposes
ThreadGroupStatusPtr thread_group;
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;

View File

@ -35,34 +35,19 @@ void CurrentThread::initializeQuery()
getCurrentThreadImpl()->initializeQuery();
}
void CurrentThread::attachQuery(QueryStatus * parent_process)
void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
{
ThreadStatusPtr thread = getCurrentThreadImpl();
if (!parent_process)
thread->attachQuery(nullptr, nullptr, nullptr, CurrentThread::getInternalTextLogsQueue());
else
{
thread->attachQuery(
parent_process, &parent_process->performance_counters, &parent_process->memory_tracker,
CurrentThread::getInternalTextLogsQueue());
}
getCurrentThreadImpl()->attachQuery(thread_group, true);
}
void CurrentThread::attachQueryFromSiblingThread(const ThreadStatusPtr & sibling_thread)
void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group)
{
attachQueryFromSiblingThreadImpl(sibling_thread, true);
}
void CurrentThread::attachQueryFromSiblingThreadIfDetached(const ThreadStatusPtr & sibling_thread)
{
attachQueryFromSiblingThreadImpl(sibling_thread, false);
getCurrentThreadImpl()->attachQuery(thread_group, false);
}
void CurrentThread::updatePerformanceCounters()
{
getCurrentThreadImpl()->updatePerformanceCountersImpl();
getCurrentThreadImpl()->updatePerformanceCounters();
}
ThreadStatusPtr CurrentThread::get()
@ -100,49 +85,9 @@ void CurrentThread::updateProgressOut(const Progress & value)
current_thread->progress_out.incrementPiecewiseAtomically(value);
}
void CurrentThread::attachQueryFromSiblingThreadImpl(ThreadStatusPtr sibling_thread, bool check_detached)
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
{
if (sibling_thread == nullptr)
throw Exception("Sibling thread was not initialized", ErrorCodes::LOGICAL_ERROR);
ThreadStatusPtr thread = getCurrentThreadImpl();
if (sibling_thread->getCurrentState() == ThreadStatus::ThreadState::QueryInitializing)
{
LOG_WARNING(thread->log, "An attempt to \'fork\' from initializing thread detected."
<< " Performance statistics for this thread will be inaccurate");
}
QueryStatus * parent_query;
ProfileEvents::Counters * parent_counters;
MemoryTracker * parent_memory_tracker;
InternalTextLogsQueueWeakPtr logs_queue_ptr;
{
/// NOTE: It is almost the only place where ThreadStatus::mutex is required
/// In other cases ThreadStatus must be accessed only from the current_thread
std::lock_guard lock(sibling_thread->mutex);
parent_query = sibling_thread->parent_query;
if (parent_query)
{
parent_counters = &parent_query->performance_counters;
parent_memory_tracker = &parent_query->memory_tracker;
}
else
{
/// Fallback
parent_counters = sibling_thread->performance_counters.getParent();
parent_memory_tracker = sibling_thread->memory_tracker.getParent();
}
logs_queue_ptr = sibling_thread->logs_queue_ptr;
}
thread->attachQuery(parent_query, parent_counters, parent_memory_tracker, logs_queue_ptr, check_detached);
}
void CurrentThread::attachSystemLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
{
getCurrentThreadImpl()->attachSystemLogsQueue(logs_queue);
getCurrentThreadImpl()->attachInternalTextLogsQueue(logs_queue);
}
std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
@ -164,13 +109,22 @@ std::string CurrentThread::getCurrentQueryID()
if (!current_thread || current_thread.use_count() <= 0)
return {};
if (current_thread->parent_query)
return current_thread->parent_query->client_info.current_query_id;
return current_thread->getQueryID();
}
if (current_thread->query_context)
return current_thread->query_context->getClientInfo().current_query_id;
ThreadGroupStatusPtr CurrentThread::getGroup()
{
return getCurrentThreadImpl()->getThreadGroup();
}
return {};
void CurrentThread::attachQueryContext(Context & query_context)
{
return getCurrentThreadImpl()->attachQueryContext(query_context);
}
void CurrentThread::finalizePerformanceCounters()
{
getCurrentThreadImpl()->finalizePerformanceCounters();
}
}

View File

@ -14,11 +14,14 @@ class MemoryTracker;
namespace DB
{
class Context;
class QueryStatus;
class ThreadStatus;
struct Progress;
using ThreadStatusPtr = std::shared_ptr<ThreadStatus>;
class InternalTextLogsQueue;
class ThreadGroupStatus;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
class CurrentThread
@ -26,25 +29,28 @@ class CurrentThread
public:
static ThreadStatusPtr get();
static ThreadGroupStatusPtr getGroup();
/// Call when thread accepted connection (but haven't called executeQuery())
/// Currently it is used only for debugging
/// Call from master thread as soon as possible (e.g. when thread accepted connection)
static void initializeQuery();
/// A logs queue used by TCPHandler to pass logs to a client
static void attachSystemLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue);
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue);
static std::shared_ptr<InternalTextLogsQueue> getInternalTextLogsQueue();
/// Sets query_context for current thread group
static void attachQueryContext(Context & query_context);
/// You must call one of these methods when create a child thread:
/// Bundles the current thread with a query
static void attachQuery(QueryStatus * parent_process);
/// Bundles the current thread with a query bundled to the sibling thread
static void attachQueryFromSiblingThread(const ThreadStatusPtr & sibling_thread);
static void attachTo(const ThreadGroupStatusPtr & thread_group);
/// Is useful for a ThreadPool tasks
static void attachQueryFromSiblingThreadIfDetached(const ThreadStatusPtr & sibling_thread);
static void attachToIfDetached(const ThreadGroupStatusPtr & thread_group);
/// Makes system calls to update ProfileEvents derived from rusage and taskstats
static void updatePerformanceCounters();
/// Update ProfileEvents and dumps info to system.query_thread_log
static void finalizePerformanceCounters();
static ProfileEvents::Counters & getProfileEvents();
static MemoryTracker & getMemoryTracker();
@ -58,9 +64,6 @@ public:
/// Non-master threads call this method in destructor automatically
static void detachQuery();
static void detachQueryIfNotDetached();
private:
static void attachQueryFromSiblingThreadImpl(ThreadStatusPtr sibling_thread, bool check_detached = true);
};
}

View File

@ -18,7 +18,7 @@ namespace DB
MemoryTracker::~MemoryTracker()
{
if (level != VariableContext::Thread && peak)
if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
{
try
{

View File

@ -57,7 +57,7 @@ public:
{
ThreadStatus & thread = *CurrentThread::get();
LOG_DEBUG(thread.log, "Thread " << thread.thread_number << " exited");
LOG_TRACE(thread.log, "Thread " << thread.thread_number << " exited");
thread.detachQuery(true, true);
}
catch (...)
@ -218,18 +218,22 @@ ThreadStatus::~ThreadStatus() = default;
void ThreadStatus::initializeQuery()
{
if (thread_state != ThreadState::QueryInitializing && thread_state != ThreadState::DetachedFromQuery)
throw Exception("Unexpected thread state " + std::to_string(getCurrentState()) + __PRETTY_FUNCTION__, ErrorCodes::LOGICAL_ERROR);
assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
thread_state = ThreadState::QueryInitializing;
thread_group = std::make_shared<ThreadGroupStatus>();
performance_counters.setParent(&thread_group->performance_counters);
memory_tracker.setParent(&thread_group->memory_tracker);
thread_group->memory_tracker.setDescription("(for query)");
thread_group->master_thread = shared_from_this();
thread_group->thread_statuses.emplace(thread_number, shared_from_this());
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
}
void ThreadStatus::attachQuery(
QueryStatus * parent_query_,
ProfileEvents::Counters * parent_counters,
MemoryTracker * parent_memory_tracker,
const InternalTextLogsQueueWeakPtr & logs_queue_ptr_,
bool check_detached)
void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
if (thread_state == ThreadState::AttachedToQuery)
{
@ -238,50 +242,68 @@ void ThreadStatus::attachQuery(
return;
}
if (thread_state != ThreadState::DetachedFromQuery && thread_state != ThreadState::QueryInitializing)
throw Exception("Unexpected thread state " + std::to_string(getCurrentState()) + __PRETTY_FUNCTION__, ErrorCodes::LOGICAL_ERROR);
assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
if (!thread_group_)
throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);
/// Attach current thread to thread group and copy useful information from it
thread_group = thread_group_;
performance_counters.setParent(&thread_group->performance_counters);
memory_tracker.setParent(&thread_group->memory_tracker);
{
std::lock_guard lock(mutex);
parent_query = parent_query_;
performance_counters.setParent(parent_counters);
memory_tracker.setParent(parent_memory_tracker);
logs_queue_ptr = logs_queue_ptr_;
std::unique_lock lock(thread_group->mutex);
logs_queue_ptr = thread_group->logs_queue_ptr;
query_context = thread_group->query_context;
if (!global_context)
global_context = thread_group->global_context;
if (!thread_group->thread_statuses.emplace(thread_number, shared_from_this()).second)
throw Exception("Thread " + std::to_string(thread_number) + " is attached twice", ErrorCodes::LOGICAL_ERROR);
}
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
}
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
return;
}
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
finalizePerformanceCounters();
/// For better logging ({query_id} will be shown here)
if (thread_group && thread_group.use_count() == 1)
thread_group->memory_tracker.logPeakMemoryUsage();
/// Detach from thread group
performance_counters.setParent(&ProfileEvents::global_counters);
memory_tracker.setParent(nullptr);
query_context = nullptr;
thread_group.reset();
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
}
void ThreadStatus::initPerformanceCounters()
{
performance_counters_finalized = false;
/// Clear stats from previous query if a new query is started
/// TODO: make separate query_thread_performance_counters and thread_performance_counters
performance_counters.resetCounters();
memory_tracker.resetCounters();
memory_tracker.setDescription("(for thread)");
/// Try extract as many information as possible from ProcessList
if (parent_query)
{
/// Attach current thread to list of query threads
{
std::unique_lock lock(parent_query->threads_mutex);
if (parent_query->thread_statuses.empty())
parent_query->master_thread = shared_from_this();
if (!parent_query->thread_statuses.emplace(thread_number, shared_from_this()).second)
throw Exception("Thread " + std::to_string(thread_number) + " is attached twice", ErrorCodes::LOGICAL_ERROR);
}
query_context = parent_query->tryGetQueryContext();
if (query_context)
{
log_to_query_thread_log = query_context->getSettingsRef().log_query_threads.value != 0;
log_profile_events = query_context->getSettingsRef().log_profile_events.value != 0;
if (!getGlobalContext())
global_context = &query_context->getGlobalContext();
}
}
thread_state = ThreadState::AttachedToQuery;
query_start_time_nanoseconds = getCurrentTimeNanoseconds();
query_start_time = time(nullptr);
++queries_started;
@ -292,47 +314,7 @@ void ThreadStatus::attachQuery(
*last_taskstats = TasksStatsCounters::current();
}
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
return;
}
if (thread_state != ThreadState::AttachedToQuery && thread_state != ThreadState::QueryInitializing)
throw Exception("Unexpected thread state " + std::to_string(getCurrentState()) + __PRETTY_FUNCTION__, ErrorCodes::LOGICAL_ERROR);
updatePerformanceCountersImpl();
try
{
if (log_to_query_thread_log)
if (auto global_context = getGlobalContext())
if (auto thread_log = global_context->getQueryThreadLog())
logToQueryThreadLog(*thread_log);
}
catch (...)
{
tryLogCurrentException(log);
}
{
std::lock_guard lock(mutex);
/// Detach from parent
performance_counters.setParent(&ProfileEvents::global_counters);
memory_tracker.setParent(nullptr);
query_context = nullptr;
}
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
log_to_query_thread_log = true;
log_profile_events = true;
}
void ThreadStatus::updatePerformanceCountersImpl()
void ThreadStatus::updatePerformanceCounters()
{
try
{
@ -346,6 +328,28 @@ void ThreadStatus::updatePerformanceCountersImpl()
}
}
void ThreadStatus::finalizePerformanceCounters()
{
if (performance_counters_finalized)
return;
performance_counters_finalized = true;
updatePerformanceCounters();
try
{
bool log_to_query_thread_log = global_context && query_context && query_context->getSettingsRef().log_query_threads.value != 0;
if (log_to_query_thread_log)
if (auto thread_log = global_context->getQueryThreadLog())
logToQueryThreadLog(*thread_log);
}
catch (...)
{
tryLogCurrentException(log);
}
}
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
{
QueryThreadLogElement elem;
@ -364,40 +368,83 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
elem.thread_number = thread_number;
elem.os_thread_id = os_thread_id;
if (parent_query)
if (thread_group)
{
{
std::shared_lock threads_mutex(parent_query->threads_mutex);
std::shared_lock lock(thread_group->mutex);
if (parent_query->master_thread)
if (thread_group->master_thread)
{
elem.master_thread_number = parent_query->master_thread->thread_number;
elem.master_os_thread_id = parent_query->master_thread->os_thread_id;
elem.master_thread_number = thread_group->master_thread->thread_number;
elem.master_os_thread_id = thread_group->master_thread->os_thread_id;
}
elem.query = thread_group->query;
}
}
elem.query = parent_query->query;
elem.client_info = parent_query->getClientInfo();
}
if (query_context)
{
elem.client_info = query_context->getClientInfo();
if (log_profile_events)
if (query_context->getSettingsRef().log_profile_events.value != 0)
{
/// NOTE: Here we are in the same thread, so we can make memcpy()
elem.profile_counters = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot());
}
}
thread_log.add(elem);
}
void ThreadStatus::clean()
void ThreadStatus::assertState(const std::initializer_list<int> & permitted_states, const char * description)
{
for (auto permitted_state : permitted_states)
{
std::lock_guard lock(mutex);
parent_query = nullptr;
if (getCurrentState() == permitted_state)
return;
}
if (thread_state != ThreadState::DetachedFromQuery && thread_state != ThreadState::Died)
throw Exception("Unexpected thread state " + std::to_string(getCurrentState()) + __PRETTY_FUNCTION__, ErrorCodes::LOGICAL_ERROR);
std::stringstream ss;
ss << "Unexpected thread state " << getCurrentState();
if (description)
ss << ": " << description;
throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR);
}
void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue)
{
logs_queue_ptr = logs_queue;
if (!thread_group)
return;
std::unique_lock lock(thread_group->mutex);
thread_group->logs_queue_ptr = logs_queue;
}
void ThreadStatus::attachQueryContext(Context & query_context_)
{
query_context = &query_context_;
if (!global_context)
global_context = &query_context->getGlobalContext();
if (!thread_group)
return;
std::unique_lock lock(thread_group->mutex);
thread_group->query_context = query_context;
if (!thread_group->global_context)
thread_group->global_context = global_context;
}
String ThreadStatus::getQueryID()
{
if (query_context)
return query_context->getClientInfo().current_query_id;
return {};
}
}

View File

@ -3,7 +3,9 @@
#include <Common/MemoryTracker.h>
#include <IO/Progress.h>
#include <memory>
#include <map>
#include <mutex>
#include <shared_mutex>
namespace Poco
@ -31,6 +33,31 @@ using ThreadStatusPtr = std::shared_ptr<ThreadStatus>;
extern thread_local ThreadStatusPtr current_thread;
class ThreadGroupStatus
{
public:
mutable std::shared_mutex mutex;
ProfileEvents::Counters performance_counters{VariableContext::Process};
MemoryTracker memory_tracker{VariableContext::Process};
Context * query_context = nullptr;
Context * global_context = nullptr;
InternalTextLogsQueueWeakPtr logs_queue_ptr;
/// Key is Poco's thread_id
using QueryThreadStatuses = std::map<UInt32, ThreadStatusPtr>;
QueryThreadStatuses thread_statuses;
ThreadStatusPtr master_thread;
String query;
};
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
class ThreadStatus : public std::enable_shared_from_this<ThreadStatus>
{
public:
@ -52,13 +79,14 @@ public:
static ThreadStatusPtr create();
/// Called by master thread when the query finishes
void clean();
ThreadGroupStatusPtr getThreadGroup() const
{
return thread_group;
}
enum ThreadState
{
DetachedFromQuery = 0, /// We just created thread or it is background thread
QueryInitializing, /// We accepted a connection, but haven't enqueued a query to ProcessList
DetachedFromQuery = 0, /// We just created thread or it is a background thread
AttachedToQuery, /// Thread executes enqueued query
Died, /// Thread does not exist
};
@ -68,21 +96,33 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
String getQueryID();
/// Starts new query and create new thread group fro it, current thread becomes master thread of the query
void initializeQuery();
/// Attaches slave thread to existing thread group
void attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
InternalTextLogsQueuePtr getInternalTextLogsQueue() const
{
return thread_state == Died ? nullptr : logs_queue_ptr.lock();
}
void attachSystemLogsQueue(const InternalTextLogsQueuePtr & logs_queue)
{
std::lock_guard lock(mutex);
logs_queue_ptr = logs_queue;
}
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue);
Context * getGlobalContext()
{
return global_context.load(std::memory_order_relaxed);
}
/// Sets query context for current thread and its thread group
/// NOTE: query_context have to be alive until detachQuery() is called
void attachQueryContext(Context & query_context);
/// Update several ProfileEvents counters
void updatePerformanceCounters();
/// Update ProfileEvents and dumps info to system.query_thread_log
void finalizePerformanceCounters();
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
~ThreadStatus();
@ -90,39 +130,27 @@ protected:
ThreadStatus();
void initializeQuery();
void attachQuery(
QueryStatus * parent_query_,
ProfileEvents::Counters * parent_counters,
MemoryTracker * parent_memory_tracker,
const InternalTextLogsQueueWeakPtr & logs_queue_ptr_,
bool check_detached = true);
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
void initPerformanceCounters();
void logToQueryThreadLog(QueryThreadLog & thread_log);
void updatePerformanceCountersImpl();
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr);
ThreadGroupStatusPtr thread_group;
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
mutable std::mutex mutex;
QueryStatus * parent_query = nullptr;
/// Is set once
std::atomic<Context *> global_context{nullptr};
Context * global_context = nullptr;
/// Use it only from current thread
Context * query_context = nullptr;
/// A logs queue used by TCPHandler to pass logs to a client
InternalTextLogsQueueWeakPtr logs_queue_ptr;
bool performance_counters_finalized = false;
UInt64 query_start_time_nanoseconds = 0;
time_t query_start_time = 0;
bool log_to_query_thread_log = true;
bool log_profile_events = true;
size_t queries_started = 0;
Poco::Logger * log = nullptr;

View File

@ -35,7 +35,7 @@ void AsynchronousBlockInputStream::next()
{
ready.reset();
pool.schedule([this, main_thread=CurrentThread::get()] ()
pool.schedule([this, thread_group=CurrentThread::getGroup()] ()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
@ -43,7 +43,7 @@ void AsynchronousBlockInputStream::next()
{
if (first)
setThreadName("AsyncBlockInput");
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
}
catch (...)
{

View File

@ -176,10 +176,10 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
auto & child = children[i];
auto main_thread = CurrentThread::get();
reading_pool->schedule([&child, main_thread]
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&child, thread_group]
{
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
@ -197,7 +197,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
*/
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule([this] () { mergeThread(current_thread); } );
pool.schedule([this, thread_group=CurrentThread::getGroup()] () { mergeThread(thread_group); } );
}
}
@ -293,15 +293,16 @@ void MergingAggregatedMemoryEfficientBlockInputStream::finalize()
}
void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadStatusPtr main_thread)
void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupStatusPtr thread_group)
{
if (main_thread)
CurrentThread::attachQueryFromSiblingThread(main_thread);
setThreadName("MergeAggMergThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{
if (thread_group)
CurrentThread::attachTo(thread_group);
setThreadName("MergeAggMergThr");
while (!parallel_merge_data->finish)
{
/** Receiving next blocks is processing by one thread pool, and merge is in another.
@ -481,10 +482,10 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
{
if (need_that_input(input))
{
auto main_thread = current_thread;
reading_pool->schedule([&input, &read_from_input, main_thread]
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&input, &read_from_input, thread_group]
{
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);

View File

@ -3,7 +3,7 @@
#include <Interpreters/Aggregator.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadStatus.h>
#include <Common/CurrentThread.h>
#include <common/ThreadPool.h>
#include <condition_variable>
@ -152,7 +152,7 @@ private:
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void mergeThread(ThreadStatusPtr main_thread);
void mergeThread(ThreadGroupStatusPtr main_thread);
void finalize();
};

View File

@ -106,9 +106,9 @@ public:
{
active_threads = max_threads;
threads.reserve(max_threads);
auto main_thread = CurrentThread::get();
auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back([=] () { thread(main_thread, i); } );
threads.emplace_back([=] () { thread(thread_group, i); } );
}
/// Ask all sources to stop earlier than they run out.
@ -176,16 +176,16 @@ private:
}
}
void thread(ThreadStatusPtr main_thread, size_t thread_num)
void thread(ThreadGroupStatusPtr thread_group, size_t thread_num)
{
CurrentThread::attachQueryFromSiblingThread(main_thread);
std::exception_ptr exception;
setThreadName("ParalInputsProc");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{
setThreadName("ParalInputsProc");
CurrentThread::attachTo(thread_group);
while (!finish)
{
InputData unprepared_input;

View File

@ -1274,9 +1274,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
bool final,
ThreadPool * thread_pool) const
{
auto converter = [&](size_t bucket, ThreadStatusPtr main_thread)
auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
{
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
return convertOneBucketToBlock(data_variants, method, final, bucket);
};
@ -1291,7 +1291,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
if (method.data.impls[bucket].empty())
continue;
tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::get()));
tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::getGroup()));
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
@ -1721,15 +1721,15 @@ private:
return;
parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
max_scheduled_bucket_num, CurrentThread::get()));
max_scheduled_bucket_num, CurrentThread::getGroup()));
}
void thread(Int32 bucket_num, ThreadStatusPtr main_thread)
void thread(Int32 bucket_num, ThreadGroupStatusPtr thread_group)
{
try
{
setThreadName("MergingAggregtd");
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
/// TODO: add no_more_keys support maybe
@ -2031,9 +2031,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
LOG_TRACE(log, "Merging partially aggregated two-level data.");
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadStatusPtr main_thread)
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
{
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
for (Block & block : bucket_to_blocks[bucket])
{
@ -2066,7 +2066,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
result.aggregates_pools.push_back(std::make_shared<Arena>());
Arena * aggregates_pool = result.aggregates_pools.back().get();
auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::get());
auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::getGroup());
if (thread_pool)
thread_pool->schedule(task);

View File

@ -166,11 +166,28 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
/// Query-level memory tracker is already set in the QueryStatus constructor
/// Attach master thread
CurrentThread::attachQuery(&(*process_it));
/// Actualize thread group info
{
auto thread_group = CurrentThread::getGroup();
std::unique_lock lock(thread_group->mutex);
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
thread_group->query = process_it->query;
/// Set memory trackers
thread_group->memory_tracker.setOrRaiseLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setDescription("(for query)");
if (process_it->memory_tracker_fault_probability)
thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);
/// NOTE: Do not set the limit for thread-level memory tracker since it could show unreal values
/// since allocation and deallocation could happen in different threads
process_it->thread_group = std::move(thread_group);
}
if (!user_process_list.user_throttler)
{
if (settings.max_network_bandwidth_for_user)
@ -195,14 +212,6 @@ ProcessListEntry::~ProcessListEntry()
/// Destroy all streams to avoid long lock of ProcessList
it->releaseQueryStreams();
/// Finalize all threads statuses
CurrentThread::detachQueryIfNotDetached();
{
std::shared_lock lock(it->threads_mutex);
for (auto & elem : it->thread_statuses)
elem.second->clean();
}
std::lock_guard<std::mutex> lock(parent.mutex);
String user = it->getClientInfo().current_user;
@ -248,7 +257,7 @@ ProcessListEntry::~ProcessListEntry()
/// If there are no more queries for the user, then we will reset memory tracker and network throttler.
if (user_process_list.queries.empty())
user_process_list.reset();
user_process_list.resetTrackers();
/// This removes memory_tracker for all requests. At this time, no other memory_trackers live.
if (parent.processes.size() == 0)
@ -264,20 +273,17 @@ ProcessListEntry::~ProcessListEntry()
QueryStatus::QueryStatus(
const String & query_,
const ClientInfo & client_info_,
size_t max_memory_usage,
double memory_tracker_fault_probability,
size_t max_memory_usage_,
double memory_tracker_fault_probability_,
QueryPriorities::Handle && priority_handle_)
:
query(query_),
client_info(client_info_),
priority_handle(std::move(priority_handle_)),
num_queries_increment{CurrentMetrics::Query}
num_queries_increment{CurrentMetrics::Query},
max_memory_usage(max_memory_usage_),
memory_tracker_fault_probability(memory_tracker_fault_probability_)
{
memory_tracker.setOrRaiseLimit(max_memory_usage);
memory_tracker.setDescription("(for query)");
if (memory_tracker_fault_probability)
memory_tracker.setFaultProbability(memory_tracker_fault_probability);
}
QueryStatus::~QueryStatus() = default;
@ -330,8 +336,6 @@ bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStream
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{
user_process_list = user_process_list_;
performance_counters.setParent(&user_process_list->user_performance_counters);
memory_tracker.setParent(&user_process_list->user_memory_tracker);
}
@ -403,20 +407,24 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
res.total_rows = progress_in.total_rows;
res.written_rows = progress_out.rows;
res.written_bytes = progress_out.bytes;
res.memory_usage = memory_tracker.get();
res.peak_memory_usage = memory_tracker.getPeak();
if (thread_group)
{
res.memory_usage = thread_group->memory_tracker.get();
res.peak_memory_usage = thread_group->memory_tracker.getPeak();
if (get_thread_list)
{
std::shared_lock lock(threads_mutex);
res.thread_numbers.reserve(thread_statuses.size());
std::shared_lock lock(thread_group->mutex);
res.thread_numbers.reserve(thread_group->thread_statuses.size());
for (auto & thread_status_elem : thread_statuses)
for (auto & thread_status_elem : thread_group->thread_statuses)
res.thread_numbers.emplace_back(thread_status_elem.second->thread_number);
}
if (get_profile_events)
res.profile_counters = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot());
res.profile_counters = std::make_shared<ProfileEvents::Counters>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
}
if (get_settings && query_context)
res.query_settings = std::make_shared<Settings>(query_context->getSettingsRef());

View File

@ -87,6 +87,9 @@ protected:
/// Is set once when init
Context * query_context = nullptr;
/// Info about all threads involved in query execution
ThreadGroupStatusPtr thread_group;
Stopwatch watch;
/// Progress of input stream
@ -96,17 +99,11 @@ protected:
QueryPriorities::Handle priority_handle;
ProfileEvents::Counters performance_counters{VariableContext::Process};
MemoryTracker memory_tracker{VariableContext::Process};
mutable std::shared_mutex threads_mutex;
/// Key is Poco's thread_id
using QueryThreadStatuses = std::map<UInt32, ThreadStatusPtr>;
QueryThreadStatuses thread_statuses;
ThreadStatusPtr master_thread;
CurrentMetrics::Increment num_queries_increment{CurrentMetrics::Query};
size_t max_memory_usage = 0;
double memory_tracker_fault_probability = 0.0;
std::atomic<bool> is_killed { false };
void setUserProcessList(ProcessListForUser * user_process_list_);
@ -218,7 +215,7 @@ struct ProcessListForUser
/// Sometimes it is important to reset the MemoryTracker, because it may accumulate skew
/// due to the fact that there are cases when memory can be allocated while processing the query, but released later.
/// Clears network bandwidth Throttler, so it will not count periods of inactivity.
void reset()
void resetTrackers()
{
user_memory_tracker.reset();
if (user_throttler)

View File

@ -106,7 +106,7 @@ public:
}
/// Flush data in the buffer to disk
void flush();
void flush(bool quiet = false);
protected:
Context & context;
@ -115,7 +115,6 @@ protected:
const String storage_def;
StoragePtr table;
const size_t flush_interval_milliseconds;
std::mutex flush_mutex;
using QueueItem = std::pair<bool, LogElement>; /// First element is shutdown flag for thread.
@ -127,6 +126,7 @@ protected:
* than accumulation of large amount of log records (for example, for query log - processing of large amount of queries).
*/
std::vector<LogElement> data;
std::mutex data_mutex;
Logger * log;
@ -192,7 +192,16 @@ void SystemLog<LogElement>::threadFunction()
QueueItem element;
bool has_element = false;
if (data.empty())
bool is_empty;
{
std::unique_lock lock(data_mutex);
is_empty = data.empty();
}
/// data.size() is increased only in this function
/// TODO: get rid of data and queue duality
if (is_empty)
{
queue.pop(element);
has_element = true;
@ -214,14 +223,17 @@ void SystemLog<LogElement>::threadFunction()
break;
}
else
{
std::unique_lock lock(data_mutex);
data.push_back(element.second);
}
}
size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
if (milliseconds_elapsed >= flush_interval_milliseconds)
{
/// Write data to a table.
flush();
flush(true);
time_after_last_write.restart();
}
}
@ -236,12 +248,15 @@ void SystemLog<LogElement>::threadFunction()
template <typename LogElement>
void SystemLog<LogElement>::flush()
void SystemLog<LogElement>::flush(bool quiet)
{
std::lock_guard flush_lock(flush_mutex);
std::unique_lock lock(data_mutex);
try
{
if (quiet && data.empty())
return;
LOG_TRACE(log, "Flushing system log");
/// We check for existence of the table and create it as needed at every flush.

View File

@ -118,6 +118,9 @@ static void onExceptionBeforeStart(const String & query, Context & context, time
setExceptionStackTrace(elem);
logException(context, elem);
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
if (log_queries)
if (auto query_log = context.getQueryLog())
query_log->add(elem);
@ -134,6 +137,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
time_t current_time = time(nullptr);
context.setQueryContext(context);
CurrentThread::attachQueryContext(context);
const Settings & settings = context.getSettingsRef();
@ -270,7 +274,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
return;
/// Update performance counters before logging to query_log
CurrentThread::detachQuery();
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo(true, settings.log_profile_events);
@ -343,7 +347,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const Settings & settings = context.getSettingsRef();
/// Update performance counters before logging to query_log
CurrentThread::detachQuery();
CurrentThread::finalizePerformanceCounters();
if (process_list_elem)
{

View File

@ -193,10 +193,10 @@ void DistributedBlockOutputStream::waitForJobs()
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block)
{
auto main_thread = CurrentThread::get();
return [this, main_thread, &job, &current_block]()
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block]()
{
CurrentThread::attachQueryFromSiblingThreadIfDetached(main_thread);
CurrentThread::attachToIfDetached(thread_group);
setThreadName("DistrOutStrProc");
++job.blocks_started;

View File

@ -9,6 +9,7 @@
#include <Common/CurrentThread.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>
#include <random>
@ -57,6 +58,14 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");
/// Put all threads to one thread group
/// The master thread exits immediately
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "thread_group " << thread_group.get());
CurrentThread::detachQuery();
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "thread_group " << thread_group.get());
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
@ -115,7 +124,10 @@ void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
CurrentThread::attachQuery(nullptr);
/// Put all threads to one thread pool
CurrentThread::attachTo(thread_group);
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());

View File

@ -12,6 +12,8 @@
#include <Poco/Event.h>
#include <Poco/Timestamp.h>
#include <Core/Types.h>
#include <Common/CurrentThread.h>
namespace DB
{
@ -64,6 +66,8 @@ protected:
std::atomic<bool> shutdown {false};
std::condition_variable wake_event;
/// Thread group used for profiling purposes
ThreadGroupStatusPtr thread_group;
void threadFunction();
};