From 07c71c226e96ea7510844a35115f6c0e9e33c3ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Tue, 15 Jun 2021 19:10:41 +0200 Subject: [PATCH 01/48] Add QueryMaterializationLog class --- src/Interpreters/QueryMaterializationLog.cpp | 97 ++++++++++++++++++++ src/Interpreters/QueryMaterializationLog.h | 61 ++++++++++++ src/Interpreters/SystemLog.h | 3 + src/Interpreters/ya.make | 1 + 4 files changed, 162 insertions(+) create mode 100644 src/Interpreters/QueryMaterializationLog.cpp create mode 100644 src/Interpreters/QueryMaterializationLog.h diff --git a/src/Interpreters/QueryMaterializationLog.cpp b/src/Interpreters/QueryMaterializationLog.cpp new file mode 100644 index 00000000000..a900a70ac5f --- /dev/null +++ b/src/Interpreters/QueryMaterializationLog.cpp @@ -0,0 +1,97 @@ +#include "QueryMaterializationLog.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +Block QueryMaterializationLogElement::createBlock() +{ + auto query_status_datatype = std::make_shared(DataTypeEnum8::Values{ + {"QueryStart", static_cast(QUERY_START)}, + {"QueryFinish", static_cast(QUERY_FINISH)}, + {"ExceptionBeforeStart", static_cast(EXCEPTION_BEFORE_START)}, + {"ExceptionWhileProcessing", static_cast(EXCEPTION_WHILE_PROCESSING)}}); + + return { + {std::make_shared(), "event_date"}, + {std::make_shared(), "event_time"}, + {std::make_shared(6), "event_time_microseconds"}, + {std::make_shared(), "materialization_start_time"}, + {std::make_shared(6), "materialization_start_time_microseconds"}, + {std::make_shared(), "materialization_duration_ms"}, + + {std::make_shared(), "initial_query_id"}, + {std::make_shared(), "materialization_name"}, + {std::make_shared(), "materialization_uuid"}, + {std::make_shared(), "materialization_query"}, + + {std::make_shared(), "read_rows"}, + {std::make_shared(), "read_bytes"}, + {std::make_shared(), "written_rows"}, + {std::make_shared(), "written_bytes"}, + {std::make_shared(), "memory_usage"}, + {std::make_shared(), "peak_memory_usage"}, + {std::make_shared(std::make_shared()), "ProfileEvents.Names"}, + {std::make_shared(std::make_shared()), "ProfileEvents.Values"}, + + {std::move(query_status_datatype), "end_status"}, + {std::make_shared(), "exception_code"}, + {std::make_shared(), "exception"}, + {std::make_shared(), "stack_trace"}}; +} + +void QueryMaterializationLogElement::appendToBlock(MutableColumns & columns) const +{ + size_t i = 0; + + columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); // event_date + columns[i++]->insert(event_time); + columns[i++]->insert(event_time_microseconds); + columns[i++]->insert(materialization_start_time); + columns[i++]->insert(materialization_start_time_microseconds); + columns[i++]->insert(materialization_duration_ms); + + columns[i++]->insertData(initial_query_id.data(), initial_query_id.size()); + columns[i++]->insertData(materialization_name.data(), materialization_name.size()); + columns[i++]->insert(materialization_uuid); + columns[i++]->insertData(materialization_query.data(), materialization_query.size()); + + columns[i++]->insert(read_rows); + columns[i++]->insert(read_bytes); + columns[i++]->insert(written_rows); + columns[i++]->insert(written_bytes); + columns[i++]->insert(memory_usage); + columns[i++]->insert(peak_memory_usage); + + if (profile_counters) + { + auto * column_names = columns[i++].get(); + auto * column_values = columns[i++].get(); + ProfileEvents::dumpToArrayColumns(*profile_counters, column_names, column_values, true); + } + else + { + columns[i++]->insertDefault(); + columns[i++]->insertDefault(); + } + + columns[i++]->insert(end_status); + columns[i++]->insert(exception_code); + columns[i++]->insertData(exception.data(), exception.size()); + columns[i++]->insertData(stack_trace.data(), stack_trace.size()); +} + +} diff --git a/src/Interpreters/QueryMaterializationLog.h b/src/Interpreters/QueryMaterializationLog.h new file mode 100644 index 00000000000..ce642cd1126 --- /dev/null +++ b/src/Interpreters/QueryMaterializationLog.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace ProfileEvents +{ +class Counters; +} + +namespace DB +{ +struct QueryMaterializationLogElement +{ + using Status = QueryLogElementType; + + time_t event_time{}; + Decimal64 event_time_microseconds{}; + time_t materialization_start_time{}; + Decimal64 materialization_start_time_microseconds{}; + UInt64 materialization_duration_ms{}; + + String initial_query_id; + String materialization_name; + UUID materialization_uuid{UUIDHelpers::Nil}; + String materialization_query; + + UInt64 read_rows{}; + UInt64 read_bytes{}; + UInt64 written_rows{}; + UInt64 written_bytes{}; + Int64 memory_usage{}; + Int64 peak_memory_usage{}; + std::shared_ptr profile_counters; + + Status end_status{EXCEPTION_BEFORE_START}; + Int32 exception_code{}; + String exception; + String stack_trace; + + static std::string name() { return "QueryMutationLog"; } + + static Block createBlock(); + void appendToBlock(MutableColumns & columns) const; +}; + + +class QueryMaterializationLog : public SystemLog +{ + using SystemLog::SystemLog; +}; + +} diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index ee3116362e5..b95cc532236 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -74,6 +74,7 @@ class CrashLog; class MetricLog; class AsynchronousMetricLog; class OpenTelemetrySpanLog; +class QueryMaterializationLog; class ISystemLog @@ -110,6 +111,8 @@ struct SystemLogs std::shared_ptr asynchronous_metric_log; /// OpenTelemetry trace spans. std::shared_ptr opentelemetry_span_log; + /// Used to log queries of materialized views + std::shared_ptr query_materialization_log; std::vector logs; }; diff --git a/src/Interpreters/ya.make b/src/Interpreters/ya.make index 17157fe3a8c..5b0c6f0d9b7 100644 --- a/src/Interpreters/ya.make +++ b/src/Interpreters/ya.make @@ -127,6 +127,7 @@ SRCS( ProfileEventsExt.cpp QueryAliasesVisitor.cpp QueryLog.cpp + QueryMaterializationLog.cpp QueryNormalizer.cpp QueryParameterVisitor.cpp QueryThreadLog.cpp From c5b14f50751ce5cd7a4c88eff4e44b8f2394ba69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Wed, 16 Jun 2021 11:45:43 +0200 Subject: [PATCH 02/48] Add materialization log to SystemLog --- src/Interpreters/SystemLog.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 31ceca8ec05..71190a84a83 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -1,13 +1,14 @@ -#include -#include -#include -#include -#include -#include +#include #include #include -#include #include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -103,6 +104,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf opentelemetry_span_log = createSystemLog( global_context, "system", "opentelemetry_span_log", config, "opentelemetry_span_log"); + query_materialization_log = createSystemLog( + global_context, "system", "query_materialization_log", config, "query_materialization_log"); if (query_log) logs.emplace_back(query_log.get()); @@ -122,6 +125,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf logs.emplace_back(asynchronous_metric_log.get()); if (opentelemetry_span_log) logs.emplace_back(opentelemetry_span_log.get()); + if (query_materialization_log) + logs.emplace_back(query_materialization_log.get()); try { From f34cb886ccc25170d0949ba3d6ed5c5f8c585757 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Wed, 16 Jun 2021 12:26:18 +0200 Subject: [PATCH 03/48] Add materialization log accessor to Context --- src/Interpreters/Context.cpp | 9 +++++++++ src/Interpreters/Context.h | 2 ++ 2 files changed, 11 insertions(+) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 3e080eb4b4f..5743164f280 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2052,6 +2052,15 @@ std::shared_ptr Context::getQueryLog() const return shared->system_logs->query_log; } +std::shared_ptr Context::getQueryMaterializationLog() const +{ + auto lock = getLock(); + + if (!shared->system_logs) + return {}; + + return shared->system_logs->query_materialization_log; +} std::shared_ptr Context::getQueryThreadLog() const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 2c500c4166b..17d27b45b25 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -69,6 +69,7 @@ class Macros; struct Progress; class Clusters; class QueryLog; +class QueryMaterializationLog; class QueryThreadLog; class PartLog; class TextLog; @@ -708,6 +709,7 @@ public: /// Nullptr if the query log is not ready for this moment. std::shared_ptr getQueryLog() const; + std::shared_ptr getQueryMaterializationLog() const; std::shared_ptr getQueryThreadLog() const; std::shared_ptr getTraceLog() const; std::shared_ptr getTextLog() const; From bc1ccd2d62596b826e090e956675686a8ba8931f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Wed, 16 Jun 2021 12:42:10 +0200 Subject: [PATCH 04/48] Flush materialization log on FLUSH_LOGS --- src/Interpreters/InterpreterSystemQuery.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index a682a029258..bcdd2225d68 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -417,6 +418,7 @@ BlockIO InterpreterSystemQuery::execute() [&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); }, [&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); }, [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); } + [&] () { if (auto query_materialization_log = getContext()->getQueryMaterializationLog()) query_materialization_log->flush(true); } ); break; } From ea5c02a605cef90a57e618bcbde61ad8977880a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Fri, 18 Jun 2021 15:44:08 +0200 Subject: [PATCH 05/48] WIP --- programs/server/config.xml | 8 + src/Common/ThreadStatus.h | 4 + src/Core/Settings.h | 3 +- .../PushingToViewsBlockOutputStream.cpp | 291 +++++++++++------- .../PushingToViewsBlockOutputStream.h | 21 +- src/Interpreters/Context.cpp | 19 +- src/Interpreters/Context.h | 4 +- src/Interpreters/InterpreterSystemQuery.cpp | 4 +- ...terializationLog.cpp => QueryViewsLog.cpp} | 35 ++- ...ryMaterializationLog.h => QueryViewsLog.h} | 39 ++- src/Interpreters/SystemLog.cpp | 9 +- src/Interpreters/SystemLog.h | 6 +- src/Interpreters/ThreadStatusExt.cpp | 44 ++- src/Interpreters/ya.make | 2 +- 14 files changed, 321 insertions(+), 168 deletions(-) rename src/Interpreters/{QueryMaterializationLog.cpp => QueryViewsLog.cpp} (73%) rename src/Interpreters/{QueryMaterializationLog.h => QueryViewsLog.h} (51%) diff --git a/programs/server/config.xml b/programs/server/config.xml index 6f0b228dda7..3f7f6c25e24 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -955,6 +955,14 @@ 1000 + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+ diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 6fc43114621..86aa8cc52a1 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -37,6 +37,8 @@ struct RUsageCounters; struct PerfEventsCounters; class TaskStatsInfoGetter; class InternalTextLogsQueue; +struct ViewInfo; +class QueryViewsLog; using InternalTextLogsQueuePtr = std::shared_ptr; using InternalTextLogsQueueWeakPtr = std::weak_ptr; @@ -213,6 +215,8 @@ public: /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false); + void logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo & vinfo); + protected: void applyQuerySettings(); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index e53db255d20..54e1f00832c 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -351,7 +351,8 @@ class IColumn; \ M(Bool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.", 0) \ M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \ - M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ + M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ + M(Bool, log_query_views, false, "Log query dependent views into system.query_views_log table. This setting have effect only when 'log_queries' is true.", 0) \ M(String, log_comment, "", "Log comment into system.query_log table and server log. It can be set to arbitrary string no longer than max_query_size.", 0) \ M(LogsLevel, send_logs_level, LogsLevel::fatal, "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \ M(Bool, enable_optimize_predicate_expression, 1, "If it is set to true, optimize predicates to subqueries.", 0) \ diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 7729eb5fb44..2501d4dd42e 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -1,25 +1,25 @@ #include +#include +#include #include #include -#include -#include #include #include -#include -#include #include +#include +#include #include -#include -#include -#include -#include -#include +#include +#include #include #include #include #include #include +#include +#include + namespace DB { @@ -79,9 +79,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( ASTPtr query; BlockOutputStreamPtr out; + QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT; + String target_name = database_table.getNameForLogs(); if (auto * materialized_view = dynamic_cast(dependent_table.get())) { + type = QueryViewsLogElement::ViewType::MATERIALIZED; addTableLock( materialized_view->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout)); @@ -89,6 +92,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( auto inner_table_id = inner_table->getStorageID(); auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr(); query = dependent_metadata_snapshot->getSelectQuery().inner_query; + target_name = inner_table_id.getNameForLogs(); std::unique_ptr insert = std::make_unique(); insert->table_id = inner_table_id; @@ -113,15 +117,27 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( InterpreterInsertQuery interpreter(insert_query_ptr, insert_context); BlockIO io = interpreter.execute(); out = io.out; + LOG_WARNING( + log, + "Pushing from {} to {} {}.", + storage->getStorageID().getNameForLogs(), + inner_table_id.getNameForLogs(), + inner_table->getStorageID().getFullTableName()); } - else if (dynamic_cast(dependent_table.get())) + else if (auto * live_view = dynamic_cast(dependent_table.get())) + { + type = QueryViewsLogElement::ViewType::LIVE; + query = live_view->getInnerQuery(); // TODO: Optimize this out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true); + } else out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr()); - views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, 0 /* elapsed_ms */}); + QueryViewsLogElement::ViewRuntimeStats runtime_stats{ + 0, type, std::make_shared(), select_context->getInitialQueryId(), std::chrono::system_clock::now(), target_name}; + views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); } /// Do not push to destination table if the flag is set @@ -169,41 +185,64 @@ void PushingToViewsBlockOutputStream::write(const Block & block) output->write(block); } + if (!views.size()) + return; + /// Don't process materialized views if this block is duplicate if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate()) return; - // Insert data into materialized views only after successful insert into main table + // Push to each view. Only parallel if available const Settings & settings = getContext()->getSettingsRef(); - if (settings.parallel_view_processing && views.size() > 1) + const size_t max_threads = settings.parallel_view_processing ? settings.max_threads : 1; + ThreadPool pool(std::min(max_threads, views.size())); + for (auto & view : views) { - // Push to views concurrently if enabled and more than one view is attached - ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); - for (auto & view : views) - { - auto thread_group = CurrentThread::getGroup(); - pool.scheduleOrThrowOnError([=, &view, this] - { - setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - process(block, view); - }); - } - // Wait for concurrent view processing - pool.wait(); - } - else - { - // Process sequentially - for (auto & view : views) - { - process(block, view); + auto thread_group = CurrentThread::getGroup(); + pool.scheduleOrThrowOnError([=, &view, this] { + LOG_WARNING( + log, + "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITE START", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + uint64_t(view.out.get()), + view.runtime_stats.thread_status->progress_in.read_rows, + view.runtime_stats.thread_status->progress_in.written_rows, + view.runtime_stats.thread_status->progress_out.read_rows, + view.runtime_stats.thread_status->progress_out.written_rows); + // current_thread = view.thread_status.get(); - if (view.exception) - std::rethrow_exception(view.exception); - } + setThreadName("PushingToViews"); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); + + Stopwatch watch; + try + { + process(block, view); + } + catch (...) + { + view.exception = std::current_exception(); + // TODO: Stop processing on exception + } + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); + // TODO: Update other counters + LOG_WARNING( + log, + "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITE END", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + uint64_t(view.out.get()), + view.runtime_stats.thread_status->progress_in.read_rows, + view.runtime_stats.thread_status->progress_in.written_rows, + view.runtime_stats.thread_status->progress_out.read_rows, + view.runtime_stats.thread_status->progress_out.written_rows); + }); } + // Wait for concurrent view processing + pool.wait(); + check_exceptions_in_views(); } void PushingToViewsBlockOutputStream::writePrefix() @@ -213,104 +252,97 @@ void PushingToViewsBlockOutputStream::writePrefix() for (auto & view : views) { + Stopwatch watch; try { view.out->writePrefix(); } catch (Exception & ex) { + view.exception = std::current_exception(); ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); + log_query_views(); throw; } + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); } } void PushingToViewsBlockOutputStream::writeSuffix() { + LOG_WARNING(log, "STARTING {} WITH {}", uint64_t(this), views.size()); + if (output) output->writeSuffix(); + if (!views.size()) + return; std::exception_ptr first_exception; - const Settings & settings = getContext()->getSettingsRef(); - bool parallel_processing = false; /// Run writeSuffix() for views in separate thread pool. /// In could have been done in PushingToViewsBlockOutputStream::process, however /// it is not good if insert into main table fail but into view succeed. - if (settings.parallel_view_processing && views.size() > 1) - { - parallel_processing = true; - - // Push to views concurrently if enabled and more than one view is attached - ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); - auto thread_group = CurrentThread::getGroup(); - - for (auto & view : views) - { - if (view.exception) - continue; - - pool.scheduleOrThrowOnError([thread_group, &view, this] - { - setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - - Stopwatch watch; - try - { - view.out->writeSuffix(); - } - catch (...) - { - view.exception = std::current_exception(); - } - view.elapsed_ms += watch.elapsedMilliseconds(); - - LOG_TRACE(log, "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.elapsed_ms); - }); - } - // Wait for concurrent view processing - pool.wait(); - } + const size_t max_threads = settings.parallel_view_processing ? settings.max_threads : 1; + ThreadPool pool(std::min(max_threads, views.size())); + auto thread_group = CurrentThread::getGroup(); for (auto & view : views) { if (view.exception) - { - if (!first_exception) - first_exception = view.exception; - - continue; - } - - if (parallel_processing) continue; - Stopwatch watch; - try - { - view.out->writeSuffix(); - } - catch (Exception & ex) - { - ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); - throw; - } - view.elapsed_ms += watch.elapsedMilliseconds(); + pool.scheduleOrThrowOnError([&] { + LOG_WARNING( + log, + "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITESUFFIX START", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + uint64_t(view.out.get()), + view.runtime_stats.thread_status->progress_in.read_rows, + view.runtime_stats.thread_status->progress_in.written_rows, + view.runtime_stats.thread_status->progress_out.read_rows, + view.runtime_stats.thread_status->progress_out.written_rows); + // current_thread = view.thread_status.get(); + setThreadName("PushingToViews"); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); - LOG_TRACE(log, "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.elapsed_ms); + Stopwatch watch; + try + { + LOG_WARNING(log, "BEFORE CALL {} -> {}", uint64_t(this), uint64_t(view.out.get())); + view.out->writeSuffix(); + LOG_WARNING(log, "AFTER CALL {} -> {}", uint64_t(this), uint64_t(view.out.get())); + //Set status here + } + catch (...) + { + view.exception = std::current_exception(); + } + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); + // TODO: Update other counters + LOG_TRACE( + log, + "Pushing from {} to {} took {} ms.", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + view.runtime_stats.elapsed_ms); + LOG_WARNING( + log, + "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITESUFFIX END", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + uint64_t(view.out.get()), + view.runtime_stats.thread_status->progress_in.read_rows, + view.runtime_stats.thread_status->progress_in.written_rows, + view.runtime_stats.thread_status->progress_out.read_rows, + view.runtime_stats.thread_status->progress_out.written_rows); + }); } - - if (first_exception) - std::rethrow_exception(first_exception); + // Wait for concurrent view processing + pool.wait(); + check_exceptions_in_views(); UInt64 milliseconds = main_watch.elapsedMilliseconds(); if (views.size() > 1) @@ -319,6 +351,8 @@ void PushingToViewsBlockOutputStream::writeSuffix() storage->getStorageID().getNameForLogs(), views.size(), milliseconds); } + LOG_WARNING(log, "FINISHING {}", uint64_t(this)); + log_query_views(); } void PushingToViewsBlockOutputStream::flush() @@ -332,8 +366,6 @@ void PushingToViewsBlockOutputStream::flush() void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view) { - Stopwatch watch; - try { BlockInputStreamPtr in; @@ -392,8 +424,49 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & vi { view.exception = std::current_exception(); } - - view.elapsed_ms += watch.elapsedMilliseconds(); } +void PushingToViewsBlockOutputStream::check_exceptions_in_views() +{ + for (auto & view : views) + { + if (view.exception) + { + LOG_WARNING(log, "View exception {}", view.table_id.getNameForLogs()); + log_query_views(); + std::rethrow_exception(view.exception); + } + } +} + +void PushingToViewsBlockOutputStream::log_query_views() +{ + // TODO: Check settings + auto views_log = getContext()->getQueryViewsLog(); + if (!views_log) + { + LOG_WARNING(log, "NO VIEWS LOG"); // NOCHECKIN + return; + } + for (auto const & view : views) + { + LOG_WARNING( + log, + "LOG LOG LOG from {} to {}. {}. Progress {} {} {} {}", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + uint64_t(view.out.get()), + view.runtime_stats.thread_status->progress_in.read_rows, + view.runtime_stats.thread_status->progress_in.written_rows, + view.runtime_stats.thread_status->progress_out.read_rows, + view.runtime_stats.thread_status->progress_out.written_rows); + try + { + view.runtime_stats.thread_status->logToQueryViewsLog(*views_log, view); + } + catch (...) + { + } + } +} } diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index db6b671ce2c..70d5f0982b0 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -15,6 +16,15 @@ namespace DB class ReplicatedMergeTreeSink; +struct ViewInfo +{ + ASTPtr query; + StorageID table_id; + BlockOutputStreamPtr out; + std::exception_ptr exception; + QueryViewsLogElement::ViewRuntimeStats runtime_stats; +}; + /** Writes data to the specified table and to all dependent materialized views. */ class PushingToViewsBlockOutputStream : public IBlockOutputStream, WithContext @@ -44,20 +54,13 @@ private: ASTPtr query_ptr; Stopwatch main_watch; - struct ViewInfo - { - ASTPtr query; - StorageID table_id; - BlockOutputStreamPtr out; - std::exception_ptr exception; - UInt64 elapsed_ms = 0; - }; - std::vector views; ContextMutablePtr select_context; ContextMutablePtr insert_context; void process(const Block & block, ViewInfo & view); + void check_exceptions_in_views(); + void log_query_views(); }; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5743164f280..798efeed150 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2052,16 +2052,6 @@ std::shared_ptr Context::getQueryLog() const return shared->system_logs->query_log; } -std::shared_ptr Context::getQueryMaterializationLog() const -{ - auto lock = getLock(); - - if (!shared->system_logs) - return {}; - - return shared->system_logs->query_materialization_log; -} - std::shared_ptr Context::getQueryThreadLog() const { auto lock = getLock(); @@ -2072,6 +2062,15 @@ std::shared_ptr Context::getQueryThreadLog() const return shared->system_logs->query_thread_log; } +std::shared_ptr Context::getQueryViewsLog() const +{ + auto lock = getLock(); + + if (!shared->system_logs) + return {}; + + return shared->system_logs->query_views_log; +} std::shared_ptr Context::getPartLog(const String & part_database) const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 17d27b45b25..b8b4ef37399 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -69,8 +69,8 @@ class Macros; struct Progress; class Clusters; class QueryLog; -class QueryMaterializationLog; class QueryThreadLog; +class QueryViewsLog; class PartLog; class TextLog; class TraceLog; @@ -709,8 +709,8 @@ public: /// Nullptr if the query log is not ready for this moment. std::shared_ptr getQueryLog() const; - std::shared_ptr getQueryMaterializationLog() const; std::shared_ptr getQueryThreadLog() const; + std::shared_ptr getQueryViewsLog() const; std::shared_ptr getTraceLog() const; std::shared_ptr getTextLog() const; std::shared_ptr getMetricLog() const; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index bcdd2225d68..eb150356fee 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -19,8 +19,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -418,7 +418,7 @@ BlockIO InterpreterSystemQuery::execute() [&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); }, [&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); }, [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); } - [&] () { if (auto query_materialization_log = getContext()->getQueryMaterializationLog()) query_materialization_log->flush(true); } + [&] () { if (auto query_views_log = getContext()->getQueryViewsLog()) query_views_log->flush(true); } ); break; } diff --git a/src/Interpreters/QueryMaterializationLog.cpp b/src/Interpreters/QueryViewsLog.cpp similarity index 73% rename from src/Interpreters/QueryMaterializationLog.cpp rename to src/Interpreters/QueryViewsLog.cpp index a900a70ac5f..09d15cc1769 100644 --- a/src/Interpreters/QueryMaterializationLog.cpp +++ b/src/Interpreters/QueryViewsLog.cpp @@ -1,4 +1,4 @@ -#include "QueryMaterializationLog.h" +#include "QueryViewsLog.h" #include #include @@ -17,7 +17,7 @@ namespace DB { -Block QueryMaterializationLogElement::createBlock() +Block QueryViewsLogElement::createBlock() { auto query_status_datatype = std::make_shared(DataTypeEnum8::Values{ {"QueryStart", static_cast(QUERY_START)}, @@ -25,18 +25,23 @@ Block QueryMaterializationLogElement::createBlock() {"ExceptionBeforeStart", static_cast(EXCEPTION_BEFORE_START)}, {"ExceptionWhileProcessing", static_cast(EXCEPTION_WHILE_PROCESSING)}}); + auto view_type_datatype = std::make_shared(DataTypeEnum8::Values{ + {"Default", static_cast(ViewType::DEFAULT)}, + {"Materialized", static_cast(ViewType::MATERIALIZED)}, + {"Live", static_cast(ViewType::LIVE)}}); + return { {std::make_shared(), "event_date"}, {std::make_shared(), "event_time"}, {std::make_shared(6), "event_time_microseconds"}, - {std::make_shared(), "materialization_start_time"}, - {std::make_shared(6), "materialization_start_time_microseconds"}, - {std::make_shared(), "materialization_duration_ms"}, + {std::make_shared(), "view_duration_ms"}, {std::make_shared(), "initial_query_id"}, - {std::make_shared(), "materialization_name"}, - {std::make_shared(), "materialization_uuid"}, - {std::make_shared(), "materialization_query"}, + {std::make_shared(), "view_name"}, + {std::make_shared(), "view_uuid"}, + {std::move(view_type_datatype), "view_type"}, + {std::make_shared(), "view_query"}, + {std::make_shared(), "view_target"}, {std::make_shared(), "read_rows"}, {std::make_shared(), "read_bytes"}, @@ -53,21 +58,21 @@ Block QueryMaterializationLogElement::createBlock() {std::make_shared(), "stack_trace"}}; } -void QueryMaterializationLogElement::appendToBlock(MutableColumns & columns) const +void QueryViewsLogElement::appendToBlock(MutableColumns & columns) const { size_t i = 0; columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); // event_date columns[i++]->insert(event_time); columns[i++]->insert(event_time_microseconds); - columns[i++]->insert(materialization_start_time); - columns[i++]->insert(materialization_start_time_microseconds); - columns[i++]->insert(materialization_duration_ms); + columns[i++]->insert(view_duration_ms); columns[i++]->insertData(initial_query_id.data(), initial_query_id.size()); - columns[i++]->insertData(materialization_name.data(), materialization_name.size()); - columns[i++]->insert(materialization_uuid); - columns[i++]->insertData(materialization_query.data(), materialization_query.size()); + columns[i++]->insertData(view_name.data(), view_name.size()); + columns[i++]->insert(view_uuid); + columns[i++]->insert(view_type); + columns[i++]->insertData(view_query.data(), view_query.size()); + columns[i++]->insertData(view_target.data(), view_target.size()); columns[i++]->insert(read_rows); columns[i++]->insert(read_bytes); diff --git a/src/Interpreters/QueryMaterializationLog.h b/src/Interpreters/QueryViewsLog.h similarity index 51% rename from src/Interpreters/QueryMaterializationLog.h rename to src/Interpreters/QueryViewsLog.h index ce642cd1126..661d24abfcb 100644 --- a/src/Interpreters/QueryMaterializationLog.h +++ b/src/Interpreters/QueryViewsLog.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -18,20 +19,38 @@ class Counters; namespace DB { -struct QueryMaterializationLogElement +class ThreadStatus; + +struct QueryViewsLogElement { using Status = QueryLogElementType; + enum class ViewType : int8_t + { + DEFAULT, + MATERIALIZED, + LIVE + }; + + struct ViewRuntimeStats + { + UInt64 elapsed_ms = 0; + ViewType type = ViewType::DEFAULT; + std::shared_ptr thread_status = std::make_shared(); + String initial_query_id; + std::chrono::time_point start; + String target_name; + }; time_t event_time{}; Decimal64 event_time_microseconds{}; - time_t materialization_start_time{}; - Decimal64 materialization_start_time_microseconds{}; - UInt64 materialization_duration_ms{}; + UInt64 view_duration_ms{}; String initial_query_id; - String materialization_name; - UUID materialization_uuid{UUIDHelpers::Nil}; - String materialization_query; + String view_name; + UUID view_uuid{UUIDHelpers::Nil}; + ViewType view_type{ViewType::DEFAULT}; + String view_query; + String view_target; UInt64 read_rows{}; UInt64 read_bytes{}; @@ -46,16 +65,16 @@ struct QueryMaterializationLogElement String exception; String stack_trace; - static std::string name() { return "QueryMutationLog"; } + static std::string name() { return "QueryViewsLog"; } static Block createBlock(); void appendToBlock(MutableColumns & columns) const; }; -class QueryMaterializationLog : public SystemLog +class QueryViewsLog : public SystemLog { - using SystemLog::SystemLog; + using SystemLog::SystemLog; }; } diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 71190a84a83..a7400c59e76 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -4,8 +4,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -104,8 +104,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf opentelemetry_span_log = createSystemLog( global_context, "system", "opentelemetry_span_log", config, "opentelemetry_span_log"); - query_materialization_log = createSystemLog( - global_context, "system", "query_materialization_log", config, "query_materialization_log"); + query_views_log = createSystemLog(global_context, "system", "query_views_log", config, "query_views_log"); if (query_log) logs.emplace_back(query_log.get()); @@ -125,8 +124,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf logs.emplace_back(asynchronous_metric_log.get()); if (opentelemetry_span_log) logs.emplace_back(opentelemetry_span_log.get()); - if (query_materialization_log) - logs.emplace_back(query_materialization_log.get()); + if (query_views_log) + logs.emplace_back(query_views_log.get()); try { diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index b95cc532236..ee839ecf4ff 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -74,7 +74,7 @@ class CrashLog; class MetricLog; class AsynchronousMetricLog; class OpenTelemetrySpanLog; -class QueryMaterializationLog; +class QueryViewsLog; class ISystemLog @@ -111,8 +111,8 @@ struct SystemLogs std::shared_ptr asynchronous_metric_log; /// OpenTelemetry trace spans. std::shared_ptr opentelemetry_span_log; - /// Used to log queries of materialized views - std::shared_ptr query_materialization_log; + /// Used to log queries of materialized and live views + std::shared_ptr query_views_log; std::vector logs; }; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 8590b3c94f3..e8a2c529dcb 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -1,9 +1,12 @@ #include +#include #include -#include #include +#include #include +#include +#include #include #include #include @@ -455,6 +458,45 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String thread_log.add(elem); } +void ThreadStatus::logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo & vinfo) +{ + QueryViewsLogElement element; + + element.event_time = time_in_seconds(vinfo.runtime_stats.start); + element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.start); + element.view_duration_ms = vinfo.runtime_stats.elapsed_ms; + + element.initial_query_id = vinfo.runtime_stats.initial_query_id; + element.view_name = vinfo.table_id.getNameForLogs(); + element.view_uuid = vinfo.table_id.uuid; + element.view_type = vinfo.runtime_stats.type; + if (vinfo.query) + element.view_query = serializeAST(*vinfo.query, true); // TODO: Anonymize like query_log ? + element.view_target = vinfo.runtime_stats.target_name; + + element.read_rows = progress_in.read_rows.load(std::memory_order_relaxed); + element.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed); + element.written_rows = progress_out.written_rows.load(std::memory_order_relaxed); + element.written_bytes = progress_out.written_bytes.load(std::memory_order_relaxed); + element.memory_usage = memory_tracker.get(); + element.peak_memory_usage = memory_tracker.getPeak(); + // if (query_context_ptr->getSettingsRef().log_profile_events != 0) // TODO + { + element.profile_counters = std::make_shared(performance_counters.getPartiallyAtomicSnapshot()); + } + + element.end_status = EXCEPTION_BEFORE_START; + element.exception_code = 0; + if (vinfo.exception) + { + element.exception_code = 0; + element.exception = "TODO"; + element.stack_trace = "TODO"; + } + + views_log.add(element); +} + void CurrentThread::initializeQuery() { if (unlikely(!current_thread)) diff --git a/src/Interpreters/ya.make b/src/Interpreters/ya.make index 5b0c6f0d9b7..f1d9e45a3a6 100644 --- a/src/Interpreters/ya.make +++ b/src/Interpreters/ya.make @@ -127,10 +127,10 @@ SRCS( ProfileEventsExt.cpp QueryAliasesVisitor.cpp QueryLog.cpp - QueryMaterializationLog.cpp QueryNormalizer.cpp QueryParameterVisitor.cpp QueryThreadLog.cpp + QueryViewsLog.cpp RemoveInjectiveFunctionsVisitor.cpp RenameColumnVisitor.cpp ReplaceQueryParameterVisitor.cpp From efe19384379dde23bceee1e67b6df85e32d36710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Fri, 18 Jun 2021 18:25:19 +0200 Subject: [PATCH 06/48] WIP: Slow improvements --- src/Common/Exception.cpp | 40 +++++++++ src/Common/Exception.h | 2 + src/Core/Settings.h | 4 +- .../PushingToViewsBlockOutputStream.cpp | 90 ++++--------------- src/Interpreters/InterpreterSystemQuery.cpp | 2 +- src/Interpreters/QueryViewsLog.h | 17 ++-- src/Interpreters/ThreadStatusExt.cpp | 13 +-- 7 files changed, 82 insertions(+), 86 deletions(-) diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 641f8bbe0f0..104942a7b75 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -94,6 +94,22 @@ std::string getExceptionStackTraceString(const std::exception & e) #endif } +std::string getExceptionStackTraceString(std::exception_ptr e) +{ + try + { + std::rethrow_exception(e); + } + catch (const std::exception & exception) + { + return getExceptionStackTraceString(exception); + } + catch (...) + { + return {}; + } +} + std::string Exception::getStackTraceString() const { @@ -380,6 +396,30 @@ int getCurrentExceptionCode() } } +int getExceptionErrorCode(std::exception_ptr e) +{ + try + { + std::rethrow_exception(e); + } + catch (const Exception & e) + { + return e.code(); + } + catch (const Poco::Exception &) + { + return ErrorCodes::POCO_EXCEPTION; + } + catch (const std::exception &) + { + return ErrorCodes::STD_EXCEPTION; + } + catch (...) + { + return ErrorCodes::UNKNOWN_EXCEPTION; + } +} + void rethrowFirstException(const Exceptions & exceptions) { diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 79b4394948a..d04b0f71b9e 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -82,6 +82,7 @@ private: std::string getExceptionStackTraceString(const std::exception & e); +std::string getExceptionStackTraceString(std::exception_ptr e); /// Contains an additional member `saved_errno`. See the throwFromErrno function. @@ -167,6 +168,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded /// Returns error code from ErrorCodes int getCurrentExceptionCode(); +int getExceptionErrorCode(std::exception_ptr e); /// An execution status of any piece of code, contains return code and optional error diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 54e1f00832c..7019a4091c8 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -351,8 +351,8 @@ class IColumn; \ M(Bool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.", 0) \ M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \ - M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ - M(Bool, log_query_views, false, "Log query dependent views into system.query_views_log table. This setting have effect only when 'log_queries' is true.", 0) \ + M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ + M(Bool, log_query_views, true, "Log query dependent views into system.query_views_log table. This setting have effect only when 'log_queries' is true.", 0) \ M(String, log_comment, "", "Log comment into system.query_log table and server log. It can be set to arbitrary string no longer than max_query_size.", 0) \ M(LogsLevel, send_logs_level, LogsLevel::fatal, "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \ M(Bool, enable_optimize_predicate_expression, 1, "If it is set to true, optimize predicates to subqueries.", 0) \ diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 2501d4dd42e..63caa3a0d39 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -117,12 +117,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( InterpreterInsertQuery interpreter(insert_query_ptr, insert_context); BlockIO io = interpreter.execute(); out = io.out; - LOG_WARNING( - log, - "Pushing from {} to {} {}.", - storage->getStorageID().getNameForLogs(), - inner_table_id.getNameForLogs(), - inner_table->getStorageID().getFullTableName()); } else if (auto * live_view = dynamic_cast(dependent_table.get())) { @@ -136,7 +130,13 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr()); QueryViewsLogElement::ViewRuntimeStats runtime_stats{ - 0, type, std::make_shared(), select_context->getInitialQueryId(), std::chrono::system_clock::now(), target_name}; + target_name, + type, + select_context->getInitialQueryId(), + std::make_shared(), + 0, + std::chrono::system_clock::now(), + QueryViewsLogElement::Status::QUERY_START}; views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); } @@ -200,17 +200,6 @@ void PushingToViewsBlockOutputStream::write(const Block & block) { auto thread_group = CurrentThread::getGroup(); pool.scheduleOrThrowOnError([=, &view, this] { - LOG_WARNING( - log, - "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITE START", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - uint64_t(view.out.get()), - view.runtime_stats.thread_status->progress_in.read_rows, - view.runtime_stats.thread_status->progress_in.written_rows, - view.runtime_stats.thread_status->progress_out.read_rows, - view.runtime_stats.thread_status->progress_out.written_rows); - // current_thread = view.thread_status.get(); setThreadName("PushingToViews"); if (thread_group) @@ -224,20 +213,12 @@ void PushingToViewsBlockOutputStream::write(const Block & block) catch (...) { view.exception = std::current_exception(); - // TODO: Stop processing on exception } + /* process might have set view.exception without throwing */ + if (view.exception) + view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); // TODO: Update other counters - LOG_WARNING( - log, - "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITE END", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - uint64_t(view.out.get()), - view.runtime_stats.thread_status->progress_in.read_rows, - view.runtime_stats.thread_status->progress_in.written_rows, - view.runtime_stats.thread_status->progress_out.read_rows, - view.runtime_stats.thread_status->progress_out.written_rows); }); } // Wait for concurrent view processing @@ -259,8 +240,9 @@ void PushingToViewsBlockOutputStream::writePrefix() } catch (Exception & ex) { - view.exception = std::current_exception(); ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); + view.exception = std::current_exception(); + view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); log_query_views(); throw; } @@ -270,8 +252,6 @@ void PushingToViewsBlockOutputStream::writePrefix() void PushingToViewsBlockOutputStream::writeSuffix() { - LOG_WARNING(log, "STARTING {} WITH {}", uint64_t(this), views.size()); - if (output) output->writeSuffix(); @@ -293,17 +273,6 @@ void PushingToViewsBlockOutputStream::writeSuffix() continue; pool.scheduleOrThrowOnError([&] { - LOG_WARNING( - log, - "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITESUFFIX START", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - uint64_t(view.out.get()), - view.runtime_stats.thread_status->progress_in.read_rows, - view.runtime_stats.thread_status->progress_in.written_rows, - view.runtime_stats.thread_status->progress_out.read_rows, - view.runtime_stats.thread_status->progress_out.written_rows); - // current_thread = view.thread_status.get(); setThreadName("PushingToViews"); if (thread_group) CurrentThread::attachToIfDetached(thread_group); @@ -311,10 +280,8 @@ void PushingToViewsBlockOutputStream::writeSuffix() Stopwatch watch; try { - LOG_WARNING(log, "BEFORE CALL {} -> {}", uint64_t(this), uint64_t(view.out.get())); view.out->writeSuffix(); - LOG_WARNING(log, "AFTER CALL {} -> {}", uint64_t(this), uint64_t(view.out.get())); - //Set status here + view.runtime_stats.setStatus(QueryViewsLogElement::Status::QUERY_FINISH); } catch (...) { @@ -328,16 +295,6 @@ void PushingToViewsBlockOutputStream::writeSuffix() storage->getStorageID().getNameForLogs(), view.table_id.getNameForLogs(), view.runtime_stats.elapsed_ms); - LOG_WARNING( - log, - "Pushing from {} to {}. {}. Current thread {} {} {} {}. WRITESUFFIX END", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - uint64_t(view.out.get()), - view.runtime_stats.thread_status->progress_in.read_rows, - view.runtime_stats.thread_status->progress_in.written_rows, - view.runtime_stats.thread_status->progress_out.read_rows, - view.runtime_stats.thread_status->progress_out.written_rows); }); } // Wait for concurrent view processing @@ -351,7 +308,6 @@ void PushingToViewsBlockOutputStream::writeSuffix() storage->getStorageID().getNameForLogs(), views.size(), milliseconds); } - LOG_WARNING(log, "FINISHING {}", uint64_t(this)); log_query_views(); } @@ -432,7 +388,6 @@ void PushingToViewsBlockOutputStream::check_exceptions_in_views() { if (view.exception) { - LOG_WARNING(log, "View exception {}", view.table_id.getNameForLogs()); log_query_views(); std::rethrow_exception(view.exception); } @@ -444,28 +399,19 @@ void PushingToViewsBlockOutputStream::log_query_views() // TODO: Check settings auto views_log = getContext()->getQueryViewsLog(); if (!views_log) - { - LOG_WARNING(log, "NO VIEWS LOG"); // NOCHECKIN return; - } - for (auto const & view : views) + + for (auto & view : views) { - LOG_WARNING( - log, - "LOG LOG LOG from {} to {}. {}. Progress {} {} {} {}", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - uint64_t(view.out.get()), - view.runtime_stats.thread_status->progress_in.read_rows, - view.runtime_stats.thread_status->progress_in.written_rows, - view.runtime_stats.thread_status->progress_out.read_rows, - view.runtime_stats.thread_status->progress_out.written_rows); + if (view.runtime_stats.event_status == QueryViewsLogElement::Status::QUERY_START) + view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); try { view.runtime_stats.thread_status->logToQueryViewsLog(*views_log, view); } catch (...) { + LOG_WARNING(log, getCurrentExceptionMessage(true)); } } } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index eb150356fee..e7cc8e3d386 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -417,7 +417,7 @@ BlockIO InterpreterSystemQuery::execute() [&] { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); }, [&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); }, [&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); }, - [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); } + [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }, [&] () { if (auto query_views_log = getContext()->getQueryViewsLog()) query_views_log->flush(true); } ); break; diff --git a/src/Interpreters/QueryViewsLog.h b/src/Interpreters/QueryViewsLog.h index 661d24abfcb..acf0657129d 100644 --- a/src/Interpreters/QueryViewsLog.h +++ b/src/Interpreters/QueryViewsLog.h @@ -33,12 +33,19 @@ struct QueryViewsLogElement struct ViewRuntimeStats { - UInt64 elapsed_ms = 0; - ViewType type = ViewType::DEFAULT; - std::shared_ptr thread_status = std::make_shared(); - String initial_query_id; - std::chrono::time_point start; String target_name; + ViewType type = ViewType::DEFAULT; + String initial_query_id; + std::shared_ptr thread_status = std::make_shared(); + UInt64 elapsed_ms = 0; + std::chrono::time_point event_time; + Status event_status = Status::QUERY_START; + + void setStatus(Status s) + { + event_status = s; + event_time = std::chrono::system_clock::now(); + } }; time_t event_time{}; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index e8a2c529dcb..b31ca398e76 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -462,8 +462,8 @@ void ThreadStatus::logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo { QueryViewsLogElement element; - element.event_time = time_in_seconds(vinfo.runtime_stats.start); - element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.start); + element.event_time = time_in_seconds(vinfo.runtime_stats.event_time); + element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.event_time); element.view_duration_ms = vinfo.runtime_stats.elapsed_ms; element.initial_query_id = vinfo.runtime_stats.initial_query_id; @@ -485,13 +485,14 @@ void ThreadStatus::logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo element.profile_counters = std::make_shared(performance_counters.getPartiallyAtomicSnapshot()); } - element.end_status = EXCEPTION_BEFORE_START; + element.end_status = vinfo.runtime_stats.event_status; element.exception_code = 0; if (vinfo.exception) { - element.exception_code = 0; - element.exception = "TODO"; - element.stack_trace = "TODO"; + element.exception_code = getExceptionErrorCode(vinfo.exception); + element.exception = getExceptionMessage(vinfo.exception, false); + // if (current_settings.calculate_text_stack_trace) // TODO + element.stack_trace = getExceptionStackTraceString(vinfo.exception); } views_log.add(element); From 6b9ec2a62e43e76b41bede499e37eee99a9db640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Mon, 21 Jun 2021 17:58:15 +0200 Subject: [PATCH 07/48] WIP --- programs/server/config.xml | 2 +- src/Common/ThreadStatus.h | 2 +- src/Core/Settings.h | 4 +- .../PushingToViewsBlockOutputStream.cpp | 63 ++++++++++++------- .../PushingToViewsBlockOutputStream.h | 2 +- src/Interpreters/QueryViewsLog.cpp | 2 - src/Interpreters/QueryViewsLog.h | 2 - src/Interpreters/ThreadStatusExt.cpp | 57 ++++++++++++----- 8 files changed, 89 insertions(+), 45 deletions(-) diff --git a/programs/server/config.xml b/programs/server/config.xml index 3f7f6c25e24..298fc176cfa 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -583,7 +583,7 @@ 9019 --> - + diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 86aa8cc52a1..2d39ed56e47 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -215,7 +215,7 @@ public: /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false); - void logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo & vinfo); + void logToQueryViewsLog(const ViewInfo & vinfo); protected: void applyQuerySettings(); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7019a4091c8..d866bbb5350 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -170,7 +170,7 @@ class IColumn; \ M(Bool, log_queries, 1, "Log requests and write the log to the system table.", 0) \ M(LogQueriesType, log_queries_min_type, QueryLogElementType::QUERY_START, "Minimal type in query_log to log, possible values (from low to high): QUERY_START, QUERY_FINISH, EXCEPTION_BEFORE_START, EXCEPTION_WHILE_PROCESSING.", 0) \ - M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log.", 0) \ + M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log/query_views_log.", 0) \ M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \ \ M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \ @@ -349,7 +349,7 @@ class IColumn; M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\ M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \ \ - M(Bool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.", 0) \ + M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \ M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \ M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ M(Bool, log_query_views, true, "Log query dependent views into system.query_views_log table. This setting have effect only when 'log_queries' is true.", 0) \ diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 63caa3a0d39..a6015b33225 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -12,14 +12,28 @@ #include #include #include -#include #include +#include +#include +#include +#include +#include +#include #include #include #include #include +#include + +namespace ProfileEvents +{ +extern const Event SlowRead; +extern const Event MergedRows; +extern const Event ZooKeeperTransactions; +} + namespace DB { @@ -72,6 +86,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value); } + auto thread_group = CurrentThread::getGroup(); + for (const auto & database_table : dependencies) { auto dependent_table = DatabaseCatalog::instance().getTable(database_table, getContext()); @@ -118,7 +134,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( BlockIO io = interpreter.execute(); out = io.out; } - else if (auto * live_view = dynamic_cast(dependent_table.get())) + else if (const auto * live_view = dynamic_cast(dependent_table.get())) { type = QueryViewsLogElement::ViewType::LIVE; query = live_view->getInnerQuery(); // TODO: Optimize this @@ -129,14 +145,13 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr()); + auto main_thread = current_thread; + auto thread_status = std::make_shared(); + current_thread = main_thread; + thread_status->attachQueryContext(getContext()); + QueryViewsLogElement::ViewRuntimeStats runtime_stats{ - target_name, - type, - select_context->getInitialQueryId(), - std::make_shared(), - 0, - std::chrono::system_clock::now(), - QueryViewsLogElement::Status::QUERY_START}; + target_name, type, thread_status, 0, std::chrono::system_clock::now(), QueryViewsLogElement::Status::QUERY_START}; views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); } @@ -150,6 +165,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( replicated_output = dynamic_cast(sink.get()); output = std::make_shared(std::move(sink)); } + ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions, 100); } @@ -185,7 +201,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block) output->write(block); } - if (!views.size()) + if (views.empty()) return; /// Don't process materialized views if this block is duplicate @@ -198,12 +214,12 @@ void PushingToViewsBlockOutputStream::write(const Block & block) ThreadPool pool(std::min(max_threads, views.size())); for (auto & view : views) { - auto thread_group = CurrentThread::getGroup(); pool.scheduleOrThrowOnError([=, &view, this] { setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); + current_thread = view.runtime_stats.thread_status.get(); + ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions, 1); + LOG_WARNING(log, "WRITE THREAD"); Stopwatch watch; try @@ -218,7 +234,6 @@ void PushingToViewsBlockOutputStream::write(const Block & block) if (view.exception) view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); - // TODO: Update other counters }); } // Wait for concurrent view processing @@ -255,7 +270,7 @@ void PushingToViewsBlockOutputStream::writeSuffix() if (output) output->writeSuffix(); - if (!views.size()) + if (views.empty()) return; std::exception_ptr first_exception; const Settings & settings = getContext()->getSettingsRef(); @@ -274,8 +289,9 @@ void PushingToViewsBlockOutputStream::writeSuffix() pool.scheduleOrThrowOnError([&] { setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); + current_thread = view.runtime_stats.thread_status.get(); + ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions, 1); + LOG_WARNING(log, "WRITE SUFFIX THREAD"); Stopwatch watch; try @@ -288,7 +304,6 @@ void PushingToViewsBlockOutputStream::writeSuffix() view.exception = std::current_exception(); } view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); - // TODO: Update other counters LOG_TRACE( log, "Pushing from {} to {} took {} ms.", @@ -396,18 +411,22 @@ void PushingToViewsBlockOutputStream::check_exceptions_in_views() void PushingToViewsBlockOutputStream::log_query_views() { - // TODO: Check settings - auto views_log = getContext()->getQueryViewsLog(); - if (!views_log) + const auto & settings = getContext()->getSettingsRef(); + const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds(); + if (views.empty() || !settings.log_queries || !settings.log_query_views) return; for (auto & view : views) { if (view.runtime_stats.event_status == QueryViewsLogElement::Status::QUERY_START) view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); + + if (min_query_duration && view.runtime_stats.elapsed_ms <= min_query_duration) + continue; + try { - view.runtime_stats.thread_status->logToQueryViewsLog(*views_log, view); + view.runtime_stats.thread_status->logToQueryViewsLog(view); } catch (...) { diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index 70d5f0982b0..062c026ff1c 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -18,7 +18,7 @@ class ReplicatedMergeTreeSink; struct ViewInfo { - ASTPtr query; + const ASTPtr query; StorageID table_id; BlockOutputStreamPtr out; std::exception_ptr exception; diff --git a/src/Interpreters/QueryViewsLog.cpp b/src/Interpreters/QueryViewsLog.cpp index 09d15cc1769..878ca2984e2 100644 --- a/src/Interpreters/QueryViewsLog.cpp +++ b/src/Interpreters/QueryViewsLog.cpp @@ -47,7 +47,6 @@ Block QueryViewsLogElement::createBlock() {std::make_shared(), "read_bytes"}, {std::make_shared(), "written_rows"}, {std::make_shared(), "written_bytes"}, - {std::make_shared(), "memory_usage"}, {std::make_shared(), "peak_memory_usage"}, {std::make_shared(std::make_shared()), "ProfileEvents.Names"}, {std::make_shared(std::make_shared()), "ProfileEvents.Values"}, @@ -78,7 +77,6 @@ void QueryViewsLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(read_bytes); columns[i++]->insert(written_rows); columns[i++]->insert(written_bytes); - columns[i++]->insert(memory_usage); columns[i++]->insert(peak_memory_usage); if (profile_counters) diff --git a/src/Interpreters/QueryViewsLog.h b/src/Interpreters/QueryViewsLog.h index acf0657129d..fe8a591358e 100644 --- a/src/Interpreters/QueryViewsLog.h +++ b/src/Interpreters/QueryViewsLog.h @@ -35,7 +35,6 @@ struct QueryViewsLogElement { String target_name; ViewType type = ViewType::DEFAULT; - String initial_query_id; std::shared_ptr thread_status = std::make_shared(); UInt64 elapsed_ms = 0; std::chrono::time_point event_time; @@ -63,7 +62,6 @@ struct QueryViewsLogElement UInt64 read_bytes{}; UInt64 written_rows{}; UInt64 written_bytes{}; - Int64 memory_usage{}; Int64 peak_memory_usage{}; std::shared_ptr profile_counters; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index b31ca398e76..25aac7864b9 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -9,7 +9,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -21,6 +23,14 @@ # include #endif +namespace ProfileEvents +{ +extern const Event SelectedRows; +extern const Event SelectedBytes; +extern const Event InsertedRows; +extern const Event InsertedBytes; +} + /// Implement some methods of ThreadStatus and CurrentThread here to avoid extra linking dependencies in clickhouse_common_io /// TODO It doesn't make sense. @@ -458,31 +468,50 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String thread_log.add(elem); } -void ThreadStatus::logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo & vinfo) +static String getCleanQueryAst(const ASTPtr q, ContextPtr context) { + String res = serializeAST(*q, true); + if (auto * masker = SensitiveDataMasker::getInstance()) + masker->wipeSensitiveData(res); + + res = res.substr(0, context->getSettingsRef().log_queries_cut_to_length); + + return res; +} + +void ThreadStatus::logToQueryViewsLog(const ViewInfo & vinfo) +{ + updatePerformanceCounters(); + auto query_context_ptr = query_context.lock(); + if (!query_context_ptr) + return; + auto views_log = query_context_ptr->getQueryViewsLog(); + if (!views_log) + return; + QueryViewsLogElement element; element.event_time = time_in_seconds(vinfo.runtime_stats.event_time); element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.event_time); element.view_duration_ms = vinfo.runtime_stats.elapsed_ms; - element.initial_query_id = vinfo.runtime_stats.initial_query_id; + element.initial_query_id = query_id; // query_context_ptr->getInitialQueryId(); element.view_name = vinfo.table_id.getNameForLogs(); element.view_uuid = vinfo.table_id.uuid; element.view_type = vinfo.runtime_stats.type; if (vinfo.query) - element.view_query = serializeAST(*vinfo.query, true); // TODO: Anonymize like query_log ? + element.view_query = getCleanQueryAst(vinfo.query, query_context_ptr); element.view_target = vinfo.runtime_stats.target_name; - element.read_rows = progress_in.read_rows.load(std::memory_order_relaxed); - element.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed); - element.written_rows = progress_out.written_rows.load(std::memory_order_relaxed); - element.written_bytes = progress_out.written_bytes.load(std::memory_order_relaxed); - element.memory_usage = memory_tracker.get(); - element.peak_memory_usage = memory_tracker.getPeak(); - // if (query_context_ptr->getSettingsRef().log_profile_events != 0) // TODO + auto events = std::make_shared(performance_counters.getPartiallyAtomicSnapshot()); + element.read_rows = (*events)[ProfileEvents::SelectedRows]; + element.read_bytes = (*events)[ProfileEvents::SelectedBytes]; + element.written_rows = (*events)[ProfileEvents::InsertedRows]; + element.written_bytes = (*events)[ProfileEvents::InsertedBytes]; + element.peak_memory_usage = memory_tracker.getPeak() > 0 ? memory_tracker.getPeak() : 0; + if (query_context_ptr->getSettingsRef().log_profile_events != 0) { - element.profile_counters = std::make_shared(performance_counters.getPartiallyAtomicSnapshot()); + element.profile_counters = events; } element.end_status = vinfo.runtime_stats.event_status; @@ -491,11 +520,11 @@ void ThreadStatus::logToQueryViewsLog(QueryViewsLog & views_log, const ViewInfo { element.exception_code = getExceptionErrorCode(vinfo.exception); element.exception = getExceptionMessage(vinfo.exception, false); - // if (current_settings.calculate_text_stack_trace) // TODO - element.stack_trace = getExceptionStackTraceString(vinfo.exception); + if (query_context_ptr->getSettingsRef().calculate_text_stack_trace) + element.stack_trace = getExceptionStackTraceString(vinfo.exception); } - views_log.add(element); + views_log->add(element); } void CurrentThread::initializeQuery() From e0ee7839f7ac8cb9cbce32eaaba194114a851dd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Tue, 22 Jun 2021 13:39:53 +0200 Subject: [PATCH 08/48] Only use threads when necessary and log written status --- .../PushingToViewsBlockOutputStream.cpp | 233 +++++++++++------- .../PushingToViewsBlockOutputStream.h | 2 + src/Interpreters/QueryViewsLog.cpp | 14 +- src/Interpreters/QueryViewsLog.h | 22 +- src/Interpreters/ThreadStatusExt.cpp | 2 +- 5 files changed, 166 insertions(+), 107 deletions(-) diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index a6015b33225..fd3fd1d6a13 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -1,7 +1,7 @@ #include #include #include -#include +#include #include #include #include @@ -18,22 +18,13 @@ #include #include #include +#include #include #include -#include #include #include -#include - -namespace ProfileEvents -{ -extern const Event SlowRead; -extern const Event MergedRows; -extern const Event ZooKeeperTransactions; -} - namespace DB { @@ -137,7 +128,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( else if (const auto * live_view = dynamic_cast(dependent_table.get())) { type = QueryViewsLogElement::ViewType::LIVE; - query = live_view->getInnerQuery(); // TODO: Optimize this + query = live_view->getInnerQuery(); // Used only to log in system.query_views_log out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true); } @@ -145,13 +136,13 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr()); - auto main_thread = current_thread; + auto * main_thread = current_thread; auto thread_status = std::make_shared(); current_thread = main_thread; thread_status->attachQueryContext(getContext()); QueryViewsLogElement::ViewRuntimeStats runtime_stats{ - target_name, type, thread_status, 0, std::chrono::system_clock::now(), QueryViewsLogElement::Status::QUERY_START}; + target_name, type, thread_status, 0, std::chrono::system_clock::now(), QueryViewsLogElement::ViewStatus::INIT}; views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); } @@ -165,7 +156,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( replicated_output = dynamic_cast(sink.get()); output = std::make_shared(std::move(sink)); } - ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions, 100); } @@ -208,37 +198,43 @@ void PushingToViewsBlockOutputStream::write(const Block & block) if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate()) return; - // Push to each view. Only parallel if available const Settings & settings = getContext()->getSettingsRef(); const size_t max_threads = settings.parallel_view_processing ? settings.max_threads : 1; - ThreadPool pool(std::min(max_threads, views.size())); - for (auto & view : views) + bool exception_happened = false; + if (max_threads > 1) { - pool.scheduleOrThrowOnError([=, &view, this] { + ThreadPool pool(std::min(max_threads, views.size())); + auto thread_group = CurrentThread::getGroup(); + std::atomic_uint8_t exception_count = 0; + for (auto & view : views) + { + pool.scheduleOrThrowOnError([&] { + setThreadName("PushingToViews"); + if (exception_count.load(std::memory_order_relaxed)) + return; - setThreadName("PushingToViews"); - current_thread = view.runtime_stats.thread_status.get(); - ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions, 1); - LOG_WARNING(log, "WRITE THREAD"); - - Stopwatch watch; - try - { process(block, view); - } - catch (...) - { - view.exception = std::current_exception(); - } - /* process might have set view.exception without throwing */ - if (view.exception) - view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); - }); + if (view.exception) + exception_count.fetch_add(1, std::memory_order_relaxed); + }); + } + pool.wait(); + exception_happened = exception_count.load(std::memory_order_relaxed) != 0; } - // Wait for concurrent view processing - pool.wait(); - check_exceptions_in_views(); + else + { + for (auto & view : views) + { + process(block, view); + if (view.exception) + { + exception_happened = true; + break; + } + } + } + if (exception_happened) + check_exceptions_in_views(); } void PushingToViewsBlockOutputStream::writePrefix() @@ -248,20 +244,12 @@ void PushingToViewsBlockOutputStream::writePrefix() for (auto & view : views) { - Stopwatch watch; - try + process_prefix(view); + if (view.exception) { - view.out->writePrefix(); - } - catch (Exception & ex) - { - ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); - view.exception = std::current_exception(); - view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); log_query_views(); throw; } - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); } } @@ -272,62 +260,60 @@ void PushingToViewsBlockOutputStream::writeSuffix() if (views.empty()) return; - std::exception_ptr first_exception; - const Settings & settings = getContext()->getSettingsRef(); /// Run writeSuffix() for views in separate thread pool. /// In could have been done in PushingToViewsBlockOutputStream::process, however /// it is not good if insert into main table fail but into view succeed. + const Settings & settings = getContext()->getSettingsRef(); const size_t max_threads = settings.parallel_view_processing ? settings.max_threads : 1; - ThreadPool pool(std::min(max_threads, views.size())); - auto thread_group = CurrentThread::getGroup(); - - for (auto & view : views) + bool exception_happened = false; + if (max_threads > 1) { - if (view.exception) - continue; + ThreadPool pool(std::min(max_threads, views.size())); + auto thread_group = CurrentThread::getGroup(); + std::atomic_uint8_t exception_count = 0; + for (auto & view : views) + { + pool.scheduleOrThrowOnError([&] { + setThreadName("PushingToViews"); + if (exception_count.load(std::memory_order_relaxed)) + return; - pool.scheduleOrThrowOnError([&] { - setThreadName("PushingToViews"); - current_thread = view.runtime_stats.thread_status.get(); - ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions, 1); - LOG_WARNING(log, "WRITE SUFFIX THREAD"); - - Stopwatch watch; - try - { - view.out->writeSuffix(); - view.runtime_stats.setStatus(QueryViewsLogElement::Status::QUERY_FINISH); - } - catch (...) - { - view.exception = std::current_exception(); - } - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); - LOG_TRACE( - log, - "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.runtime_stats.elapsed_ms); - }); + process_suffix(view); + if (view.exception) + exception_count.fetch_add(1, std::memory_order_relaxed); + }); + } + pool.wait(); + exception_happened = exception_count.load(std::memory_order_relaxed) != 0; } - // Wait for concurrent view processing - pool.wait(); - check_exceptions_in_views(); + else + { + for (auto & view : views) + { + process_suffix(view); + if (view.exception) + { + exception_happened = true; + break; + } + } + } + if (exception_happened) + check_exceptions_in_views(); - UInt64 milliseconds = main_watch.elapsedMilliseconds(); if (views.size() > 1) { - LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", - storage->getStorageID().getNameForLogs(), views.size(), - milliseconds); + UInt64 milliseconds = main_watch.elapsedMilliseconds(); + LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", storage->getStorageID().getNameForLogs(), views.size(), milliseconds); } log_query_views(); } void PushingToViewsBlockOutputStream::flush() { + LOG_DEBUG(log, "{} FLUSH CALLED", storage->getStorageID().getNameForLogs()); + if (output) output->flush(); @@ -337,6 +323,11 @@ void PushingToViewsBlockOutputStream::flush() void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view) { + Stopwatch watch; + auto * source_thread = current_thread; // Change thread context to store individual metrics per view + current_thread = view.runtime_stats.thread_status.get(); + SCOPE_EXIT({ current_thread = source_thread; }); + try { BlockInputStreamPtr in; @@ -385,6 +376,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & vi } in->readSuffix(); + view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::WRITTEN_BLOCK); } catch (Exception & ex) { @@ -395,6 +387,66 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & vi { view.exception = std::current_exception(); } + + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); +} + +void PushingToViewsBlockOutputStream::process_prefix(ViewInfo & view) +{ + Stopwatch watch; + auto * source_thread = current_thread; // Change thread context to store individual metrics per view + current_thread = view.runtime_stats.thread_status.get(); + SCOPE_EXIT({ current_thread = source_thread; }); + + try + { + view.out->writePrefix(); + view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::WRITTEN_PREFIX); + } + catch (Exception & ex) + { + ex.addMessage("while writing prefix to view " + view.table_id.getNameForLogs()); + view.exception = std::current_exception(); + } + catch (...) + { + view.exception = std::current_exception(); + } + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); +} + + +void PushingToViewsBlockOutputStream::process_suffix(ViewInfo & view) +{ + Stopwatch watch; + auto * source_thread = current_thread; // Change thread context to store individual metrics per view + current_thread = view.runtime_stats.thread_status.get(); + SCOPE_EXIT({ current_thread = source_thread; }); + + try + { + view.out->writeSuffix(); + view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::WRITTEN_SUFFIX); + } + catch (Exception & ex) + { + ex.addMessage("while writing suffix to view " + view.table_id.getNameForLogs()); + view.exception = std::current_exception(); + } + catch (...) + { + view.exception = std::current_exception(); + } + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); + if (!view.exception) + { + LOG_TRACE( + log, + "Pushing from {} to {} took {} ms.", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + view.runtime_stats.elapsed_ms); + } } void PushingToViewsBlockOutputStream::check_exceptions_in_views() @@ -418,9 +470,6 @@ void PushingToViewsBlockOutputStream::log_query_views() for (auto & view : views) { - if (view.runtime_stats.event_status == QueryViewsLogElement::Status::QUERY_START) - view.runtime_stats.setStatus(QueryViewsLogElement::Status::EXCEPTION_WHILE_PROCESSING); - if (min_query_duration && view.runtime_stats.elapsed_ms <= min_query_duration) continue; diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index 062c026ff1c..c836ee54ded 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -59,6 +59,8 @@ private: ContextMutablePtr insert_context; void process(const Block & block, ViewInfo & view); + void process_prefix(ViewInfo & view); + void process_suffix(ViewInfo & view); void check_exceptions_in_views(); void log_query_views(); }; diff --git a/src/Interpreters/QueryViewsLog.cpp b/src/Interpreters/QueryViewsLog.cpp index 878ca2984e2..f77b5c6caf6 100644 --- a/src/Interpreters/QueryViewsLog.cpp +++ b/src/Interpreters/QueryViewsLog.cpp @@ -19,11 +19,11 @@ namespace DB { Block QueryViewsLogElement::createBlock() { - auto query_status_datatype = std::make_shared(DataTypeEnum8::Values{ - {"QueryStart", static_cast(QUERY_START)}, - {"QueryFinish", static_cast(QUERY_FINISH)}, - {"ExceptionBeforeStart", static_cast(EXCEPTION_BEFORE_START)}, - {"ExceptionWhileProcessing", static_cast(EXCEPTION_WHILE_PROCESSING)}}); + auto view_status_datatype = std::make_shared(DataTypeEnum8::Values{ + {"Init", static_cast(ViewStatus::INIT)}, + {"WrittenPrefix", static_cast(ViewStatus::WRITTEN_PREFIX)}, + {"WrittenBlock", static_cast(ViewStatus::WRITTEN_BLOCK)}, + {"WrittenSuffix", static_cast(ViewStatus::WRITTEN_SUFFIX)}}); auto view_type_datatype = std::make_shared(DataTypeEnum8::Values{ {"Default", static_cast(ViewType::DEFAULT)}, @@ -51,7 +51,7 @@ Block QueryViewsLogElement::createBlock() {std::make_shared(std::make_shared()), "ProfileEvents.Names"}, {std::make_shared(std::make_shared()), "ProfileEvents.Values"}, - {std::move(query_status_datatype), "end_status"}, + {std::move(view_status_datatype), "status"}, {std::make_shared(), "exception_code"}, {std::make_shared(), "exception"}, {std::make_shared(), "stack_trace"}}; @@ -91,7 +91,7 @@ void QueryViewsLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insertDefault(); } - columns[i++]->insert(end_status); + columns[i++]->insert(status); columns[i++]->insert(exception_code); columns[i++]->insertData(exception.data(), exception.size()); columns[i++]->insertData(stack_trace.data(), stack_trace.size()); diff --git a/src/Interpreters/QueryViewsLog.h b/src/Interpreters/QueryViewsLog.h index fe8a591358e..a2eb3666a17 100644 --- a/src/Interpreters/QueryViewsLog.h +++ b/src/Interpreters/QueryViewsLog.h @@ -23,12 +23,20 @@ class ThreadStatus; struct QueryViewsLogElement { - using Status = QueryLogElementType; + enum class ViewStatus : int8_t + { + INIT = 1, + WRITTEN_PREFIX = 2, + WRITTEN_BLOCK = 3, + WRITTEN_SUFFIX = 4 + }; + + enum class ViewType : int8_t { - DEFAULT, - MATERIALIZED, - LIVE + DEFAULT = 1, + MATERIALIZED = 2, + LIVE = 3 }; struct ViewRuntimeStats @@ -38,9 +46,9 @@ struct QueryViewsLogElement std::shared_ptr thread_status = std::make_shared(); UInt64 elapsed_ms = 0; std::chrono::time_point event_time; - Status event_status = Status::QUERY_START; + ViewStatus event_status = ViewStatus::INIT; - void setStatus(Status s) + void setStatus(ViewStatus s) { event_status = s; event_time = std::chrono::system_clock::now(); @@ -65,7 +73,7 @@ struct QueryViewsLogElement Int64 peak_memory_usage{}; std::shared_ptr profile_counters; - Status end_status{EXCEPTION_BEFORE_START}; + ViewStatus status; Int32 exception_code{}; String exception; String stack_trace; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 25aac7864b9..62b12d07d9c 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -514,7 +514,7 @@ void ThreadStatus::logToQueryViewsLog(const ViewInfo & vinfo) element.profile_counters = events; } - element.end_status = vinfo.runtime_stats.event_status; + element.status = vinfo.runtime_stats.event_status; element.exception_code = 0; if (vinfo.exception) { From ab05fc12e2d05a56a19c5794b361dd3ab46c3bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Tue, 22 Jun 2021 18:25:17 +0200 Subject: [PATCH 09/48] Docs and settings --- .../settings.md | 27 ++++++ docs/en/operations/settings/settings.md | 15 +++- docs/en/operations/system-tables/query_log.md | 1 + .../system-tables/query_thread_log.md | 1 + .../system-tables/query_views_log.md | 83 +++++++++++++++++++ programs/server/config.xml | 20 +++-- programs/server/config.yaml.example | 10 ++- 7 files changed, 146 insertions(+), 11 deletions(-) create mode 100644 docs/en/operations/system-tables/query_views_log.md diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index 41962573546..643b71e6d70 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -869,6 +869,33 @@ If the table does not exist, ClickHouse will create it. If the structure of the ``` +## query_views_log {#server_configuration_parameters-query_views_log} + +Setting for logging views dependant of queries received with the [log_query_views=1](../../operations/settings/settings.md#settings-log-query-views) setting. + +Queries are logged in the [system.query_views_log](../../operations/system-tables/query_thread_log.md#system_tables-query_views_log) table, not in a separate file. You can change the name of the table in the `table` parameter (see below). + +Use the following parameters to configure logging: + +- `database` – Name of the database. +- `table` – Name of the system table the queries will be logged in. +- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined. +- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined. +- `flush_interval_milliseconds` – Interval for flushing data from the buffer in memory to the table. + +If the table does not exist, ClickHouse will create it. If the structure of the query views log changed when the ClickHouse server was updated, the table with the old structure is renamed, and a new table is created automatically. + +**Example** + +``` xml + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+``` + ## text_log {#server_configuration_parameters-text_log} Settings for the [text_log](../../operations/system-tables/text_log.md#system_tables-text_log) system table for logging text messages. diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 9a0aa0af159..a471ef3293c 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -832,7 +832,7 @@ log_queries_min_type='EXCEPTION_WHILE_PROCESSING' Setting up query threads logging. -Queries’ threads runned by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter. +Queries’ threads run by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter. Example: @@ -840,6 +840,19 @@ Example: log_query_threads=1 ``` +## log_query_views {#settings-log-query-views} + +Setting up query views logging. + +When a query run by ClickHouse with this setup on has associated views (materialized or live views), they are logged in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server configuration parameter. + +Example: + +``` text +log_query_views=1 +``` + + ## log_comment {#settings-log-comment} Specifies the value for the `log_comment` field of the [system.query_log](../system-tables/query_log.md) table and comment text for the server log. diff --git a/docs/en/operations/system-tables/query_log.md b/docs/en/operations/system-tables/query_log.md index d58e549616f..7c76d2f0a4f 100644 --- a/docs/en/operations/system-tables/query_log.md +++ b/docs/en/operations/system-tables/query_log.md @@ -155,5 +155,6 @@ Settings: {'background_pool_size':'32','load_balancing':'random','al **See Also** - [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread. +- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — This table contains information about each view executed during a query. [Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_log) diff --git a/docs/en/operations/system-tables/query_thread_log.md b/docs/en/operations/system-tables/query_thread_log.md index 7ecea2971b4..152a10504bb 100644 --- a/docs/en/operations/system-tables/query_thread_log.md +++ b/docs/en/operations/system-tables/query_thread_log.md @@ -112,5 +112,6 @@ ProfileEvents: {'Query':1,'SelectQuery':1,'ReadCompressedBytes':36,'Compr **See Also** - [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — Description of the `query_log` system table which contains common information about queries execution. +- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — This table contains information about each view executed during a query. [Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_thread_log) diff --git a/docs/en/operations/system-tables/query_views_log.md b/docs/en/operations/system-tables/query_views_log.md new file mode 100644 index 00000000000..38d101c1636 --- /dev/null +++ b/docs/en/operations/system-tables/query_views_log.md @@ -0,0 +1,83 @@ +# system.query_views_log {#system_tables-query_views_log} + +Contains information about the dependent views executed when running a query, for example, the view type or the execution time. + +To start logging: + +1. Configure parameters in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) section. +2. Set [log_query_views](../../operations/settings/settings.md#settings-log-query-views) to 1. + +The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query. + +ClickHouse does not delete data from the table automatically. See [Introduction](../../operations/system-tables/index.md#system-tables-introduction) for more details. + +Columns: + +- `event_date` ([Date](../../sql-reference/data-types/date.md)) — The date when the last event of the view happened. +- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the view finished execution. +- `event_time_microsecinds` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the view finished execution with microseconds precision. +- `view_duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Duration of view execution (sum of its stages). +- `initial_query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the initial query (for distributed query execution). +- `view_name` ([String](../../sql-reference/data-types/string.md)) — Name of the view. +- `view_uuid` ([UUID](../../sql-reference/data-types/uuid.md)) — UUID of the view. +- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. Values: + - `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log. + - `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized). + - `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view). +- `view_query` ([String](../../sql-reference/data-types/string.md)) — The query executed by the view. +- `view_target` ([String](../../sql-reference/data-types/string.md)) — The name of the view target table. +- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of read rows. +- `read_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of read bytes. +- `written_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written rows. +- `written_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written bytes. +- `peak_memory_usage` ([Int64](../../sql-reference/data-types/int-uint.md)) — The maximum difference between the amount of allocated and freed memory in context of this view. +- `ProfileEvents.Names` ([Array(String)](../../sql-reference/data-types/array.md)) — Counters that measure different metrics for this thread. The description of them could be found in the table [system.events](#system_tables-events). +- `ProfileEvents.Values` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Values of metrics for this thread that are listed in the `ProfileEvents.Names` column. +- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the view. Values: + - `'Init' = 1` — The view was cancelled before writing anything to storage. + - `'WrittenPrefix' = 2` — The view was cancelled after writing its prefix to storage. + - `'WrittenBlock' = 3` — The view was cancelled after writing its blocks to storage. It might have materialized the input wholly, partially or none at all. + - `'WrittenSuffix' = 4` — The view wrote its suffix to storage. It completed successfully. +- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception. +- `exception` ([String](../../sql-reference/data-types/string.md)) — Exception message. +- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [Stack trace](https://en.wikipedia.org/wiki/Stack_trace). An empty string, if the query was completed successfully. + +**Example** + +``` sql + SELECT * FROM system.query_views_log LIMIT 1 \G +``` + +``` text +Row 1: +────── +event_date: 2021-06-22 +event_time: 2021-06-22 13:23:07 +event_time_microseconds: 2021-06-22 13:23:07.738221 +view_duration_ms: 0 +initial_query_id: c3a1ac02-9cad-479b-af54-9e9c0a7afd70 +view_name: default.matview_inner +view_uuid: 00000000-0000-0000-0000-000000000000 +view_type: Materialized +view_query: SELECT * FROM default.table_b +view_target: default.`.inner.matview_inner` +read_rows: 4 +read_bytes: 64 +written_rows: 2 +written_bytes: 32 +peak_memory_usage: 4196188 +ProfileEvents.Names: ['FileOpen','WriteBufferFromFileDescriptorWrite','WriteBufferFromFileDescriptorWriteBytes','IOBufferAllocs','IOBufferAllocBytes','DiskWriteElapsedMicroseconds','InsertedRows','InsertedBytes','SelectedRows','SelectedBytes','ContextLock','RWLockAcquiredReadLocks','RealTimeMicroseconds','UserTimeMicroseconds','SystemTimeMicroseconds','SoftPageFaults'] +ProfileEvents.Values: [3,3,154,5,5242955,23,2,32,4,64,11,1,12458571345,1955,5860,110] +status: WrittenSuffix +exception_code: 0 +exception: +stack_trace: +``` + +**See Also** + +- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — Description of the `query_log` system table which contains common information about queries execution. +- [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread. + + +[Original article](https://clickhouse.tech/docs/en/operations/system_tables/query_thread_log) diff --git a/programs/server/config.xml b/programs/server/config.xml index 298fc176cfa..99edce8651f 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -320,7 +320,7 @@ The amount of data in mapped files can be monitored in system.metrics, system.metric_log by the MMappedFiles, MMappedFileBytes metrics and in system.asynchronous_metrics, system.asynchronous_metrics_log by the MMapCacheCells metric, - and also in system.events, system.processes, system.query_log, system.query_thread_log by the + and also in system.events, system.processes, system.query_log, system.query_thread_log, system.query_views_log by the CreatedReadBufferMMap, CreatedReadBufferMMapFailed, MMappedFileCacheHits, MMappedFileCacheMisses events. Note that the amount of data in mapped files does not consume memory directly and is not accounted in query or server memory usage - because this memory can be discarded similar to OS page cache. @@ -583,7 +583,7 @@ 9019 --> - + @@ -878,6 +878,15 @@ 7500 + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+ - - system - query_views_log
- toYYYYMM(event_date) - 7500 -
+ + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+