PushingToViewsBlockOutputStream: Use setupState instead

This commit is contained in:
Raúl Marín 2021-06-29 19:28:47 +02:00
parent 0a5c9868b0
commit 0231583ea2
2 changed files with 13 additions and 28 deletions

View File

@ -140,13 +140,13 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * running_thread = current_thread;
auto * running_memory_tracker = DB::CurrentThread::getMemoryTracker();
if (!running_memory_tracker)
running_memory_tracker = &total_memory_tracker;
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().thread_group;
auto thread_status = std::make_unique<ThreadStatus>();
thread_status->attachQueryContext(getContext());
thread_status->memory_tracker.setParent(running_memory_tracker);
current_thread = running_thread;
if (running_group)
thread_status->setupState(running_group);
QueryViewsLogElement::ViewRuntimeStats runtime_stats{
target_name,
@ -156,7 +156,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
std::chrono::system_clock::now(),
QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START};
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
current_thread = running_thread;
/// Add the view to the query access info so it can appear in system.query_log
if (!no_destination)
@ -178,24 +178,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
}
}
PushingToViewsBlockOutputStream::~PushingToViewsBlockOutputStream()
{
for (auto & view : views)
{
if (view.runtime_stats.thread_status)
{
try
{
view.runtime_stats.thread_status->detachQuery(true, true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
Block PushingToViewsBlockOutputStream::getHeader() const
{
/// If we don't write directly to the destination
@ -206,7 +188,6 @@ Block PushingToViewsBlockOutputStream::getHeader() const
return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals());
}
void PushingToViewsBlockOutputStream::write(const Block & block)
{
/** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match.
@ -351,6 +332,8 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & vi
auto * running_thread = current_thread;
current_thread = view.runtime_stats.thread_status.get();
*current_thread->last_rusage = RUsageCounters::current();
if (current_thread->taskstats)
current_thread->taskstats->reset();
SCOPE_EXIT({
current_thread->updatePerformanceCounters();
current_thread = running_thread;
@ -425,6 +408,8 @@ void PushingToViewsBlockOutputStream::processPrefix(ViewInfo & view)
auto * running_thread = current_thread;
current_thread = view.runtime_stats.thread_status.get();
*current_thread->last_rusage = RUsageCounters::current();
if (current_thread->taskstats)
current_thread->taskstats->reset();
SCOPE_EXIT({
current_thread->updatePerformanceCounters();
current_thread = running_thread;
@ -454,6 +439,8 @@ void PushingToViewsBlockOutputStream::processSuffix(ViewInfo & view)
auto * running_thread = current_thread;
current_thread = view.runtime_stats.thread_status.get();
*current_thread->last_rusage = RUsageCounters::current();
if (current_thread->taskstats)
current_thread->taskstats->reset();
SCOPE_EXIT({
current_thread->updatePerformanceCounters();
current_thread = running_thread;

View File

@ -43,8 +43,6 @@ public:
const ASTPtr & query_ptr_,
bool no_destination = false);
~PushingToViewsBlockOutputStream() override;
Block getHeader() const override;
void write(const Block & block) override;