Refactor MV stage metric setup and teardown

This commit is contained in:
Raúl Marín 2021-08-09 19:38:29 +02:00
parent d1d47658f5
commit 820a6e0987
5 changed files with 122 additions and 172 deletions

View File

@ -149,6 +149,9 @@ ThreadStatus::~ThreadStatus()
if (deleter)
deleter();
/// Only change current_thread if it's currently being used by this ThreadStatus
/// For example, PushingToViewsBlockOutputStream creates and deletes ThreadStatus instances while running in the main query thread
if (current_thread == this)
current_thread = nullptr;
}

View File

@ -37,7 +37,7 @@ struct RUsageCounters;
struct PerfEventsCounters;
class TaskStatsInfoGetter;
class InternalTextLogsQueue;
struct ViewInfo;
struct ViewRuntimeData;
class QueryViewsLog;
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
@ -216,6 +216,9 @@ public:
/// Update ProfileEvents and dumps info to system.query_thread_log
void finalizePerformanceCounters();
/// Set the counters last usage to now
void resetPerformanceCountersLastUsage();
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
@ -231,7 +234,7 @@ protected:
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now);
void logToQueryViewsLog(const ViewInfo & vinfo);
void logToQueryViewsLog(const ViewRuntimeData & vinfo);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;

View File

@ -167,7 +167,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
0,
std::chrono::system_clock::now(),
QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START};
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
views.emplace_back(ViewRuntimeData{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
/// Add the view to the query access info so it can appear in system.query_log
@ -200,6 +200,39 @@ Block PushingToViewsBlockOutputStream::getHeader() const
return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals());
}
void inline runViewStage(ViewRuntimeData & view, const std::string & action, std::function<void()> stage)
{
Stopwatch watch;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (view.runtime_stats.thread_status)
{
/// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread
view.runtime_stats.thread_status->resetPerformanceCountersLastUsage();
current_thread = view.runtime_stats.thread_status.get();
}
try
{
stage();
}
catch (Exception & ex)
{
ex.addMessage(action + " " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
}
void PushingToViewsBlockOutputStream::write(const Block & block)
{
/** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match.
@ -237,7 +270,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
{
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
process(block, view);
runViewStage(view, "while pushing to view", [&]() { process(block, view); });
});
}
pool.wait();
@ -246,7 +279,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
{
for (auto & view : views)
{
process(block, view);
runViewStage(view, "while pushing to view", [&]() { process(block, view); });
}
}
}
@ -258,7 +291,7 @@ void PushingToViewsBlockOutputStream::writePrefix()
for (auto & view : views)
{
processPrefix(view);
runViewStage(view, "while writing prefix to view", [&] { view.out->writePrefix(); });
if (view.exception)
{
logQueryViews();
@ -275,6 +308,12 @@ void PushingToViewsBlockOutputStream::writeSuffix()
if (views.empty())
return;
auto processSuffix = [](ViewRuntimeData & view) {
view.out->writeSuffix();
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH);
};
static std::string stageStep = "while writing suffix to view";
/// Run writeSuffix() for views in separate thread pool.
/// In could have been done in PushingToViewsBlockOutputStream::process, however
/// it is not good if insert into main table fail but into view succeed.
@ -295,9 +334,20 @@ void PushingToViewsBlockOutputStream::writeSuffix()
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
processSuffix(view);
runViewStage(view, stageStep, [&] { processSuffix(view); });
if (view.exception)
{
exception_count.fetch_add(1, std::memory_order_relaxed);
}
else
{
LOG_TRACE(
log,
"Pushing from {} to {} took {} ms.",
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
view.runtime_stats.elapsed_ms);
}
});
}
pool.wait();
@ -312,7 +362,7 @@ void PushingToViewsBlockOutputStream::writeSuffix()
exception_happened = true;
continue;
}
processSuffix(view);
runViewStage(view, stageStep, [&] { processSuffix(view); });
if (view.exception)
exception_happened = true;
}
@ -337,170 +387,59 @@ void PushingToViewsBlockOutputStream::flush()
view.out->flush();
}
void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view)
void PushingToViewsBlockOutputStream::process(const Block & block, ViewRuntimeData & view)
{
Stopwatch watch;
BlockInputStreamPtr in;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
/// We need keep InterpreterSelectQuery, until the processing will be finished, since:
///
/// - We copy Context inside InterpreterSelectQuery to support
/// modification of context (Settings) for subqueries
/// - InterpreterSelectQuery lives shorter than query pipeline.
/// It's used just to build the query pipeline and no longer needed
/// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery,
/// **can** take a reference to Context from InterpreterSelectQuery
/// (the problem raises only when function uses context from the
/// execute*() method, like FunctionDictGet do)
/// - These objects live inside query pipeline (DataStreams) and the reference become dangling.
std::optional<InterpreterSelectQuery> select;
if (view.runtime_stats.thread_status)
if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
/// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread
*view.runtime_stats.thread_status->last_rusage = RUsageCounters::current();
if (view.runtime_stats.thread_status->taskstats)
view.runtime_stats.thread_status->taskstats->reset();
current_thread = view.runtime_stats.thread_status.get();
/// We create a table with the same name as original table and the same alias columns,
/// but it will contain single block (that is INSERT-ed into main table).
/// InterpreterSelectQuery will do processing of alias columns.
auto local_context = Context::createCopy(select_context);
local_context->addViewSource(
StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes);
in = std::make_shared<ConvertingBlockInputStream>(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
}
else
in = std::make_shared<OneBlockInputStream>(block);
in->setProgressCallback([this](const Progress & progress) {
CurrentThread::updateProgressIn(progress);
this->onProgress(progress);
});
in->readPrefix();
while (Block result_block = in->read())
{
Nested::validateArraySizes(result_block);
view.out->write(result_block);
}
try
{
BlockInputStreamPtr in;
/// We need keep InterpreterSelectQuery, until the processing will be finished, since:
///
/// - We copy Context inside InterpreterSelectQuery to support
/// modification of context (Settings) for subqueries
/// - InterpreterSelectQuery lives shorter than query pipeline.
/// It's used just to build the query pipeline and no longer needed
/// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery,
/// **can** take a reference to Context from InterpreterSelectQuery
/// (the problem raises only when function uses context from the
/// execute*() method, like FunctionDictGet do)
/// - These objects live inside query pipeline (DataStreams) and the reference become dangling.
std::optional<InterpreterSelectQuery> select;
if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
/// We create a table with the same name as original table and the same alias columns,
/// but it will contain single block (that is INSERT-ed into main table).
/// InterpreterSelectQuery will do processing of alias columns.
auto local_context = Context::createCopy(select_context);
local_context->addViewSource(
StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes);
in = std::make_shared<ConvertingBlockInputStream>(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
}
else
in = std::make_shared<OneBlockInputStream>(block);
in->setProgressCallback([this](const Progress & progress)
{
CurrentThread::updateProgressIn(progress);
this->onProgress(progress);
});
in->readPrefix();
while (Block result_block = in->read())
{
Nested::validateArraySizes(result_block);
view.out->write(result_block);
}
in->readSuffix();
}
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
}
void PushingToViewsBlockOutputStream::processPrefix(ViewInfo & view)
{
Stopwatch watch;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (view.runtime_stats.thread_status)
{
/// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread
*view.runtime_stats.thread_status->last_rusage = RUsageCounters::current();
if (view.runtime_stats.thread_status->taskstats)
view.runtime_stats.thread_status->taskstats->reset();
current_thread = view.runtime_stats.thread_status.get();
}
try
{
view.out->writePrefix();
}
catch (Exception & ex)
{
ex.addMessage("while writing prefix to view " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
}
void PushingToViewsBlockOutputStream::processSuffix(ViewInfo & view)
{
Stopwatch watch;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (view.runtime_stats.thread_status)
{
/// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread
*view.runtime_stats.thread_status->last_rusage = RUsageCounters::current();
if (view.runtime_stats.thread_status->taskstats)
view.runtime_stats.thread_status->taskstats->reset();
current_thread = view.runtime_stats.thread_status.get();
}
try
{
view.out->writeSuffix();
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH);
}
catch (Exception & ex)
{
ex.addMessage("while writing suffix to view " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
if (!view.exception)
{
LOG_TRACE(
log,
"Pushing from {} to {} took {} ms.",
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
view.runtime_stats.elapsed_ms);
}
in->readSuffix();
}
void PushingToViewsBlockOutputStream::checkExceptionsInViews()

View File

@ -16,7 +16,7 @@ namespace DB
class ReplicatedMergeTreeSink;
struct ViewInfo
struct ViewRuntimeData
{
const ASTPtr query;
StorageID table_id;
@ -61,13 +61,11 @@ private:
ASTPtr query_ptr;
Stopwatch main_watch;
std::vector<ViewInfo> views;
std::vector<ViewRuntimeData> views;
ContextMutablePtr select_context;
ContextMutablePtr insert_context;
void process(const Block & block, ViewInfo & view);
static void processPrefix(ViewInfo & view);
void processSuffix(ViewInfo & view);
void process(const Block & block, ViewRuntimeData & view);
void checkExceptionsInViews();
void logQueryViews();
};

View File

@ -300,6 +300,13 @@ void ThreadStatus::finalizePerformanceCounters()
}
}
void ThreadStatus::resetPerformanceCountersLastUsage()
{
*last_rusage = RUsageCounters::current();
if (taskstats)
taskstats->reset();
}
void ThreadStatus::initQueryProfiler()
{
if (!query_profiled_enabled)
@ -482,7 +489,7 @@ static String getCleanQueryAst(const ASTPtr q, ContextPtr context)
return res;
}
void ThreadStatus::logToQueryViewsLog(const ViewInfo & vinfo)
void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo)
{
auto query_context_ptr = query_context.lock();
if (!query_context_ptr)