Rename ThreadGroupStatus to ThreadGroup

There are methods like getThreadGroup() and ThreadGroupSwitcher class,
so seems that this is logical.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-04-07 15:13:21 +02:00
parent 0503ed6fb8
commit 5b2b20a0b0
19 changed files with 59 additions and 59 deletions

View File

@ -90,7 +90,7 @@ void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTe
}
ThreadGroupStatusPtr CurrentThread::getGroup()
ThreadGroupPtr CurrentThread::getGroup()
{
if (unlikely(!current_thread))
return nullptr;

View File

@ -39,7 +39,7 @@ public:
static ThreadStatus & get();
/// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup();
static ThreadGroupPtr getGroup();
/// A logs queue used by TCPHandler to pass logs to a client
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
@ -69,9 +69,9 @@ public:
/// You must call one of these methods when create a query child thread:
/// Add current thread to a group associated with the thread group
static void attachToGroup(const ThreadGroupStatusPtr & thread_group);
static void attachToGroup(const ThreadGroupPtr & thread_group);
/// Is useful for a ThreadPool tasks
static void attachToGroupIfDetached(const ThreadGroupStatusPtr & thread_group);
static void attachToGroupIfDetached(const ThreadGroupPtr & thread_group);
/// Non-master threads call this method in destructor automatically
static void detachFromGroupIfNotDetached();

View File

@ -61,7 +61,7 @@ static thread_local ThreadStack alt_stack;
static thread_local bool has_alt_stack = false;
#endif
ThreadGroupStatus::ThreadGroupStatus()
ThreadGroup::ThreadGroup()
: master_thread_id(CurrentThread::get().thread_id)
{}
@ -119,7 +119,7 @@ ThreadStatus::ThreadStatus()
#endif
}
ThreadGroupStatusPtr ThreadStatus::getThreadGroup() const
ThreadGroupPtr ThreadStatus::getThreadGroup() const
{
return thread_group;
}
@ -139,7 +139,7 @@ ContextPtr ThreadStatus::getGlobalContext() const
return global_context.lock();
}
void ThreadGroupStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level)
void ThreadGroup::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level)
{
std::lock_guard lock(mutex);
shared_data.logs_queue_ptr = logs_queue;

View File

@ -58,15 +58,15 @@ using ThreadStatusPtr = ThreadStatus *;
* Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks).
* Use via CurrentThread::getGroup.
*/
class ThreadGroupStatus;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
class ThreadGroup;
using ThreadGroupPtr = std::shared_ptr<ThreadGroup>;
class ThreadGroupStatus
class ThreadGroup
{
public:
ThreadGroupStatus();
ThreadGroup();
using FatalErrorCallback = std::function<void()>;
ThreadGroupStatus(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
ThreadGroup(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
/// The first thread created this thread group
const UInt64 master_thread_id;
@ -104,9 +104,9 @@ public:
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);
/// When new query starts, new thread group is created for it, current thread becomes master thread of the query
static ThreadGroupStatusPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
static ThreadGroupPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
static ThreadGroupStatusPtr createForBackgroundProcess(ContextPtr storage_context);
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
std::vector<UInt64> getInvolvedThreadIds() const;
void linkThread(UInt64 thread_it);
@ -163,7 +163,7 @@ public:
private:
/// Group of threads, to which this thread attached
ThreadGroupStatusPtr thread_group;
ThreadGroupPtr thread_group;
/// Is set once
ContextWeakPtr global_context;
@ -174,7 +174,7 @@ private:
using FatalErrorCallback = std::function<void()>;
FatalErrorCallback fatal_error_callback;
ThreadGroupStatus::SharedData local_data;
ThreadGroup::SharedData local_data;
bool performance_counters_finalized = false;
@ -215,7 +215,7 @@ public:
ThreadStatus();
~ThreadStatus();
ThreadGroupStatusPtr getThreadGroup() const;
ThreadGroupPtr getThreadGroup() const;
const String & getQueryId() const;
@ -239,7 +239,7 @@ public:
void setInternalThread();
/// Attaches slave thread to existing thread group
void attachToGroup(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
void attachToGroup(const ThreadGroupPtr & thread_group_, bool check_detached = true);
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachFromGroup();
@ -287,7 +287,7 @@ private:
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database);
void attachToGroupImpl(const ThreadGroupStatusPtr & thread_group_);
void attachToGroupImpl(const ThreadGroupPtr & thread_group_);
};
/**

View File

@ -2315,7 +2315,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
std::atomic<UInt32> next_bucket_to_merge = 0;
auto converter = [&](size_t thread_id, ThreadGroupStatusPtr thread_group)
auto converter = [&](size_t thread_id, ThreadGroupPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -3043,7 +3043,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
LOG_TRACE(log, "Merging partially aggregated two-level data.");
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -967,7 +967,7 @@ private:
}
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupStatusPtr thread_group = {})
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupPtr thread_group = {})
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -340,7 +340,7 @@ QueryStatus::QueryStatus(
const String & query_,
const ClientInfo & client_info_,
QueryPriorities::Handle && priority_handle_,
ThreadGroupStatusPtr && thread_group_,
ThreadGroupPtr && thread_group_,
IAST::QueryKind query_kind_,
UInt64 watch_start_nanoseconds)
: WithContext(context_)

View File

@ -86,7 +86,7 @@ protected:
ClientInfo client_info;
/// Info about all threads involved in query execution
ThreadGroupStatusPtr thread_group;
ThreadGroupPtr thread_group;
Stopwatch watch;
@ -162,7 +162,7 @@ public:
const String & query_,
const ClientInfo & client_info_,
QueryPriorities::Handle && priority_handle_,
ThreadGroupStatusPtr && thread_group_,
ThreadGroupPtr && thread_group_,
IAST::QueryKind query_kind_,
UInt64 watch_start_nanoseconds);

View File

@ -41,14 +41,14 @@ namespace ErrorCodes
extern const int CANNOT_SET_THREAD_PRIORITY;
}
ThreadGroupStatus::ThreadGroupStatus(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_)
ThreadGroup::ThreadGroup(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_)
: master_thread_id(CurrentThread::get().thread_id)
, query_context(query_context_)
, global_context(query_context_->getGlobalContext())
, fatal_error_callback(fatal_error_callback_)
{}
std::vector<UInt64> ThreadGroupStatus::getInvolvedThreadIds() const
std::vector<UInt64> ThreadGroup::getInvolvedThreadIds() const
{
std::vector<UInt64> res;
@ -60,22 +60,22 @@ std::vector<UInt64> ThreadGroupStatus::getInvolvedThreadIds() const
return res;
}
void ThreadGroupStatus::linkThread(UInt64 thread_it)
void ThreadGroup::linkThread(UInt64 thread_it)
{
std::lock_guard lock(mutex);
thread_ids.insert(thread_it);
}
ThreadGroupStatusPtr ThreadGroupStatus::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_)
ThreadGroupPtr ThreadGroup::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_)
{
auto group = std::make_shared<ThreadGroupStatus>(query_context_, std::move(fatal_error_callback_));
auto group = std::make_shared<ThreadGroup>(query_context_, std::move(fatal_error_callback_));
group->memory_tracker.setDescription("(for query)");
return group;
}
ThreadGroupStatusPtr ThreadGroupStatus::createForBackgroundProcess(ContextPtr storage_context)
ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_context)
{
auto group = std::make_shared<ThreadGroupStatus>(storage_context);
auto group = std::make_shared<ThreadGroup>(storage_context);
group->memory_tracker.setDescription("background process to apply mutate/merge in table");
/// However settings from storage context have to be applied
@ -89,7 +89,7 @@ ThreadGroupStatusPtr ThreadGroupStatus::createForBackgroundProcess(ContextPtr st
return group;
}
void ThreadGroupStatus::attachQueryForLog(const String & query_, UInt64 normalized_hash)
void ThreadGroup::attachQueryForLog(const String & query_, UInt64 normalized_hash)
{
auto hash = normalized_hash ? normalized_hash : normalizedQueryHash<false>(query_);
@ -109,7 +109,7 @@ void ThreadStatus::attachQueryForLog(const String & query_)
thread_group->attachQueryForLog(local_data.query_for_logs, local_data.normalized_query_hash);
}
void ThreadGroupStatus::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue)
void ThreadGroup::attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue)
{
std::lock_guard lock(mutex);
shared_data.profile_queue_ptr = profile_queue;
@ -168,7 +168,7 @@ void ThreadStatus::applyQuerySettings()
#endif
}
void ThreadStatus::attachToGroupImpl(const ThreadGroupStatusPtr & thread_group_)
void ThreadStatus::attachToGroupImpl(const ThreadGroupPtr & thread_group_)
{
/// Attach or init current thread to thread group and copy useful information from it
thread_group = thread_group_;
@ -234,7 +234,7 @@ void ThreadStatus::setInternalThread()
internal_thread = true;
}
void ThreadStatus::attachToGroup(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
void ThreadStatus::attachToGroup(const ThreadGroupPtr & thread_group_, bool check_detached)
{
if (thread_group && check_detached)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't attach query to the thread, it is already attached");
@ -541,14 +541,14 @@ void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo)
views_log->add(element);
}
void CurrentThread::attachToGroup(const ThreadGroupStatusPtr & thread_group)
void CurrentThread::attachToGroup(const ThreadGroupPtr & thread_group)
{
if (unlikely(!current_thread))
return;
current_thread->attachToGroup(thread_group, true);
}
void CurrentThread::attachToGroupIfDetached(const ThreadGroupStatusPtr & thread_group)
void CurrentThread::attachToGroupIfDetached(const ThreadGroupPtr & thread_group)
{
if (unlikely(!current_thread))
return;
@ -574,7 +574,7 @@ CurrentThread::QueryScope::QueryScope(ContextMutablePtr query_context, std::func
if (!query_context->hasQueryContext())
query_context->makeQueryContext();
auto group = ThreadGroupStatus::createForQuery(query_context, std::move(fatal_error_callback));
auto group = ThreadGroup::createForQuery(query_context, std::move(fatal_error_callback));
CurrentThread::attachToGroup(group);
}
@ -584,7 +584,7 @@ CurrentThread::QueryScope::QueryScope(ContextPtr query_context, std::function<vo
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Cannot initialize query scope without query context");
auto group = ThreadGroupStatus::createForQuery(query_context, std::move(fatal_error_callback));
auto group = ThreadGroup::createForQuery(query_context, std::move(fatal_error_callback));
CurrentThread::attachToGroup(group);
}

View File

@ -32,7 +32,7 @@ struct CompletedPipelineExecutor::Data
}
};
static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads)
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -67,7 +67,7 @@ const Block & PullingAsyncPipelineExecutor::getHeader() const
return lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader();
}
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads)
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -97,7 +97,7 @@ struct PushingAsyncPipelineExecutor::Data
}
};
static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads)
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -96,7 +96,7 @@ namespace DB
}
void ParallelFormattingOutputFormat::collectorThreadFunction(const ThreadGroupStatusPtr & thread_group)
void ParallelFormattingOutputFormat::collectorThreadFunction(const ThreadGroupPtr & thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -157,7 +157,7 @@ namespace DB
}
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group)
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupPtr & thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -270,10 +270,10 @@ private:
}
/// Collects all temporary buffers into main WriteBuffer.
void collectorThreadFunction(const ThreadGroupStatusPtr & thread_group);
void collectorThreadFunction(const ThreadGroupPtr & thread_group);
/// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory.
void formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group);
void formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupPtr & thread_group);
void setRowsBeforeLimit(size_t rows_before_limit) override
{

View File

@ -8,7 +8,7 @@
namespace DB
{
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -62,7 +62,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
}
}
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupPtr thread_group, size_t current_ticket_number)
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -317,8 +317,8 @@ private:
}
}
void segmentatorThreadFunction(ThreadGroupStatusPtr thread_group);
void parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number);
void segmentatorThreadFunction(ThreadGroupPtr thread_group);
void parserThreadFunction(ThreadGroupPtr thread_group, size_t current_ticket_number);
/// Save/log a background exception, set termination flag, wake up all
/// threads. This function is used by segmentator and parsed threads.

View File

@ -270,14 +270,14 @@ Chain buildPushingToViewsChain(
ASTPtr query;
Chain out;
/// NOTE: ThreadGroupStatus always should have context attached,
/// NOTE: ThreadGroup always should have context attached,
/// otherwise entry to the system.query_views_log will not be added
/// (see ThreadStatus::logToQueryViewsLog())
ThreadGroupStatusPtr running_group;
ThreadGroupPtr running_group;
if (current_thread)
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroupStatus>(context);
running_group = std::make_shared<ThreadGroup>(context);
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls

View File

@ -11,7 +11,7 @@ namespace DB
{
ThreadGroupSwitcher::ThreadGroupSwitcher(ThreadGroupStatusPtr thread_group)
ThreadGroupSwitcher::ThreadGroupSwitcher(ThreadGroupPtr thread_group)
{
chassert(thread_group);
@ -59,7 +59,7 @@ MergeListElement::MergeListElement(
is_mutation = (result_part_info.getDataVersion() != source_data_version);
}
thread_group = ThreadGroupStatus::createForBackgroundProcess(context);
thread_group = ThreadGroup::createForBackgroundProcess(context);
}
MergeInfo MergeListElement::getInfo() const

View File

@ -69,11 +69,11 @@ struct Settings;
class ThreadGroupSwitcher : private boost::noncopyable
{
public:
explicit ThreadGroupSwitcher(ThreadGroupStatusPtr thread_group);
explicit ThreadGroupSwitcher(ThreadGroupPtr thread_group);
~ThreadGroupSwitcher();
private:
ThreadGroupStatusPtr prev_thread_group;
ThreadGroupPtr prev_thread_group;
};
struct MergeListElement : boost::noncopyable
@ -113,7 +113,7 @@ struct MergeListElement : boost::noncopyable
/// Detected after merge already started
std::atomic<MergeAlgorithm> merge_algorithm;
ThreadGroupStatusPtr thread_group;
ThreadGroupPtr thread_group;
MergeListElement(
const StorageID & table_id_,