mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
work with comments on PR
This commit is contained in:
parent
67ac858a52
commit
3c6deddd1d
@ -54,10 +54,10 @@ public:
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
setThreadName("UniqExactMerger");
|
setThreadName("UniqExactMerger");
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
|
@ -89,13 +89,13 @@ void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries
|
|||||||
if (!--num_active_jobs)
|
if (!--num_active_jobs)
|
||||||
event.notify_all();
|
event.notify_all();
|
||||||
if (async)
|
if (async)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (async && thread_group)
|
if (async && thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
if (async)
|
if (async)
|
||||||
setThreadName("BackupWorker");
|
setThreadName("BackupWorker");
|
||||||
@ -154,13 +154,13 @@ void restoreTablesData(DataRestoreTasks && tasks, ThreadPool & thread_pool)
|
|||||||
if (!--num_active_jobs)
|
if (!--num_active_jobs)
|
||||||
event.notify_all();
|
event.notify_all();
|
||||||
if (async)
|
if (async)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (async && thread_group)
|
if (async && thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
if (async)
|
if (async)
|
||||||
setThreadName("RestoreWorker");
|
setThreadName("RestoreWorker");
|
||||||
|
@ -203,7 +203,12 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
|
|||||||
|
|
||||||
for (size_t src_row = 0; src_row < src.rows(); ++src_row)
|
for (size_t src_row = 0; src_row < src.rows(); ++src_row)
|
||||||
{
|
{
|
||||||
/// Filter out threads stats, use thead group stats
|
/// Filter out threads stats, use stats from thread group
|
||||||
|
/// Exactly stats from thread group is stored to the table system.query_log
|
||||||
|
/// The stats from threads are less useful.
|
||||||
|
/// They take more records, they need to be combined,
|
||||||
|
/// there even could be several records from one thread.
|
||||||
|
/// Server doesn't send it any more to the clients, so this code left for compatible
|
||||||
auto thread_id = src_array_thread_id[src_row];
|
auto thread_id = src_array_thread_id[src_row];
|
||||||
if (thread_id != THREAD_GROUP_ID)
|
if (thread_id != THREAD_GROUP_ID)
|
||||||
continue;
|
continue;
|
||||||
|
@ -57,14 +57,6 @@ void CurrentThread::updateProgressOut(const Progress & value)
|
|||||||
current_thread->progress_out.incrementPiecewiseAtomically(value);
|
current_thread->progress_out.incrementPiecewiseAtomically(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
|
|
||||||
LogsLevel client_logs_level)
|
|
||||||
{
|
|
||||||
if (unlikely(!current_thread))
|
|
||||||
return;
|
|
||||||
current_thread->attachInternalTextLogsQueue(logs_queue, client_logs_level);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
|
std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
|
||||||
{
|
{
|
||||||
/// NOTE: this method could be called at early server startup stage
|
/// NOTE: this method could be called at early server startup stage
|
||||||
@ -74,13 +66,6 @@ std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
|
|||||||
return current_thread->getInternalTextLogsQueue();
|
return current_thread->getInternalTextLogsQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & queue)
|
|
||||||
{
|
|
||||||
if (unlikely(!current_thread))
|
|
||||||
return;
|
|
||||||
current_thread->attachInternalProfileEventsQueue(queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
InternalProfileEventsQueuePtr CurrentThread::getInternalProfileEventsQueue()
|
InternalProfileEventsQueuePtr CurrentThread::getInternalProfileEventsQueue()
|
||||||
{
|
{
|
||||||
if (unlikely(!current_thread))
|
if (unlikely(!current_thread))
|
||||||
@ -89,6 +74,15 @@ InternalProfileEventsQueuePtr CurrentThread::getInternalProfileEventsQueue()
|
|||||||
return current_thread->getInternalProfileEventsQueue();
|
return current_thread->getInternalProfileEventsQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
|
||||||
|
LogsLevel client_logs_level)
|
||||||
|
{
|
||||||
|
if (unlikely(!current_thread))
|
||||||
|
return;
|
||||||
|
current_thread->attachInternalTextLogsQueue(logs_queue, client_logs_level);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
ThreadGroupStatusPtr CurrentThread::getGroup()
|
ThreadGroupStatusPtr CurrentThread::getGroup()
|
||||||
{
|
{
|
||||||
if (unlikely(!current_thread))
|
if (unlikely(!current_thread))
|
||||||
@ -97,4 +91,12 @@ ThreadGroupStatusPtr CurrentThread::getGroup()
|
|||||||
return current_thread->getThreadGroup();
|
return current_thread->getThreadGroup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string_view CurrentThread::getQueryId()
|
||||||
|
{
|
||||||
|
if (unlikely(!current_thread))
|
||||||
|
return {};
|
||||||
|
|
||||||
|
return current_thread->getQueryId();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <string_view>
|
||||||
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
@ -48,6 +49,8 @@ public:
|
|||||||
static void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & queue);
|
static void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & queue);
|
||||||
static InternalProfileEventsQueuePtr getInternalProfileEventsQueue();
|
static InternalProfileEventsQueuePtr getInternalProfileEventsQueue();
|
||||||
|
|
||||||
|
static void attachQueryForLog(const String & query_);
|
||||||
|
|
||||||
/// Makes system calls to update ProfileEvents that contain info from rusage and taskstats
|
/// Makes system calls to update ProfileEvents that contain info from rusage and taskstats
|
||||||
static void updatePerformanceCounters();
|
static void updatePerformanceCounters();
|
||||||
|
|
||||||
@ -65,24 +68,18 @@ public:
|
|||||||
|
|
||||||
/// You must call one of these methods when create a query child thread:
|
/// You must call one of these methods when create a query child thread:
|
||||||
/// Add current thread to a group associated with the thread group
|
/// Add current thread to a group associated with the thread group
|
||||||
static void attachTo(const ThreadGroupStatusPtr & thread_group);
|
static void attachToGroup(const ThreadGroupStatusPtr & thread_group);
|
||||||
/// Is useful for a ThreadPool tasks
|
/// Is useful for a ThreadPool tasks
|
||||||
static void attachToIfDetached(const ThreadGroupStatusPtr & thread_group);
|
static void attachToGroupIfDetached(const ThreadGroupStatusPtr & thread_group);
|
||||||
|
|
||||||
/// Non-master threads call this method in destructor automatically
|
/// Non-master threads call this method in destructor automatically
|
||||||
static void detachGroupIfNotDetached();
|
static void detachFromGroupIfNotDetached();
|
||||||
static void detachQueryIfNotDetached();
|
|
||||||
|
|
||||||
/// Update ProfileEvents and dumps info to system.query_thread_log
|
/// Update ProfileEvents and dumps info to system.query_thread_log
|
||||||
static void finalizePerformanceCounters();
|
static void finalizePerformanceCounters();
|
||||||
|
|
||||||
/// Returns a non-empty string if the thread is attached to a query
|
/// Returns a non-empty string if the thread is attached to a query
|
||||||
static std::string_view getQueryId()
|
static std::string_view getQueryId();
|
||||||
{
|
|
||||||
if (unlikely(!current_thread))
|
|
||||||
return {};
|
|
||||||
return current_thread->getQueryId();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
|
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
|
||||||
struct QueryScope : private boost::noncopyable
|
struct QueryScope : private boost::noncopyable
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/ThreadProfileEvents.h>
|
#include <Common/ThreadProfileEvents.h>
|
||||||
#include <Common/ConcurrentBoundedQueue.h>
|
|
||||||
#include <Common/QueryProfiler.h>
|
#include <Common/QueryProfiler.h>
|
||||||
#include <Common/ThreadStatus.h>
|
#include <Common/ThreadStatus.h>
|
||||||
#include <base/errnoToString.h>
|
#include <base/errnoToString.h>
|
||||||
@ -11,14 +10,12 @@
|
|||||||
#include <base/getPageSize.h>
|
#include <base/getPageSize.h>
|
||||||
|
|
||||||
#include <csignal>
|
#include <csignal>
|
||||||
#include <mutex>
|
|
||||||
#include <sys/mman.h>
|
#include <sys/mman.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
@ -71,35 +68,9 @@ static thread_local ThreadStack alt_stack;
|
|||||||
static thread_local bool has_alt_stack = false;
|
static thread_local bool has_alt_stack = false;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
ContextWeakPtr ThreadGroupStatus::getQueryContextWeak() const
|
ThreadGroupStatus::ThreadGroupStatus()
|
||||||
{
|
: master_thread_id(CurrentThread::get().thread_id)
|
||||||
return query_context;
|
{}
|
||||||
}
|
|
||||||
|
|
||||||
ContextWeakPtr ThreadGroupStatus::getGlobalContextWeak() const
|
|
||||||
{
|
|
||||||
return global_context;
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadGroupStatus::FatalErrorCallback ThreadGroupStatus::getFatalErrorCallback() const
|
|
||||||
{
|
|
||||||
return fatal_error_callback;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadGroupStatus::link(ThreadStatusPtr thread)
|
|
||||||
{
|
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
|
|
||||||
/// NOTE: thread may be attached multiple times if it is reused from a thread pool.
|
|
||||||
thread_ids.insert(thread->thread_id);
|
|
||||||
threads.insert(thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadGroupStatus::unlink(ThreadStatusPtr thread)
|
|
||||||
{
|
|
||||||
std::lock_guard guard(mutex);
|
|
||||||
threads.erase(thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadStatus::ThreadStatus()
|
ThreadStatus::ThreadStatus()
|
||||||
: thread_id{getThreadId()}
|
: thread_id{getThreadId()}
|
||||||
@ -155,6 +126,64 @@ ThreadStatus::ThreadStatus()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadGroupStatusPtr ThreadStatus::getThreadGroup() const
|
||||||
|
{
|
||||||
|
return thread_group;
|
||||||
|
}
|
||||||
|
|
||||||
|
const String & ThreadStatus::getQueryId() const
|
||||||
|
{
|
||||||
|
return query_id_from_query_context;
|
||||||
|
}
|
||||||
|
|
||||||
|
ContextPtr ThreadStatus::getQueryContext() const
|
||||||
|
{
|
||||||
|
return query_context.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
ContextPtr ThreadStatus::getGlobalContext() const
|
||||||
|
{
|
||||||
|
return global_context.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadGroupStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level)
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
shared_data.logs_queue_ptr = logs_queue;
|
||||||
|
shared_data.client_logs_level = logs_level;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
|
||||||
|
LogsLevel logs_level)
|
||||||
|
{
|
||||||
|
if (!thread_group)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No thread group attached to the thread {}", thread_id);
|
||||||
|
|
||||||
|
shared_data.logs_queue_ptr = logs_queue;
|
||||||
|
shared_data.client_logs_level = logs_level;
|
||||||
|
thread_group->attachInternalTextLogsQueue(logs_queue, logs_level);
|
||||||
|
}
|
||||||
|
|
||||||
|
InternalTextLogsQueuePtr ThreadStatus::getInternalTextLogsQueue() const
|
||||||
|
{
|
||||||
|
return shared_data.logs_queue_ptr.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
InternalProfileEventsQueuePtr ThreadStatus::getInternalProfileEventsQueue() const
|
||||||
|
{
|
||||||
|
return shared_data.profile_queue_ptr.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
const String & ThreadStatus::getQueryForLog() const
|
||||||
|
{
|
||||||
|
return shared_data.query_for_logs;
|
||||||
|
}
|
||||||
|
|
||||||
|
LogsLevel ThreadStatus::getClientLogsLevel() const
|
||||||
|
{
|
||||||
|
return shared_data.client_logs_level;
|
||||||
|
}
|
||||||
|
|
||||||
void ThreadStatus::flushUntrackedMemory()
|
void ThreadStatus::flushUntrackedMemory()
|
||||||
{
|
{
|
||||||
if (untracked_memory == 0)
|
if (untracked_memory == 0)
|
||||||
@ -170,7 +199,7 @@ ThreadStatus::~ThreadStatus()
|
|||||||
|
|
||||||
/// It may cause segfault if query_context was destroyed, but was not detached
|
/// It may cause segfault if query_context was destroyed, but was not detached
|
||||||
auto query_context_ptr = query_context.lock();
|
auto query_context_ptr = query_context.lock();
|
||||||
assert((!query_context_ptr && query_id.empty()) || (query_context_ptr && query_id == query_context_ptr->getCurrentQueryId()));
|
assert((!query_context_ptr && getQueryId().empty()) || (query_context_ptr && getQueryId() == query_context_ptr->getCurrentQueryId()));
|
||||||
|
|
||||||
/// detachGroup if it was attached
|
/// detachGroup if it was attached
|
||||||
if (deleter)
|
if (deleter)
|
||||||
@ -196,61 +225,25 @@ void ThreadStatus::updatePerformanceCounters()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::assertState(ThreadState permitted_state, const char * description) const
|
|
||||||
{
|
|
||||||
auto curr_state = thread_state.load();
|
|
||||||
|
|
||||||
if (curr_state == permitted_state)
|
|
||||||
return;
|
|
||||||
|
|
||||||
if (description)
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected thread state {}: {}", curr_state, description);
|
|
||||||
else
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected thread state {}", curr_state);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
|
|
||||||
LogsLevel client_logs_level)
|
|
||||||
{
|
|
||||||
logs_queue_ptr = logs_queue;
|
|
||||||
chassert(thread_group);
|
|
||||||
|
|
||||||
std::lock_guard lock(thread_group->mutex);
|
|
||||||
thread_group->logs_queue_ptr = logs_queue;
|
|
||||||
thread_group->client_logs_level = client_logs_level;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadStatus::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue)
|
|
||||||
{
|
|
||||||
profile_queue_ptr = profile_queue;
|
|
||||||
|
|
||||||
chassert(thread_group);
|
|
||||||
|
|
||||||
std::lock_guard lock(thread_group->mutex);
|
|
||||||
thread_group->profile_queue_ptr = profile_queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadStatus::onFatalError()
|
void ThreadStatus::onFatalError()
|
||||||
{
|
{
|
||||||
/// No thread group - no callback
|
|
||||||
if (!thread_group)
|
|
||||||
return;
|
|
||||||
|
|
||||||
std::lock_guard lock(thread_group->mutex);
|
|
||||||
if (fatal_error_callback)
|
if (fatal_error_callback)
|
||||||
fatal_error_callback();
|
fatal_error_callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadStatus * MainThreadStatus::main_thread = nullptr;
|
ThreadStatus * MainThreadStatus::main_thread = nullptr;
|
||||||
|
|
||||||
MainThreadStatus & MainThreadStatus::getInstance()
|
MainThreadStatus & MainThreadStatus::getInstance()
|
||||||
{
|
{
|
||||||
static MainThreadStatus thread_status;
|
static MainThreadStatus thread_status;
|
||||||
return thread_status;
|
return thread_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
MainThreadStatus::MainThreadStatus()
|
MainThreadStatus::MainThreadStatus()
|
||||||
{
|
{
|
||||||
main_thread = current_thread;
|
main_thread = current_thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
MainThreadStatus::~MainThreadStatus()
|
MainThreadStatus::~MainThreadStatus()
|
||||||
{
|
{
|
||||||
main_thread = nullptr;
|
main_thread = nullptr;
|
||||||
|
@ -63,50 +63,57 @@ using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
|
|||||||
class ThreadGroupStatus
|
class ThreadGroupStatus
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
ThreadGroupStatus();
|
||||||
|
using FatalErrorCallback = std::function<void()>;
|
||||||
|
ThreadGroupStatus(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
|
||||||
|
|
||||||
/// The first thread created this thread group
|
/// The first thread created this thread group
|
||||||
UInt64 master_thread_id = 0;
|
const UInt64 master_thread_id;
|
||||||
|
|
||||||
|
/// Set up at creation, no race when reading
|
||||||
|
const ContextWeakPtr query_context;
|
||||||
|
const ContextWeakPtr global_context;
|
||||||
|
|
||||||
|
const FatalErrorCallback fatal_error_callback;
|
||||||
|
|
||||||
ProfileEvents::Counters performance_counters{VariableContext::Process};
|
ProfileEvents::Counters performance_counters{VariableContext::Process};
|
||||||
MemoryTracker memory_tracker{VariableContext::Process};
|
MemoryTracker memory_tracker{VariableContext::Process};
|
||||||
|
|
||||||
/// Access to the members below has to be in critical section with mutex
|
struct SharedData
|
||||||
mutable std::mutex mutex;
|
{
|
||||||
|
InternalProfileEventsQueueWeakPtr profile_queue_ptr;
|
||||||
|
|
||||||
InternalTextLogsQueueWeakPtr logs_queue_ptr;
|
InternalTextLogsQueueWeakPtr logs_queue_ptr;
|
||||||
InternalProfileEventsQueueWeakPtr profile_queue_ptr;
|
LogsLevel client_logs_level = LogsLevel::none;
|
||||||
|
|
||||||
LogsLevel client_logs_level = LogsLevel::none;
|
String query_for_logs;
|
||||||
|
UInt64 normalized_query_hash = 0;
|
||||||
|
};
|
||||||
|
|
||||||
String query;
|
SharedData getSharedData()
|
||||||
UInt64 normalized_query_hash = 0;
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
return shared_data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mutation shared data
|
||||||
|
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level);
|
||||||
|
void attachQueryForLog(const String & query_, UInt64 normalized_hash = 0);
|
||||||
|
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);
|
||||||
|
|
||||||
/// When new query starts, new thread group is created for it, current thread becomes master thread of the query
|
/// When new query starts, new thread group is created for it, current thread becomes master thread of the query
|
||||||
static ThreadGroupStatusPtr createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_ = {});
|
static ThreadGroupStatusPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
|
||||||
|
|
||||||
const std::vector<UInt64> getInvolvedThreadIds() const;
|
std::vector<UInt64> getInvolvedThreadIds() const;
|
||||||
|
void linkThread(UInt64 thread_it);
|
||||||
void link(ThreadStatusPtr thread);
|
|
||||||
void unlink(ThreadStatusPtr thread);
|
|
||||||
|
|
||||||
ContextWeakPtr getQueryContextWeak() const;
|
|
||||||
ContextWeakPtr getGlobalContextWeak() const;
|
|
||||||
|
|
||||||
using FatalErrorCallback = std::function<void()>;
|
|
||||||
FatalErrorCallback getFatalErrorCallback() const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Set up at creation, no race when reading
|
mutable std::mutex mutex;
|
||||||
ContextWeakPtr query_context;
|
|
||||||
ContextWeakPtr global_context;
|
|
||||||
|
|
||||||
/// Set up at creation, no race when reading
|
/// Set up at creation, no race when reading
|
||||||
FatalErrorCallback fatal_error_callback;
|
SharedData shared_data;
|
||||||
|
|
||||||
/// Set of all thread ids which has been attached to the group
|
/// Set of all thread ids which has been attached to the group
|
||||||
std::unordered_set<UInt64> thread_ids;
|
std::unordered_set<UInt64> thread_ids;
|
||||||
|
|
||||||
/// Set of active threads
|
|
||||||
std::unordered_set<ThreadStatusPtr> threads;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -150,53 +157,24 @@ public:
|
|||||||
Progress progress_in;
|
Progress progress_in;
|
||||||
Progress progress_out;
|
Progress progress_out;
|
||||||
|
|
||||||
protected:
|
private:
|
||||||
/// Group of threads, to which this thread attached
|
/// Group of threads, to which this thread attached
|
||||||
ThreadGroupStatusPtr thread_group;
|
ThreadGroupStatusPtr thread_group;
|
||||||
|
|
||||||
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
|
|
||||||
|
|
||||||
/// Is set once
|
/// Is set once
|
||||||
ContextWeakPtr global_context;
|
ContextWeakPtr global_context;
|
||||||
/// Use it only from current thread
|
/// Use it only from current thread
|
||||||
ContextWeakPtr query_context;
|
ContextWeakPtr query_context;
|
||||||
|
|
||||||
String query_id_from_query_context;
|
/// Is used to send logs from logs_queue to client in case of fatal errors.
|
||||||
|
using FatalErrorCallback = std::function<void()>;
|
||||||
|
FatalErrorCallback fatal_error_callback;
|
||||||
|
|
||||||
/// A logs queue used by TCPHandler to pass logs to a client
|
ThreadGroupStatus::SharedData shared_data;
|
||||||
InternalTextLogsQueueWeakPtr logs_queue_ptr;
|
|
||||||
|
|
||||||
InternalProfileEventsQueueWeakPtr profile_queue_ptr;
|
|
||||||
|
|
||||||
struct TimePoint
|
|
||||||
{
|
|
||||||
void setUp();
|
|
||||||
void SetUp(std::chrono::time_point<std::chrono::system_clock> now);
|
|
||||||
|
|
||||||
UInt64 nanoseconds = 0;
|
|
||||||
UInt64 microseconds = 0;
|
|
||||||
time_t seconds = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
bool performance_counters_finalized = false;
|
bool performance_counters_finalized = false;
|
||||||
TimePoint query_start_time{};
|
|
||||||
|
|
||||||
// CPU and Real time query profilers
|
|
||||||
std::unique_ptr<QueryProfilerReal> query_profiler_real;
|
|
||||||
std::unique_ptr<QueryProfilerCPU> query_profiler_cpu;
|
|
||||||
|
|
||||||
Poco::Logger * log = nullptr;
|
|
||||||
|
|
||||||
/// Use ptr not to add extra dependencies in the header
|
|
||||||
std::unique_ptr<RUsageCounters> last_rusage;
|
|
||||||
std::unique_ptr<TasksStatsCounters> taskstats;
|
|
||||||
|
|
||||||
/// Is used to send logs from logs_queue to client in case of fatal errors.
|
|
||||||
std::function<void()> fatal_error_callback;
|
|
||||||
|
|
||||||
/// See setInternalThread()
|
|
||||||
bool internal_thread = false;
|
|
||||||
|
|
||||||
|
String query_id_from_query_context;
|
||||||
/// Requires access to query_id.
|
/// Requires access to query_id.
|
||||||
friend class MemoryTrackerThreadSwitcher;
|
friend class MemoryTrackerThreadSwitcher;
|
||||||
void setQueryId(const String & query_id_)
|
void setQueryId(const String & query_id_)
|
||||||
@ -204,39 +182,45 @@ protected:
|
|||||||
query_id_from_query_context = query_id_;
|
query_id_from_query_context = query_id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct TimePoint
|
||||||
|
{
|
||||||
|
void setUp();
|
||||||
|
UInt64 nanoseconds() const;
|
||||||
|
UInt64 microseconds() const;
|
||||||
|
UInt64 seconds() const;
|
||||||
|
|
||||||
|
std::chrono::time_point<std::chrono::system_clock> point;
|
||||||
|
};
|
||||||
|
|
||||||
|
TimePoint query_start_time{};
|
||||||
|
|
||||||
|
// CPU and Real time query profilers
|
||||||
|
std::unique_ptr<QueryProfilerReal> query_profiler_real;
|
||||||
|
std::unique_ptr<QueryProfilerCPU> query_profiler_cpu;
|
||||||
|
|
||||||
|
/// Use ptr not to add extra dependencies in the header
|
||||||
|
std::unique_ptr<RUsageCounters> last_rusage;
|
||||||
|
std::unique_ptr<TasksStatsCounters> taskstats;
|
||||||
|
|
||||||
|
/// See setInternalThread()
|
||||||
|
bool internal_thread = false;
|
||||||
|
|
||||||
/// This is helpful for cut linking dependencies for clickhouse_common_io
|
/// This is helpful for cut linking dependencies for clickhouse_common_io
|
||||||
using Deleter = std::function<void()>;
|
using Deleter = std::function<void()>;
|
||||||
Deleter deleter;
|
Deleter deleter;
|
||||||
|
|
||||||
|
Poco::Logger * log = nullptr;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ThreadStatus();
|
ThreadStatus();
|
||||||
~ThreadStatus();
|
~ThreadStatus();
|
||||||
|
|
||||||
ThreadGroupStatusPtr getThreadGroup() const
|
ThreadGroupStatusPtr getThreadGroup() const;
|
||||||
{
|
|
||||||
return thread_group;
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ThreadState
|
const String & getQueryId() const;
|
||||||
{
|
|
||||||
DetachedFromQuery = 0, /// We just created thread or it is a background thread
|
|
||||||
AttachedToQuery, /// Thread executes enqueued query
|
|
||||||
};
|
|
||||||
|
|
||||||
std::string_view getQueryId() const
|
ContextPtr getQueryContext() const;
|
||||||
{
|
ContextPtr getGlobalContext() const;
|
||||||
return query_id_from_query_context;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto getQueryContext() const
|
|
||||||
{
|
|
||||||
return query_context.lock();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto getGlobalContext() const
|
|
||||||
{
|
|
||||||
return global_context.lock();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// "Internal" ThreadStatus is used for materialized views for separate
|
/// "Internal" ThreadStatus is used for materialized views for separate
|
||||||
/// tracking into system.query_views_log
|
/// tracking into system.query_views_log
|
||||||
@ -255,29 +239,25 @@ public:
|
|||||||
void setInternalThread();
|
void setInternalThread();
|
||||||
|
|
||||||
/// Attaches slave thread to existing thread group
|
/// Attaches slave thread to existing thread group
|
||||||
void attachTo(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
|
void attachToGroup(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
|
||||||
|
|
||||||
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
|
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
|
||||||
void detachGroup();
|
void detachFromGroup();
|
||||||
|
|
||||||
/// Returns pointer to the current profile counters to restore them back.
|
/// Returns pointer to the current profile counters to restore them back.
|
||||||
/// Note: consequent call with new scope will detach previous scope.
|
/// Note: consequent call with new scope will detach previous scope.
|
||||||
ProfileEvents::Counters * attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope);
|
ProfileEvents::Counters * attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope);
|
||||||
|
|
||||||
InternalTextLogsQueuePtr getInternalTextLogsQueue() const
|
|
||||||
{
|
|
||||||
return logs_queue_ptr.lock();
|
|
||||||
}
|
|
||||||
|
|
||||||
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
|
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
|
||||||
LogsLevel client_logs_level);
|
LogsLevel client_logs_level);
|
||||||
|
InternalTextLogsQueuePtr getInternalTextLogsQueue() const;
|
||||||
InternalProfileEventsQueuePtr getInternalProfileEventsQueue() const
|
LogsLevel getClientLogsLevel() const;
|
||||||
{
|
|
||||||
return profile_queue_ptr.lock();
|
|
||||||
}
|
|
||||||
|
|
||||||
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);
|
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);
|
||||||
|
InternalProfileEventsQueuePtr getInternalProfileEventsQueue() const;
|
||||||
|
|
||||||
|
void attachQueryForLog(const String & query_);
|
||||||
|
const String & getQueryForLog() const;
|
||||||
|
|
||||||
/// Proper cal for fatal_error_callback
|
/// Proper cal for fatal_error_callback
|
||||||
void onFatalError();
|
void onFatalError();
|
||||||
@ -295,7 +275,7 @@ public:
|
|||||||
|
|
||||||
void flushUntrackedMemory();
|
void flushUntrackedMemory();
|
||||||
|
|
||||||
protected:
|
private:
|
||||||
void applyQuerySettings();
|
void applyQuerySettings();
|
||||||
|
|
||||||
void initPerformanceCounters();
|
void initPerformanceCounters();
|
||||||
@ -304,12 +284,9 @@ protected:
|
|||||||
|
|
||||||
void finalizeQueryProfiler();
|
void finalizeQueryProfiler();
|
||||||
|
|
||||||
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now);
|
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database);
|
||||||
|
|
||||||
void assertState(ThreadState permitted_state, const char * description = nullptr) const;
|
void attachToGroupImpl(const ThreadGroupStatusPtr & thread_group_);
|
||||||
|
|
||||||
private:
|
|
||||||
void attachGroupImp(const ThreadGroupStatusPtr & thread_group_);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -34,7 +34,7 @@ namespace
|
|||||||
{ \
|
{ \
|
||||||
auto _logger = ::getLogger(logger); \
|
auto _logger = ::getLogger(logger); \
|
||||||
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
|
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
|
||||||
(DB::CurrentThread::getGroup()->client_logs_level >= (priority)); \
|
(DB::CurrentThread::get().getClientLogsLevel() >= (priority)); \
|
||||||
if (_is_clients_log || _logger->is((PRIORITY))) \
|
if (_is_clients_log || _logger->is((PRIORITY))) \
|
||||||
{ \
|
{ \
|
||||||
std::string formatted_message = numArgs(__VA_ARGS__) > 1 ? fmt::format(__VA_ARGS__) : firstArg(__VA_ARGS__); \
|
std::string formatted_message = numArgs(__VA_ARGS__) > 1 ? fmt::format(__VA_ARGS__) : firstArg(__VA_ARGS__); \
|
||||||
|
@ -312,12 +312,8 @@ private:
|
|||||||
/// It will allow client to see failure messages directly.
|
/// It will allow client to see failure messages directly.
|
||||||
if (thread_ptr)
|
if (thread_ptr)
|
||||||
{
|
{
|
||||||
query_id = std::string(thread_ptr->getQueryId());
|
query_id = thread_ptr->getQueryId();
|
||||||
|
query = thread_ptr->getQueryForLog();
|
||||||
if (auto thread_group = thread_ptr->getThreadGroup())
|
|
||||||
{
|
|
||||||
query = DB::toOneLineQuery(thread_group->query);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto logs_queue = thread_ptr->getInternalTextLogsQueue())
|
if (auto logs_queue = thread_ptr->getInternalTextLogsQueue())
|
||||||
{
|
{
|
||||||
|
@ -75,7 +75,7 @@ public:
|
|||||||
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
|
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
|
||||||
{
|
{
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
setThreadName("HashedDictLoad");
|
setThreadName("HashedDictLoad");
|
||||||
|
|
||||||
threadWorker(shard);
|
threadWorker(shard);
|
||||||
@ -224,7 +224,7 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
|
|||||||
pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
|
pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
|
||||||
{
|
{
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
setThreadName("HashedDictDtor");
|
setThreadName("HashedDictDtor");
|
||||||
|
|
||||||
if constexpr (sparse)
|
if constexpr (sparse)
|
||||||
|
@ -2309,10 +2309,10 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
BlocksList blocks;
|
BlocksList blocks;
|
||||||
while (true)
|
while (true)
|
||||||
@ -3030,10 +3030,10 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
for (Block & block : bucket_to_blocks[bucket])
|
for (Block & block : bucket_to_blocks[bucket])
|
||||||
{
|
{
|
||||||
|
@ -971,11 +971,11 @@ private:
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
|
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
LOG_TRACE(log, "Start loading object '{}'", name);
|
LOG_TRACE(log, "Start loading object '{}'", name);
|
||||||
try
|
try
|
||||||
|
@ -203,10 +203,10 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
|
|||||||
ProcessListForUser & user_process_list = user_process_list_it->second;
|
ProcessListForUser & user_process_list = user_process_list_it->second;
|
||||||
|
|
||||||
/// Actualize thread group info
|
/// Actualize thread group info
|
||||||
|
CurrentThread::attachQueryForLog(query_);
|
||||||
auto thread_group = CurrentThread::getGroup();
|
auto thread_group = CurrentThread::getGroup();
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
{
|
{
|
||||||
std::lock_guard lock_thread_group(thread_group->mutex);
|
|
||||||
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
|
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
|
||||||
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
|
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
|
||||||
if (user_process_list.user_temp_data_on_disk)
|
if (user_process_list.user_temp_data_on_disk)
|
||||||
@ -214,8 +214,6 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
|
|||||||
query_context->setTempDataOnDisk(std::make_shared<TemporaryDataOnDiskScope>(
|
query_context->setTempDataOnDisk(std::make_shared<TemporaryDataOnDiskScope>(
|
||||||
user_process_list.user_temp_data_on_disk, settings.max_temporary_data_on_disk_size_for_query));
|
user_process_list.user_temp_data_on_disk, settings.max_temporary_data_on_disk_size_for_query));
|
||||||
}
|
}
|
||||||
thread_group->query = query_;
|
|
||||||
thread_group->normalized_query_hash = normalizedQueryHash<false>(query_);
|
|
||||||
|
|
||||||
/// Set query-level memory trackers
|
/// Set query-level memory trackers
|
||||||
thread_group->memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage);
|
thread_group->memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage);
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#include <Interpreters/QueryViewsLog.h>
|
#include <Interpreters/QueryViewsLog.h>
|
||||||
#include <Interpreters/TraceCollector.h>
|
#include <Interpreters/TraceCollector.h>
|
||||||
#include <Parsers/formatAST.h>
|
#include <Parsers/formatAST.h>
|
||||||
|
#include <Parsers/queryNormalization.h>
|
||||||
#include <Common/CurrentThread.h>
|
#include <Common/CurrentThread.h>
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
@ -40,7 +41,14 @@ namespace ErrorCodes
|
|||||||
extern const int CANNOT_SET_THREAD_PRIORITY;
|
extern const int CANNOT_SET_THREAD_PRIORITY;
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::vector<UInt64> ThreadGroupStatus::getInvolvedThreadIds() const
|
ThreadGroupStatus::ThreadGroupStatus(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_)
|
||||||
|
: master_thread_id(CurrentThread::get().thread_id)
|
||||||
|
, query_context(query_context_)
|
||||||
|
, global_context(query_context_->getGlobalContext())
|
||||||
|
, fatal_error_callback(fatal_error_callback_)
|
||||||
|
{}
|
||||||
|
|
||||||
|
std::vector<UInt64> ThreadGroupStatus::getInvolvedThreadIds() const
|
||||||
{
|
{
|
||||||
std::vector<UInt64> res;
|
std::vector<UInt64> res;
|
||||||
|
|
||||||
@ -52,24 +60,75 @@ const std::vector<UInt64> ThreadGroupStatus::getInvolvedThreadIds() const
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ThreadGroupStatus::linkThread(UInt64 thread_it)
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
thread_ids.insert(thread_it);
|
||||||
|
}
|
||||||
|
|
||||||
ThreadGroupStatusPtr ThreadGroupStatus::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_)
|
ThreadGroupStatusPtr ThreadGroupStatus::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_)
|
||||||
{
|
{
|
||||||
auto group = std::make_shared<ThreadGroupStatus>();
|
auto group = std::make_shared<ThreadGroupStatus>(query_context_, std::move(fatal_error_callback_));
|
||||||
group->memory_tracker.setDescription("(for query)");
|
group->memory_tracker.setDescription("(for query)");
|
||||||
group->master_thread_id = CurrentThread::get().thread_id;
|
|
||||||
|
|
||||||
group->query_context = query_context_;
|
|
||||||
group->global_context = query_context_->getGlobalContext();
|
|
||||||
|
|
||||||
group->fatal_error_callback = std::move(fatal_error_callback_);
|
|
||||||
|
|
||||||
return group;
|
return group;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ThreadGroupStatus::attachQueryForLog(const String & query_, UInt64 normalized_hash)
|
||||||
|
{
|
||||||
|
auto hash = normalized_hash ? normalized_hash : normalizedQueryHash<false>(query_);
|
||||||
|
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
shared_data.query_for_logs = query_;
|
||||||
|
shared_data.normalized_query_hash = hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatus::attachQueryForLog(const String & query_)
|
||||||
|
{
|
||||||
|
shared_data.query_for_logs = query_;
|
||||||
|
shared_data.normalized_query_hash = normalizedQueryHash<false>(query_);
|
||||||
|
|
||||||
|
if (!thread_group)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No thread group attached to the thread {}", thread_id);
|
||||||
|
|
||||||
|
thread_group->attachQueryForLog(shared_data.query_for_logs, shared_data.normalized_query_hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadGroupStatus::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue)
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
shared_data.profile_queue_ptr = profile_queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStatus::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue)
|
||||||
|
{
|
||||||
|
if (!thread_group)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No thread group attached to the thread {}", thread_id);
|
||||||
|
|
||||||
|
shared_data.profile_queue_ptr = profile_queue;
|
||||||
|
thread_group->attachInternalProfileEventsQueue(profile_queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
void CurrentThread::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & queue)
|
||||||
|
{
|
||||||
|
if (unlikely(!current_thread))
|
||||||
|
return;
|
||||||
|
current_thread->attachInternalProfileEventsQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
void CurrentThread::attachQueryForLog(const String & query_)
|
||||||
|
{
|
||||||
|
if (unlikely(!current_thread))
|
||||||
|
return;
|
||||||
|
current_thread->attachQueryForLog(query_);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ThreadStatus::applyQuerySettings()
|
void ThreadStatus::applyQuerySettings()
|
||||||
{
|
{
|
||||||
auto query_context_ptr = query_context.lock();
|
auto query_context_ptr = query_context.lock();
|
||||||
assert(query_context_ptr);
|
if (!query_context_ptr)
|
||||||
|
return;
|
||||||
|
|
||||||
const Settings & settings = query_context_ptr->getSettingsRef();
|
const Settings & settings = query_context_ptr->getSettingsRef();
|
||||||
|
|
||||||
query_id_from_query_context = query_context_ptr->getCurrentQueryId();
|
query_id_from_query_context = query_context_ptr->getCurrentQueryId();
|
||||||
@ -94,69 +153,53 @@ void ThreadStatus::applyQuerySettings()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::attachGroupImp(const ThreadGroupStatusPtr & thread_group_)
|
void ThreadStatus::attachToGroupImpl(const ThreadGroupStatusPtr & thread_group_)
|
||||||
{
|
{
|
||||||
assertState(ThreadState::DetachedFromQuery, __PRETTY_FUNCTION__);
|
|
||||||
|
|
||||||
/// Attach or init current thread to thread group and copy useful information from it
|
/// Attach or init current thread to thread group and copy useful information from it
|
||||||
thread_group = thread_group_;
|
thread_group = thread_group_;
|
||||||
|
thread_group->linkThread(thread_id);
|
||||||
|
|
||||||
performance_counters.setParent(&thread_group->performance_counters);
|
performance_counters.setParent(&thread_group->performance_counters);
|
||||||
memory_tracker.setParent(&thread_group->memory_tracker);
|
memory_tracker.setParent(&thread_group->memory_tracker);
|
||||||
|
|
||||||
thread_group->link(this);
|
query_context = thread_group->query_context;
|
||||||
|
global_context = thread_group->global_context;
|
||||||
|
|
||||||
query_context = thread_group->getQueryContextWeak();
|
fatal_error_callback = thread_group->fatal_error_callback;
|
||||||
if (global_context.expired())
|
|
||||||
global_context = thread_group->getGlobalContextWeak();
|
|
||||||
|
|
||||||
fatal_error_callback = thread_group->getFatalErrorCallback();
|
shared_data = thread_group->getSharedData();
|
||||||
|
|
||||||
{
|
|
||||||
std::lock_guard lock(thread_group->mutex);
|
|
||||||
|
|
||||||
logs_queue_ptr = thread_group->logs_queue_ptr;
|
|
||||||
profile_queue_ptr = thread_group->profile_queue_ptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto query_context_ptr = query_context.lock())
|
|
||||||
{
|
|
||||||
applyQuerySettings();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
applyQuerySettings();
|
||||||
initPerformanceCounters();
|
initPerformanceCounters();
|
||||||
|
|
||||||
thread_state = ThreadState::AttachedToQuery;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::detachGroup()
|
void ThreadStatus::detachFromGroup()
|
||||||
{
|
{
|
||||||
|
if (!thread_group)
|
||||||
|
return;
|
||||||
|
|
||||||
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
|
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
|
||||||
|
|
||||||
if (thread_state == ThreadState::DetachedFromQuery)
|
/// flash untracked memory before resetting memory_tracker parent
|
||||||
return;
|
flushUntrackedMemory();
|
||||||
|
|
||||||
finalizeQueryProfiler();
|
finalizeQueryProfiler();
|
||||||
finalizePerformanceCounters();
|
finalizePerformanceCounters();
|
||||||
|
|
||||||
thread_group->unlink(this);
|
|
||||||
|
|
||||||
performance_counters.setParent(&ProfileEvents::global_counters);
|
performance_counters.setParent(&ProfileEvents::global_counters);
|
||||||
|
|
||||||
flushUntrackedMemory();
|
|
||||||
|
|
||||||
memory_tracker.reset();
|
memory_tracker.reset();
|
||||||
memory_tracker.setParent(thread_group->memory_tracker.getParent());
|
memory_tracker.setParent(thread_group->memory_tracker.getParent());
|
||||||
|
|
||||||
|
thread_group.reset();
|
||||||
|
|
||||||
query_id_from_query_context.clear();
|
query_id_from_query_context.clear();
|
||||||
query_context.reset();
|
query_context.reset();
|
||||||
|
|
||||||
|
shared_data = {};
|
||||||
|
|
||||||
fatal_error_callback = {};
|
fatal_error_callback = {};
|
||||||
|
|
||||||
thread_group.reset();
|
|
||||||
|
|
||||||
thread_state = ThreadState::DetachedFromQuery;
|
|
||||||
|
|
||||||
#if defined(OS_LINUX)
|
#if defined(OS_LINUX)
|
||||||
if (os_thread_priority)
|
if (os_thread_priority)
|
||||||
{
|
{
|
||||||
@ -176,19 +219,19 @@ void ThreadStatus::setInternalThread()
|
|||||||
internal_thread = true;
|
internal_thread = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::attachTo(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
|
void ThreadStatus::attachToGroup(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
|
||||||
{
|
{
|
||||||
if (check_detached)
|
if (thread_group && check_detached)
|
||||||
assertState(ThreadState::DetachedFromQuery, "Can't attach query to the thread, it is already attached");
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't attach query to the thread, it is already attached");
|
||||||
|
|
||||||
if (!thread_group_)
|
if (!thread_group_)
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to attach to nullptr thread group");
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to attach to nullptr thread group");
|
||||||
|
|
||||||
if (thread_state == ThreadState::AttachedToQuery)
|
if (thread_group)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
deleter = [this] () { detachGroup(); };
|
deleter = [this] () { detachFromGroup(); };
|
||||||
attachGroupImp(thread_group_);
|
attachToGroupImpl(thread_group_);
|
||||||
}
|
}
|
||||||
|
|
||||||
ProfileEvents::Counters * ThreadStatus::attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope)
|
ProfileEvents::Counters * ThreadStatus::attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope)
|
||||||
@ -208,18 +251,24 @@ ProfileEvents::Counters * ThreadStatus::attachProfileCountersScope(ProfileEvents
|
|||||||
return prev_counters;
|
return prev_counters;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::TimePoint::SetUp(std::chrono::time_point<std::chrono::system_clock> now)
|
|
||||||
{
|
|
||||||
// query_start_time_{microseconds, nanoseconds} are all constructed from the same time point
|
|
||||||
// to ensure that they are all equal up to the precision of a second.
|
|
||||||
nanoseconds = timeInNanoseconds(now);
|
|
||||||
microseconds = timeInMicroseconds(now);
|
|
||||||
seconds = timeInSeconds(now);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadStatus::TimePoint::setUp()
|
void ThreadStatus::TimePoint::setUp()
|
||||||
{
|
{
|
||||||
SetUp(std::chrono::system_clock::now());
|
point = std::chrono::system_clock::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt64 ThreadStatus::TimePoint::nanoseconds() const
|
||||||
|
{
|
||||||
|
return timeInNanoseconds(point);
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt64 ThreadStatus::TimePoint::microseconds() const
|
||||||
|
{
|
||||||
|
return timeInMicroseconds(point);
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt64 ThreadStatus::TimePoint::seconds() const
|
||||||
|
{
|
||||||
|
return timeInSeconds(point);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::initPerformanceCounters()
|
void ThreadStatus::initPerformanceCounters()
|
||||||
@ -309,11 +358,11 @@ void ThreadStatus::finalizePerformanceCounters()
|
|||||||
if (settings.log_queries && settings.log_query_threads)
|
if (settings.log_queries && settings.log_query_threads)
|
||||||
{
|
{
|
||||||
const auto now = std::chrono::system_clock::now();
|
const auto now = std::chrono::system_clock::now();
|
||||||
Int64 query_duration_ms = (timeInMicroseconds(now) - query_start_time.microseconds) / 1000;
|
Int64 query_duration_ms = std::chrono::duration_cast<std::chrono::microseconds>(now - query_start_time.point).count();
|
||||||
if (query_duration_ms >= settings.log_queries_min_query_duration_ms.totalMilliseconds())
|
if (query_duration_ms >= settings.log_queries_min_query_duration_ms.totalMilliseconds())
|
||||||
{
|
{
|
||||||
if (auto thread_log = global_context_ptr->getQueryThreadLog())
|
if (auto thread_log = global_context_ptr->getQueryThreadLog())
|
||||||
logToQueryThreadLog(*thread_log, query_context_ptr->getCurrentDatabase(), now);
|
logToQueryThreadLog(*thread_log, query_context_ptr->getCurrentDatabase());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -368,20 +417,20 @@ void ThreadStatus::finalizeQueryProfiler()
|
|||||||
query_profiler_cpu.reset();
|
query_profiler_cpu.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now)
|
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database)
|
||||||
{
|
{
|
||||||
QueryThreadLogElement elem;
|
QueryThreadLogElement elem;
|
||||||
|
|
||||||
// construct current_time and current_time_microseconds using the same time point
|
// construct current_time and current_time_microseconds using the same time point
|
||||||
// so that the two times will always be equal up to a precision of a second.
|
// so that the two times will always be equal up to a precision of a second.
|
||||||
TimePoint current_time;
|
TimePoint current_time;
|
||||||
current_time.SetUp(now);
|
current_time.setUp();
|
||||||
|
|
||||||
elem.event_time = current_time.seconds;
|
elem.event_time = current_time.seconds();
|
||||||
elem.event_time_microseconds = current_time.microseconds;
|
elem.event_time_microseconds = current_time.microseconds();
|
||||||
elem.query_start_time = query_start_time.seconds;
|
elem.query_start_time = query_start_time.seconds();
|
||||||
elem.query_start_time_microseconds = query_start_time.microseconds;
|
elem.query_start_time_microseconds = query_start_time.microseconds();
|
||||||
elem.query_duration_ms = (current_time.nanoseconds - query_start_time.nanoseconds) / 1000000U;
|
elem.query_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(current_time.point - query_start_time.point).count();
|
||||||
|
|
||||||
elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
|
elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
|
||||||
elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
|
elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
|
||||||
@ -397,13 +446,9 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String
|
|||||||
elem.current_database = current_database;
|
elem.current_database = current_database;
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
{
|
{
|
||||||
{
|
elem.master_thread_id = thread_group->master_thread_id;
|
||||||
std::lock_guard lock(thread_group->mutex);
|
elem.query = shared_data.query_for_logs;
|
||||||
|
elem.normalized_query_hash = shared_data.normalized_query_hash;
|
||||||
elem.master_thread_id = thread_group->master_thread_id;
|
|
||||||
elem.query = thread_group->query;
|
|
||||||
elem.normalized_query_hash = thread_group->normalized_query_hash;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto query_context_ptr = query_context.lock();
|
auto query_context_ptr = query_context.lock();
|
||||||
@ -478,18 +523,18 @@ void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo)
|
|||||||
views_log->add(element);
|
views_log->add(element);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
|
void CurrentThread::attachToGroup(const ThreadGroupStatusPtr & thread_group)
|
||||||
{
|
{
|
||||||
if (unlikely(!current_thread))
|
if (unlikely(!current_thread))
|
||||||
return;
|
return;
|
||||||
current_thread->attachTo(thread_group, true);
|
current_thread->attachToGroup(thread_group, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group)
|
void CurrentThread::attachToGroupIfDetached(const ThreadGroupStatusPtr & thread_group)
|
||||||
{
|
{
|
||||||
if (unlikely(!current_thread))
|
if (unlikely(!current_thread))
|
||||||
return;
|
return;
|
||||||
current_thread->attachTo(thread_group, false);
|
current_thread->attachToGroup(thread_group, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::finalizePerformanceCounters()
|
void CurrentThread::finalizePerformanceCounters()
|
||||||
@ -499,16 +544,11 @@ void CurrentThread::finalizePerformanceCounters()
|
|||||||
current_thread->finalizePerformanceCounters();
|
current_thread->finalizePerformanceCounters();
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::detachGroupIfNotDetached()
|
void CurrentThread::detachFromGroupIfNotDetached()
|
||||||
{
|
{
|
||||||
if (unlikely(!current_thread))
|
if (unlikely(!current_thread))
|
||||||
return;
|
return;
|
||||||
current_thread->detachGroup();
|
current_thread->detachFromGroup();
|
||||||
}
|
|
||||||
|
|
||||||
void CurrentThread::detachQueryIfNotDetached()
|
|
||||||
{
|
|
||||||
return detachGroupIfNotDetached();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CurrentThread::QueryScope::QueryScope(ContextMutablePtr query_context, std::function<void()> fatal_error_callback)
|
CurrentThread::QueryScope::QueryScope(ContextMutablePtr query_context, std::function<void()> fatal_error_callback)
|
||||||
@ -517,7 +557,7 @@ CurrentThread::QueryScope::QueryScope(ContextMutablePtr query_context, std::func
|
|||||||
query_context->makeQueryContext();
|
query_context->makeQueryContext();
|
||||||
|
|
||||||
auto group = ThreadGroupStatus::createForQuery(query_context, std::move(fatal_error_callback));
|
auto group = ThreadGroupStatus::createForQuery(query_context, std::move(fatal_error_callback));
|
||||||
CurrentThread::attachTo(std::move(group));
|
CurrentThread::attachToGroup(std::move(group));
|
||||||
}
|
}
|
||||||
|
|
||||||
CurrentThread::QueryScope::QueryScope(ContextPtr query_context, std::function<void()> fatal_error_callback)
|
CurrentThread::QueryScope::QueryScope(ContextPtr query_context, std::function<void()> fatal_error_callback)
|
||||||
@ -527,7 +567,7 @@ CurrentThread::QueryScope::QueryScope(ContextPtr query_context, std::function<vo
|
|||||||
ErrorCodes::LOGICAL_ERROR, "Cannot initialize query scope without query context");
|
ErrorCodes::LOGICAL_ERROR, "Cannot initialize query scope without query context");
|
||||||
|
|
||||||
auto group = ThreadGroupStatus::createForQuery(query_context, std::move(fatal_error_callback));
|
auto group = ThreadGroupStatus::createForQuery(query_context, std::move(fatal_error_callback));
|
||||||
CurrentThread::attachTo(std::move(group));
|
CurrentThread::attachToGroup(std::move(group));
|
||||||
}
|
}
|
||||||
|
|
||||||
void CurrentThread::QueryScope::logPeakMemoryUsage()
|
void CurrentThread::QueryScope::logPeakMemoryUsage()
|
||||||
@ -547,7 +587,7 @@ CurrentThread::QueryScope::~QueryScope()
|
|||||||
if (log_peak_memory_usage_in_destructor)
|
if (log_peak_memory_usage_in_destructor)
|
||||||
logPeakMemoryUsage();
|
logPeakMemoryUsage();
|
||||||
|
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
@ -22,7 +22,7 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
|
|||||||
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() mutable -> Result
|
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() mutable -> Result
|
||||||
{
|
{
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
SCOPE_EXIT_SAFE({
|
SCOPE_EXIT_SAFE({
|
||||||
{
|
{
|
||||||
@ -33,7 +33,7 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -36,14 +36,14 @@ static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupSt
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
setThreadName("QueryCompPipeEx");
|
setThreadName("QueryCompPipeEx");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
data.executor->execute(num_threads);
|
data.executor->execute(num_threads);
|
||||||
}
|
}
|
||||||
|
@ -308,12 +308,12 @@ void PipelineExecutor::spawnThreads()
|
|||||||
|
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
setThreadName("QueryPipelineEx");
|
setThreadName("QueryPipelineEx");
|
||||||
|
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -71,14 +71,14 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
setThreadName("QueryPullPipeEx");
|
setThreadName("QueryPullPipeEx");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
data.executor->execute(num_threads);
|
data.executor->execute(num_threads);
|
||||||
}
|
}
|
||||||
|
@ -101,14 +101,14 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
setThreadName("QueryPushPipeEx");
|
setThreadName("QueryPushPipeEx");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
data.executor->execute(num_threads);
|
data.executor->execute(num_threads);
|
||||||
}
|
}
|
||||||
|
@ -100,11 +100,11 @@ namespace DB
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
setThreadName("Collector");
|
setThreadName("Collector");
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -161,11 +161,11 @@ namespace DB
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
setThreadName("Formatter");
|
setThreadName("Formatter");
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -12,10 +12,10 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachTo(thread_group);
|
CurrentThread::attachToGroup(thread_group);
|
||||||
|
|
||||||
setThreadName("Segmentator");
|
setThreadName("Segmentator");
|
||||||
try
|
try
|
||||||
@ -62,10 +62,10 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
const auto parser_unit_number = current_ticket_number % processing_units.size();
|
const auto parser_unit_number = current_ticket_number % processing_units.size();
|
||||||
auto & unit = processing_units[parser_unit_number];
|
auto & unit = processing_units[parser_unit_number];
|
||||||
|
@ -100,10 +100,10 @@ struct ManyAggregatedData
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
setThreadName("AggregDestruct");
|
setThreadName("AggregDestruct");
|
||||||
});
|
});
|
||||||
|
@ -286,7 +286,7 @@ Chain buildPushingToViewsChain(
|
|||||||
std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
|
std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
|
||||||
/// Copy of a ThreadStatus should be internal.
|
/// Copy of a ThreadStatus should be internal.
|
||||||
view_thread_status_ptr->setInternalThread();
|
view_thread_status_ptr->setInternalThread();
|
||||||
view_thread_status_ptr->attachTo(running_group);
|
view_thread_status_ptr->attachToGroup(running_group);
|
||||||
|
|
||||||
auto * view_thread_status = view_thread_status_ptr.get();
|
auto * view_thread_status = view_thread_status_ptr.get();
|
||||||
views_data->thread_status_holder->thread_statuses.push_front(std::move(view_thread_status_ptr));
|
views_data->thread_status_holder->thread_statuses.push_front(std::move(view_thread_status_ptr));
|
||||||
|
@ -293,12 +293,12 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
|
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
setThreadName("DistrOutStrProc");
|
setThreadName("DistrOutStrProc");
|
||||||
|
|
||||||
++job.blocks_started;
|
++job.blocks_started;
|
||||||
|
@ -1395,10 +1395,10 @@ std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
@ -2314,10 +2314,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
asMutableDeletingPart(part)->remove();
|
asMutableDeletingPart(part)->remove();
|
||||||
if (part_names_succeed)
|
if (part_names_succeed)
|
||||||
@ -2375,10 +2375,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
LOG_TRACE(log, "Removing {} parts in blocks range {}", batch.size(), range.getPartNameForLogs());
|
LOG_TRACE(log, "Removing {} parts in blocks range {}", batch.size(), range.getPartNameForLogs());
|
||||||
|
|
||||||
|
@ -1119,10 +1119,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
|
|||||||
{
|
{
|
||||||
SCOPE_EXIT_SAFE(
|
SCOPE_EXIT_SAFE(
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::detachQueryIfNotDetached();
|
CurrentThread::detachFromGroupIfNotDetached();
|
||||||
);
|
);
|
||||||
if (thread_group)
|
if (thread_group)
|
||||||
CurrentThread::attachToIfDetached(thread_group);
|
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||||
|
|
||||||
process_part(part_index);
|
process_part(part_index);
|
||||||
});
|
});
|
||||||
|
@ -90,7 +90,7 @@ namespace
|
|||||||
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
|
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
|
||||||
stack_trace = StackTrace(signal_context);
|
stack_trace = StackTrace(signal_context);
|
||||||
|
|
||||||
std::string_view query_id = CurrentThread::getQueryId();
|
auto query_id = CurrentThread::getQueryId();
|
||||||
query_id_size = std::min(query_id.size(), max_query_id_size);
|
query_id_size = std::min(query_id.size(), max_query_id_size);
|
||||||
if (!query_id.empty())
|
if (!query_id.empty())
|
||||||
memcpy(query_id_data, query_id.data(), query_id_size);
|
memcpy(query_id_data, query_id.data(), query_id_size);
|
||||||
|
@ -16,8 +16,8 @@ INSERT and READ INSERT
|
|||||||
[ 0 ] FileOpen: 7
|
[ 0 ] FileOpen: 7
|
||||||
DROP
|
DROP
|
||||||
CHECK with query_log
|
CHECK with query_log
|
||||||
QueryFinish INSERT INTO times SELECT now() + INTERVAL 1 day; FileOpen 7
|
QueryFinish INSERT INTO times SELECT now() + INTERVAL 1 day SETTINGS optimize_on_insert = 0; FileOpen 7
|
||||||
QueryFinish SELECT \'1\', min(t) FROM times; FileOpen 0
|
QueryFinish SELECT \'1\', min(t) FROM times; FileOpen 0
|
||||||
QueryFinish INSERT INTO times SELECT now() + INTERVAL 2 day; FileOpen 7
|
QueryFinish INSERT INTO times SELECT now() + INTERVAL 2 day SETTINGS optimize_on_insert = 0; FileOpen 7
|
||||||
QueryFinish SELECT \'2\', min(t) FROM times; FileOpen 0
|
QueryFinish SELECT \'2\', min(t) FROM times; FileOpen 0
|
||||||
QueryFinish INSERT INTO times SELECT now() + INTERVAL 3 day; FileOpen 7
|
QueryFinish INSERT INTO times SELECT now() + INTERVAL 3 day SETTINGS optimize_on_insert = 0; FileOpen 7
|
||||||
|
@ -29,12 +29,20 @@ ORDER BY query_start_time DESC;
|
|||||||
echo "CREATE"
|
echo "CREATE"
|
||||||
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
||||||
DROP TABLE IF EXISTS times;
|
DROP TABLE IF EXISTS times;
|
||||||
CREATE TABLE times (t DateTime) ENGINE MergeTree ORDER BY t;
|
CREATE TABLE times (t DateTime) ENGINE MergeTree ORDER BY t
|
||||||
|
SETTINGS
|
||||||
|
storage_policy='default',
|
||||||
|
min_rows_for_compact_part = 0,
|
||||||
|
min_bytes_for_compact_part = 0,
|
||||||
|
min_rows_for_wide_part = 1000000,
|
||||||
|
min_bytes_for_wide_part = 1000000,
|
||||||
|
in_memory_parts_enable_wal = 0,
|
||||||
|
ratio_of_defaults_for_sparse_serialization=1.0;
|
||||||
" 2>&1 | grep -o -e '\ \[\ .*\ \]\ FileOpen:\ .*\ '
|
" 2>&1 | grep -o -e '\ \[\ .*\ \]\ FileOpen:\ .*\ '
|
||||||
|
|
||||||
echo "INSERT"
|
echo "INSERT"
|
||||||
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
||||||
INSERT INTO times SELECT now() + INTERVAL 1 day;
|
INSERT INTO times SELECT now() + INTERVAL 1 day SETTINGS optimize_on_insert = 0;
|
||||||
" 2>&1 | grep -o -e '\ \[\ .*\ \]\ FileOpen:\ .*\ '
|
" 2>&1 | grep -o -e '\ \[\ .*\ \]\ FileOpen:\ .*\ '
|
||||||
|
|
||||||
echo "READ"
|
echo "READ"
|
||||||
@ -44,13 +52,13 @@ SELECT '1', min(t) FROM times;
|
|||||||
|
|
||||||
echo "INSERT and READ INSERT"
|
echo "INSERT and READ INSERT"
|
||||||
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
||||||
INSERT INTO times SELECT now() + INTERVAL 2 day;
|
INSERT INTO times SELECT now() + INTERVAL 2 day SETTINGS optimize_on_insert = 0;
|
||||||
SELECT '2', min(t) FROM times;
|
SELECT '2', min(t) FROM times;
|
||||||
INSERT INTO times SELECT now() + INTERVAL 3 day;
|
INSERT INTO times SELECT now() + INTERVAL 3 day SETTINGS optimize_on_insert = 0;
|
||||||
" 2>&1 | grep -o -e '\ \[\ .*\ \]\ FileOpen:\ .*\ '
|
" 2>&1 | grep -o -e '\ \[\ .*\ \]\ FileOpen:\ .*\ '
|
||||||
|
|
||||||
echo "DROP"
|
echo "DROP"
|
||||||
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -nq "
|
$CLICKHOUSE_CLIENT -nq "
|
||||||
DROP TABLE times;
|
DROP TABLE times;
|
||||||
"
|
"
|
||||||
|
|
||||||
@ -62,7 +70,7 @@ SELECT type,
|
|||||||
'FileOpen', ProfileEvents['FileOpen']
|
'FileOpen', ProfileEvents['FileOpen']
|
||||||
FROM system.query_log
|
FROM system.query_log
|
||||||
WHERE current_database = currentDatabase()
|
WHERE current_database = currentDatabase()
|
||||||
AND ( query LIKE '%SELECT % FROM times%' OR query LIKE '%INSERT INTO times%')
|
AND ( query LIKE '%SELECT % FROM times%' OR query LIKE '%INSERT INTO times%' )
|
||||||
AND type = 'QueryFinish'
|
AND type = 'QueryFinish'
|
||||||
ORDER BY query_start_time_microseconds ASC, query DESC;
|
ORDER BY query_start_time_microseconds ASC, query DESC;
|
||||||
"
|
"
|
||||||
|
Loading…
Reference in New Issue
Block a user