From 820a6e098773bc5fde2ae2296aa7ce3812ab79a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Mon, 9 Aug 2021 19:38:29 +0200 Subject: [PATCH] Refactor MV stage metric setup and teardown --- src/Common/ThreadStatus.cpp | 3 + src/Common/ThreadStatus.h | 7 +- .../PushingToViewsBlockOutputStream.cpp | 267 +++++++----------- .../PushingToViewsBlockOutputStream.h | 8 +- src/Interpreters/ThreadStatusExt.cpp | 9 +- 5 files changed, 122 insertions(+), 172 deletions(-) diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index c84195c8834..81c6b8eb1c3 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -149,6 +149,9 @@ ThreadStatus::~ThreadStatus() if (deleter) deleter(); + + /// Only change current_thread if it's currently being used by this ThreadStatus + /// For example, PushingToViewsBlockOutputStream creates and deletes ThreadStatus instances while running in the main query thread if (current_thread == this) current_thread = nullptr; } diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index cf3629dce40..dbfb33a320c 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -37,7 +37,7 @@ struct RUsageCounters; struct PerfEventsCounters; class TaskStatsInfoGetter; class InternalTextLogsQueue; -struct ViewInfo; +struct ViewRuntimeData; class QueryViewsLog; using InternalTextLogsQueuePtr = std::shared_ptr; using InternalTextLogsQueueWeakPtr = std::weak_ptr; @@ -216,6 +216,9 @@ public: /// Update ProfileEvents and dumps info to system.query_thread_log void finalizePerformanceCounters(); + /// Set the counters last usage to now + void resetPerformanceCountersLastUsage(); + /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false); @@ -231,7 +234,7 @@ protected: void logToQueryThreadLog(QueryThreadLog & thread_log, const String & 
current_database, std::chrono::time_point now); - void logToQueryViewsLog(const ViewInfo & vinfo); + void logToQueryViewsLog(const ViewRuntimeData & vinfo); void assertState(const std::initializer_list & permitted_states, const char * description = nullptr) const; diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 27089e98b80..a024b80f391 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -167,7 +167,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( 0, std::chrono::system_clock::now(), QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START}; - views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); + views.emplace_back(ViewRuntimeData{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); /// Add the view to the query access info so it can appear in system.query_log @@ -200,6 +200,39 @@ Block PushingToViewsBlockOutputStream::getHeader() const return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals()); } +void inline runViewStage(ViewRuntimeData & view, const std::string & action, std::function stage) +{ + Stopwatch watch; + + auto * original_thread = current_thread; + SCOPE_EXIT({ current_thread = original_thread; }); + + if (view.runtime_stats.thread_status) + { + /// Change thread context to store individual metrics per view. Once the work is done, go back to the original thread + view.runtime_stats.thread_status->resetPerformanceCountersLastUsage(); + current_thread = view.runtime_stats.thread_status.get(); + } + + try + { + stage(); + } + catch (Exception & ex) + { + ex.addMessage(action + " " + view.table_id.getNameForLogs()); + view.setException(std::current_exception()); + } + catch (...) 
+ { + view.setException(std::current_exception()); + } + + if (view.runtime_stats.thread_status) + view.runtime_stats.thread_status->updatePerformanceCounters(); + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); +} + void PushingToViewsBlockOutputStream::write(const Block & block) { /** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match. @@ -237,7 +270,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block) { pool.scheduleOrThrowOnError([&] { setThreadName("PushingToViews"); - process(block, view); + runViewStage(view, "while pushing to view", [&]() { process(block, view); }); }); } pool.wait(); @@ -246,7 +279,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block) { for (auto & view : views) { - process(block, view); + runViewStage(view, "while pushing to view", [&]() { process(block, view); }); } } } @@ -258,7 +291,7 @@ void PushingToViewsBlockOutputStream::writePrefix() for (auto & view : views) { - processPrefix(view); + runViewStage(view, "while writing prefix to view", [&] { view.out->writePrefix(); }); if (view.exception) { logQueryViews(); @@ -275,6 +308,12 @@ void PushingToViewsBlockOutputStream::writeSuffix() if (views.empty()) return; + auto processSuffix = [](ViewRuntimeData & view) { + view.out->writeSuffix(); + view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH); + }; + static std::string stageStep = "while writing suffix to view"; + /// Run writeSuffix() for views in separate thread pool. /// In could have been done in PushingToViewsBlockOutputStream::process, however /// it is not good if insert into main table fail but into view succeed. 
@@ -295,9 +334,20 @@ void PushingToViewsBlockOutputStream::writeSuffix() pool.scheduleOrThrowOnError([&] { setThreadName("PushingToViews"); - processSuffix(view); + runViewStage(view, stageStep, [&] { processSuffix(view); }); if (view.exception) + { exception_count.fetch_add(1, std::memory_order_relaxed); + } + else + { + LOG_TRACE( + log, + "Pushing from {} to {} took {} ms.", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + view.runtime_stats.elapsed_ms); + } }); } pool.wait(); @@ -312,7 +362,7 @@ void PushingToViewsBlockOutputStream::writeSuffix() exception_happened = true; continue; } - processSuffix(view); + runViewStage(view, stageStep, [&] { processSuffix(view); }); if (view.exception) exception_happened = true; } @@ -337,170 +387,59 @@ void PushingToViewsBlockOutputStream::flush() view.out->flush(); } -void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view) +void PushingToViewsBlockOutputStream::process(const Block & block, ViewRuntimeData & view) { - Stopwatch watch; + BlockInputStreamPtr in; - auto * original_thread = current_thread; - SCOPE_EXIT({ current_thread = original_thread; }); + /// We need keep InterpreterSelectQuery, until the processing will be finished, since: + /// + /// - We copy Context inside InterpreterSelectQuery to support + /// modification of context (Settings) for subqueries + /// - InterpreterSelectQuery lives shorter than query pipeline. + /// It's used just to build the query pipeline and no longer needed + /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, + /// **can** take a reference to Context from InterpreterSelectQuery + /// (the problem raises only when function uses context from the + /// execute*() method, like FunctionDictGet do) + /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. 
+ std::optional select; - if (view.runtime_stats.thread_status) + if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED) { - /// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread - *view.runtime_stats.thread_status->last_rusage = RUsageCounters::current(); - if (view.runtime_stats.thread_status->taskstats) - view.runtime_stats.thread_status->taskstats->reset(); - current_thread = view.runtime_stats.thread_status.get(); + /// We create a table with the same name as original table and the same alias columns, + /// but it will contain single block (that is INSERT-ed into main table). + /// InterpreterSelectQuery will do processing of alias columns. + + auto local_context = Context::createCopy(select_context); + local_context->addViewSource( + StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals())); + select.emplace(view.query, local_context, SelectQueryOptions()); + in = std::make_shared(select->execute().getInputStream()); + + /// Squashing is needed here because the materialized view query can generate a lot of blocks + /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY + /// and two-level aggregation is triggered). 
+ in = std::make_shared( + in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes); + in = std::make_shared(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); + } + else + in = std::make_shared(block); + + in->setProgressCallback([this](const Progress & progress) { + CurrentThread::updateProgressIn(progress); + this->onProgress(progress); + }); + + in->readPrefix(); + + while (Block result_block = in->read()) + { + Nested::validateArraySizes(result_block); + view.out->write(result_block); } - - try - { - BlockInputStreamPtr in; - - /// We need keep InterpreterSelectQuery, until the processing will be finished, since: - /// - /// - We copy Context inside InterpreterSelectQuery to support - /// modification of context (Settings) for subqueries - /// - InterpreterSelectQuery lives shorter than query pipeline. - /// It's used just to build the query pipeline and no longer needed - /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, - /// **can** take a reference to Context from InterpreterSelectQuery - /// (the problem raises only when function uses context from the - /// execute*() method, like FunctionDictGet do) - /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. - std::optional select; - - if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED) - { - /// We create a table with the same name as original table and the same alias columns, - /// but it will contain single block (that is INSERT-ed into main table). - /// InterpreterSelectQuery will do processing of alias columns. 
- - auto local_context = Context::createCopy(select_context); - local_context->addViewSource( - StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals())); - select.emplace(view.query, local_context, SelectQueryOptions()); - in = std::make_shared(select->execute().getInputStream()); - - /// Squashing is needed here because the materialized view query can generate a lot of blocks - /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY - /// and two-level aggregation is triggered). - in = std::make_shared( - in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes); - in = std::make_shared(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); - } - else - in = std::make_shared(block); - - in->setProgressCallback([this](const Progress & progress) - { - CurrentThread::updateProgressIn(progress); - this->onProgress(progress); - }); - - in->readPrefix(); - - while (Block result_block = in->read()) - { - Nested::validateArraySizes(result_block); - view.out->write(result_block); - } - - in->readSuffix(); - } - catch (Exception & ex) - { - ex.addMessage("while pushing to view " + view.table_id.getNameForLogs()); - view.setException(std::current_exception()); - } - catch (...) - { - view.setException(std::current_exception()); - } - - if (view.runtime_stats.thread_status) - view.runtime_stats.thread_status->updatePerformanceCounters(); - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); -} - -void PushingToViewsBlockOutputStream::processPrefix(ViewInfo & view) -{ - Stopwatch watch; - - auto * original_thread = current_thread; - SCOPE_EXIT({ current_thread = original_thread; }); - - if (view.runtime_stats.thread_status) - { - /// Change thread context to store individual metrics per view. 
Once the work in done, go back to the original thread - *view.runtime_stats.thread_status->last_rusage = RUsageCounters::current(); - if (view.runtime_stats.thread_status->taskstats) - view.runtime_stats.thread_status->taskstats->reset(); - current_thread = view.runtime_stats.thread_status.get(); - } - - try - { - view.out->writePrefix(); - } - catch (Exception & ex) - { - ex.addMessage("while writing prefix to view " + view.table_id.getNameForLogs()); - view.setException(std::current_exception()); - } - catch (...) - { - view.setException(std::current_exception()); - } - if (view.runtime_stats.thread_status) - view.runtime_stats.thread_status->updatePerformanceCounters(); - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); -} - - -void PushingToViewsBlockOutputStream::processSuffix(ViewInfo & view) -{ - Stopwatch watch; - - auto * original_thread = current_thread; - SCOPE_EXIT({ current_thread = original_thread; }); - - if (view.runtime_stats.thread_status) - { - /// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread - *view.runtime_stats.thread_status->last_rusage = RUsageCounters::current(); - if (view.runtime_stats.thread_status->taskstats) - view.runtime_stats.thread_status->taskstats->reset(); - current_thread = view.runtime_stats.thread_status.get(); - } - - try - { - view.out->writeSuffix(); - view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH); - } - catch (Exception & ex) - { - ex.addMessage("while writing suffix to view " + view.table_id.getNameForLogs()); - view.setException(std::current_exception()); - } - catch (...) 
- { - view.setException(std::current_exception()); - } - if (view.runtime_stats.thread_status) - view.runtime_stats.thread_status->updatePerformanceCounters(); - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); - if (!view.exception) - { - LOG_TRACE( - log, - "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.runtime_stats.elapsed_ms); - } + in->readSuffix(); } void PushingToViewsBlockOutputStream::checkExceptionsInViews() diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index 591cf9e771c..ba125e28829 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -16,7 +16,7 @@ namespace DB class ReplicatedMergeTreeSink; -struct ViewInfo +struct ViewRuntimeData { const ASTPtr query; StorageID table_id; @@ -61,13 +61,11 @@ private: ASTPtr query_ptr; Stopwatch main_watch; - std::vector views; + std::vector views; ContextMutablePtr select_context; ContextMutablePtr insert_context; - void process(const Block & block, ViewInfo & view); - static void processPrefix(ViewInfo & view); - void processSuffix(ViewInfo & view); + void process(const Block & block, ViewRuntimeData & view); void checkExceptionsInViews(); void logQueryViews(); }; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index d11d3b22184..2917a399906 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -300,6 +300,13 @@ void ThreadStatus::finalizePerformanceCounters() } } +void ThreadStatus::resetPerformanceCountersLastUsage() +{ + *last_rusage = RUsageCounters::current(); + if (taskstats) + taskstats->reset(); +} + void ThreadStatus::initQueryProfiler() { if (!query_profiled_enabled) @@ -482,7 +489,7 @@ static String getCleanQueryAst(const ASTPtr q, ContextPtr context) return res; } -void 
ThreadStatus::logToQueryViewsLog(const ViewInfo & vinfo) +void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo) { auto query_context_ptr = query_context.lock(); if (!query_context_ptr)