diff --git a/docs/en/engines/table-engines/mergetree-family/mergetree.md b/docs/en/engines/table-engines/mergetree-family/mergetree.md index 561b0ad8023..0c900454cd0 100644 --- a/docs/en/engines/table-engines/mergetree-family/mergetree.md +++ b/docs/en/engines/table-engines/mergetree-family/mergetree.md @@ -39,7 +39,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1, - INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2 + INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2, + ... + PROJECTION projection_name_1 (SELECT [GROUP BY] [ORDER BY]), + PROJECTION projection_name_2 (SELECT [GROUP BY] [ORDER BY]) ) ENGINE = MergeTree() ORDER BY expr [PARTITION BY expr] @@ -385,6 +388,24 @@ Functions with a constant argument that is less than ngram size can’t be used - `s != 1` - `NOT startsWith(s, 'test')` +### Projections {#projections} +Projections are like materialized views but defined in part-level. It provides consistency guarantees along with automatic usage in queries. + +#### Query {#projection-query} +A projection query is what defines a projection. It has the following grammar: + +`SELECT [GROUP BY] [ORDER BY]` + +It implicitly selects data from the parent table. + +#### Storage {#projection-storage} +Projections are stored inside the part directory. It's similar to an index but contains a subdirectory that stores an anonymous MergeTree table's part. The table is induced by the definition query of the projection. If there is a GROUP BY clause, the underlying storage engine becomes AggregatedMergeTree, and all aggregate functions are converted to AggregateFunction. If there is an ORDER BY clause, the MergeTree table will use it as its primary key expression. During the merge process, the projection part will be merged via its storage's merge routine. The checksum of the parent table's part will combine the projection's part. Other maintenance jobs are similar to skip indices. + +#### Query Analysis {#projection-query-analysis} +1. Check if the projection can be used to answer the given query, that is, it generates the same answer as querying the base table. +2. Select the best feasible match, which contains the least granules to read. +3. The query pipeline which uses projections will be different from the one that uses the original parts. If the projection is absent in some parts, we can add the pipeline to "project" it on the fly. + ## Concurrent Data Access {#concurrent-data-access} For concurrent table access, we use multi-versioning. In other words, when a table is simultaneously read and updated, data is read from a set of parts that is current at the time of the query. There are no lengthy locks. Inserts do not get in the way of read operations. diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index d7ffcff35fb..a620565b71a 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -892,6 +892,33 @@ If the table does not exist, ClickHouse will create it. If the structure of the ``` +## query_views_log {#server_configuration_parameters-query_views_log} + +Setting for logging views dependant of queries received with the [log_query_views=1](../../operations/settings/settings.md#settings-log-query-views) setting. + +Queries are logged in the [system.query_views_log](../../operations/system-tables/query_thread_log.md#system_tables-query_views_log) table, not in a separate file. You can change the name of the table in the `table` parameter (see below). + +Use the following parameters to configure logging: + +- `database` – Name of the database. +- `table` – Name of the system table the queries will be logged in. +- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined. +- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined. +- `flush_interval_milliseconds` – Interval for flushing data from the buffer in memory to the table. + +If the table does not exist, ClickHouse will create it. If the structure of the query views log changed when the ClickHouse server was updated, the table with the old structure is renamed, and a new table is created automatically. + +**Example** + +``` xml + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+``` + ## text_log {#server_configuration_parameters-text_log} Settings for the [text_log](../../operations/system-tables/text_log.md#system_tables-text_log) system table for logging text messages. diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 4936c782299..07bfe158a0a 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -890,7 +890,7 @@ log_queries_min_type='EXCEPTION_WHILE_PROCESSING' Setting up query threads logging. -Queries’ threads runned by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter. +Queries’ threads run by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter. Example: @@ -898,6 +898,19 @@ Example: log_query_threads=1 ``` +## log_query_views {#settings-log-query-views} + +Setting up query views logging. + +When a query run by ClickHouse with this setup on has associated views (materialized or live views), they are logged in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server configuration parameter. + +Example: + +``` text +log_query_views=1 +``` + + ## log_comment {#settings-log-comment} Specifies the value for the `log_comment` field of the [system.query_log](../system-tables/query_log.md) table and comment text for the server log. diff --git a/docs/en/operations/system-tables/query_log.md b/docs/en/operations/system-tables/query_log.md index 987f1968356..548e454cf58 100644 --- a/docs/en/operations/system-tables/query_log.md +++ b/docs/en/operations/system-tables/query_log.md @@ -50,6 +50,7 @@ Columns: - `query_kind` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Type of the query. - `databases` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the databases present in the query. - `tables` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the tables present in the query. +- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the (materialized or live) views present in the query. - `columns` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the columns present in the query. - `projections` ([String](../../sql-reference/data-types/string.md)) — Names of the projections used during the query execution. - `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception. @@ -180,5 +181,6 @@ used_table_functions: [] **See Also** - [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread. +- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — This table contains information about each view executed during a query. [Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_log) diff --git a/docs/en/operations/system-tables/query_thread_log.md b/docs/en/operations/system-tables/query_thread_log.md index 7ecea2971b4..152a10504bb 100644 --- a/docs/en/operations/system-tables/query_thread_log.md +++ b/docs/en/operations/system-tables/query_thread_log.md @@ -112,5 +112,6 @@ ProfileEvents: {'Query':1,'SelectQuery':1,'ReadCompressedBytes':36,'Compr **See Also** - [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — Description of the `query_log` system table which contains common information about queries execution. +- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — This table contains information about each view executed during a query. [Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_thread_log) diff --git a/docs/en/operations/system-tables/query_views_log.md b/docs/en/operations/system-tables/query_views_log.md new file mode 100644 index 00000000000..48d36a6a118 --- /dev/null +++ b/docs/en/operations/system-tables/query_views_log.md @@ -0,0 +1,81 @@ +# system.query_views_log {#system_tables-query_views_log} + +Contains information about the dependent views executed when running a query, for example, the view type or the execution time. + +To start logging: + +1. Configure parameters in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) section. +2. Set [log_query_views](../../operations/settings/settings.md#settings-log-query-views) to 1. + +The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query. + +ClickHouse does not delete data from the table automatically. See [Introduction](../../operations/system-tables/index.md#system-tables-introduction) for more details. + +Columns: + +- `event_date` ([Date](../../sql-reference/data-types/date.md)) — The date when the last event of the view happened. +- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the view finished execution. +- `event_time_microseconds` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the view finished execution with microseconds precision. +- `view_duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Duration of view execution (sum of its stages) in milliseconds. +- `initial_query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the initial query (for distributed query execution). +- `view_name` ([String](../../sql-reference/data-types/string.md)) — Name of the view. +- `view_uuid` ([UUID](../../sql-reference/data-types/uuid.md)) — UUID of the view. +- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. Values: + - `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log. + - `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized). + - `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view). +- `view_query` ([String](../../sql-reference/data-types/string.md)) — The query executed by the view. +- `view_target` ([String](../../sql-reference/data-types/string.md)) — The name of the view target table. +- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of read rows. +- `read_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of read bytes. +- `written_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written rows. +- `written_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written bytes. +- `peak_memory_usage` ([Int64](../../sql-reference/data-types/int-uint.md)) — The maximum difference between the amount of allocated and freed memory in context of this view. +- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events). +- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the view. Values: + - `'QueryStart' = 1` — Successful start the view execution. Should not appear. + - `'QueryFinish' = 2` — Successful end of the view execution. + - `'ExceptionBeforeStart' = 3` — Exception before the start of the view execution. + - `'ExceptionWhileProcessing' = 4` — Exception during the view execution. +- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception. +- `exception` ([String](../../sql-reference/data-types/string.md)) — Exception message. +- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [Stack trace](https://en.wikipedia.org/wiki/Stack_trace). An empty string, if the query was completed successfully. + +**Example** + +``` sql + SELECT * FROM system.query_views_log LIMIT 1 \G +``` + +``` text +Row 1: +────── +event_date: 2021-06-22 +event_time: 2021-06-22 13:23:07 +event_time_microseconds: 2021-06-22 13:23:07.738221 +view_duration_ms: 0 +initial_query_id: c3a1ac02-9cad-479b-af54-9e9c0a7afd70 +view_name: default.matview_inner +view_uuid: 00000000-0000-0000-0000-000000000000 +view_type: Materialized +view_query: SELECT * FROM default.table_b +view_target: default.`.inner.matview_inner` +read_rows: 4 +read_bytes: 64 +written_rows: 2 +written_bytes: 32 +peak_memory_usage: 4196188 +ProfileEvents: {'FileOpen':2,'WriteBufferFromFileDescriptorWrite':2,'WriteBufferFromFileDescriptorWriteBytes':187,'IOBufferAllocs':3,'IOBufferAllocBytes':3145773,'FunctionExecute':3,'DiskWriteElapsedMicroseconds':13,'InsertedRows':2,'InsertedBytes':16,'SelectedRows':4,'SelectedBytes':48,'ContextLock':16,'RWLockAcquiredReadLocks':1,'RealTimeMicroseconds':698,'SoftPageFaults':4,'OSReadChars':463} +status: QueryFinish +exception_code: 0 +exception: +stack_trace: +``` + +**See Also** + +- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — Description of the `query_log` system table which contains common information about queries execution. +- [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread. + + +[Original article](https://clickhouse.tech/docs/en/operations/system_tables/query_thread_log) diff --git a/programs/server/config.xml b/programs/server/config.xml index 78182482c1c..136b982a181 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -320,7 +320,7 @@ The amount of data in mapped files can be monitored in system.metrics, system.metric_log by the MMappedFiles, MMappedFileBytes metrics and in system.asynchronous_metrics, system.asynchronous_metrics_log by the MMapCacheCells metric, - and also in system.events, system.processes, system.query_log, system.query_thread_log by the + and also in system.events, system.processes, system.query_log, system.query_thread_log, system.query_views_log by the CreatedReadBufferMMap, CreatedReadBufferMMapFailed, MMappedFileCacheHits, MMappedFileCacheMisses events. Note that the amount of data in mapped files does not consume memory directly and is not accounted in query or server memory usage - because this memory can be discarded similar to OS page cache. @@ -878,6 +878,15 @@ 7500 + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+ diff --git a/programs/server/config.yaml.example b/programs/server/config.yaml.example index bebfd74ff58..de1b2143441 100644 --- a/programs/server/config.yaml.example +++ b/programs/server/config.yaml.example @@ -271,7 +271,7 @@ mark_cache_size: 5368709120 # The amount of data in mapped files can be monitored # in system.metrics, system.metric_log by the MMappedFiles, MMappedFileBytes metrics # and in system.asynchronous_metrics, system.asynchronous_metrics_log by the MMapCacheCells metric, -# and also in system.events, system.processes, system.query_log, system.query_thread_log by the +# and also in system.events, system.processes, system.query_log, system.query_thread_log, system.query_views_log by the # CreatedReadBufferMMap, CreatedReadBufferMMapFailed, MMappedFileCacheHits, MMappedFileCacheMisses events. # Note that the amount of data in mapped files does not consume memory directly and is not accounted # in query or server memory usage - because this memory can be discarded similar to OS page cache. @@ -731,6 +731,14 @@ query_thread_log: partition_by: toYYYYMM(event_date) flush_interval_milliseconds: 7500 +# Query views log. Has information about all dependent views associated with a query. +# Used only for queries with setting log_query_views = 1. +query_views_log: + database: system + table: query_views_log + partition_by: toYYYYMM(event_date) + flush_interval_milliseconds: 7500 + # Uncomment if use part log. # Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads). # part_log: diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 641f8bbe0f0..09629b436b2 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -94,6 +94,22 @@ std::string getExceptionStackTraceString(const std::exception & e) #endif } +std::string getExceptionStackTraceString(std::exception_ptr e) +{ + try + { + std::rethrow_exception(e); + } + catch (const std::exception & exception) + { + return getExceptionStackTraceString(exception); + } + catch (...) + { + return {}; + } +} + std::string Exception::getStackTraceString() const { @@ -380,6 +396,30 @@ int getCurrentExceptionCode() } } +int getExceptionErrorCode(std::exception_ptr e) +{ + try + { + std::rethrow_exception(e); + } + catch (const Exception & exception) + { + return exception.code(); + } + catch (const Poco::Exception &) + { + return ErrorCodes::POCO_EXCEPTION; + } + catch (const std::exception &) + { + return ErrorCodes::STD_EXCEPTION; + } + catch (...) + { + return ErrorCodes::UNKNOWN_EXCEPTION; + } +} + void rethrowFirstException(const Exceptions & exceptions) { diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 79b4394948a..d04b0f71b9e 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -82,6 +82,7 @@ private: std::string getExceptionStackTraceString(const std::exception & e); +std::string getExceptionStackTraceString(std::exception_ptr e); /// Contains an additional member `saved_errno`. See the throwFromErrno function. @@ -167,6 +168,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded /// Returns error code from ErrorCodes int getCurrentExceptionCode(); +int getExceptionErrorCode(std::exception_ptr e); /// An execution status of any piece of code, contains return code and optional error diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index 0e12830e49d..81c6b8eb1c3 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -149,7 +149,11 @@ ThreadStatus::~ThreadStatus() if (deleter) deleter(); - current_thread = nullptr; + + /// Only change current_thread if it's currently being used by this ThreadStatus + /// For example, PushingToViewsBlockOutputStream creates and deletes ThreadStatus instances while running in the main query thread + if (current_thread == this) + current_thread = nullptr; } void ThreadStatus::updatePerformanceCounters() diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 6fc43114621..dbfb33a320c 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -37,6 +37,8 @@ struct RUsageCounters; struct PerfEventsCounters; class TaskStatsInfoGetter; class InternalTextLogsQueue; +struct ViewRuntimeData; +class QueryViewsLog; using InternalTextLogsQueuePtr = std::shared_ptr; using InternalTextLogsQueueWeakPtr = std::weak_ptr; @@ -143,6 +145,7 @@ protected: Poco::Logger * log = nullptr; friend class CurrentThread; + friend class PushingToViewsBlockOutputStream; /// Use ptr not to add extra dependencies in the header std::unique_ptr last_rusage; @@ -151,6 +154,9 @@ protected: /// Is used to send logs from logs_queue to client in case of fatal errors. std::function fatal_error_callback; + /// It is used to avoid enabling the query profiler when you have multiple ThreadStatus in the same thread + bool query_profiled_enabled = true; + public: ThreadStatus(); ~ThreadStatus(); @@ -210,9 +216,13 @@ public: /// Update ProfileEvents and dumps info to system.query_thread_log void finalizePerformanceCounters(); + /// Set the counters last usage to now + void resetPerformanceCountersLastUsage(); + /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false); + protected: void applyQuerySettings(); @@ -224,6 +234,8 @@ protected: void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point now); + void logToQueryViewsLog(const ViewRuntimeData & vinfo); + void assertState(const std::initializer_list & permitted_states, const char * description = nullptr) const; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index e1bd1d29153..099f4f399fc 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -173,7 +173,7 @@ class IColumn; M(Bool, log_queries, 1, "Log requests and write the log to the system table.", 0) \ M(Bool, log_formatted_queries, 0, "Log formatted queries and write the log to the system table.", 0) \ M(LogQueriesType, log_queries_min_type, QueryLogElementType::QUERY_START, "Minimal type in query_log to log, possible values (from low to high): QUERY_START, QUERY_FINISH, EXCEPTION_BEFORE_START, EXCEPTION_WHILE_PROCESSING.", 0) \ - M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log.", 0) \ + M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log/query_views_log.", 0) \ M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \ \ M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \ @@ -352,9 +352,10 @@ class IColumn; M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\ M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \ \ - M(Bool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.", 0) \ + M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \ M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \ M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ + M(Bool, log_query_views, true, "Log query dependent views into system.query_views_log table. This setting have effect only when 'log_queries' is true.", 0) \ M(String, log_comment, "", "Log comment into system.query_log table and server log. It can be set to arbitrary string no longer than max_query_size.", 0) \ M(LogsLevel, send_logs_level, LogsLevel::fatal, "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \ M(Bool, enable_optimize_predicate_expression, 1, "If it is set to true, optimize predicates to subqueries.", 0) \ diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 7729eb5fb44..aec1209a454 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -1,24 +1,31 @@ #include +#include +#include +#include #include #include -#include -#include #include #include -#include -#include #include +#include +#include #include -#include -#include -#include -#include -#include -#include #include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include +#include + +#include +#include namespace DB { @@ -79,9 +86,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( ASTPtr query; BlockOutputStreamPtr out; + QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT; + String target_name = database_table.getFullTableName(); if (auto * materialized_view = dynamic_cast(dependent_table.get())) { + type = QueryViewsLogElement::ViewType::MATERIALIZED; addTableLock( materialized_view->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout)); @@ -89,6 +99,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( auto inner_table_id = inner_table->getStorageID(); auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr(); query = dependent_metadata_snapshot->getSelectQuery().inner_query; + target_name = inner_table_id.getFullTableName(); std::unique_ptr insert = std::make_unique(); insert->table_id = inner_table_id; @@ -114,14 +125,57 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( BlockIO io = interpreter.execute(); out = io.out; } - else if (dynamic_cast(dependent_table.get())) + else if (const auto * live_view = dynamic_cast(dependent_table.get())) + { + type = QueryViewsLogElement::ViewType::LIVE; + query = live_view->getInnerQuery(); // Used only to log in system.query_views_log out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true); + } else out = std::make_shared( dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr()); - views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, 0 /* elapsed_ms */}); + /// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or + /// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics + std::unique_ptr thread_status = nullptr; + + ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup() + ? current_thread->getThreadGroup() + : MainThreadStatus::getInstance().thread_group; + if (running_group) + { + /// We are creating a ThreadStatus per view to store its metrics individually + /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls + /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work, + /// and switch back to the original thread_status. + auto * original_thread = current_thread; + SCOPE_EXIT({ current_thread = original_thread; }); + + thread_status = std::make_unique(); + /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one + /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means + /// N times more interruptions + thread_status->query_profiled_enabled = false; + thread_status->setupState(running_group); + } + + QueryViewsLogElement::ViewRuntimeStats runtime_stats{ + target_name, + type, + std::move(thread_status), + 0, + std::chrono::system_clock::now(), + QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START}; + views.emplace_back(ViewRuntimeData{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); + + + /// Add the view to the query access info so it can appear in system.query_log + if (!no_destination) + { + getContext()->getQueryContext()->addQueryAccessInfo( + backQuoteIfNeed(database_table.getDatabaseName()), target_name, {}, "", database_table.getFullTableName()); + } } /// Do not push to destination table if the flag is set @@ -136,7 +190,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( } } - Block PushingToViewsBlockOutputStream::getHeader() const { /// If we don't write directly to the destination @@ -147,6 +200,39 @@ Block PushingToViewsBlockOutputStream::getHeader() const return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals()); } +/// Auxiliary function to do the setup and teardown to run a view individually and collect its metrics inside the view ThreadStatus +void inline runViewStage(ViewRuntimeData & view, const std::string & action, std::function stage) +{ + Stopwatch watch; + + auto * original_thread = current_thread; + SCOPE_EXIT({ current_thread = original_thread; }); + + if (view.runtime_stats.thread_status) + { + /// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread + view.runtime_stats.thread_status->resetPerformanceCountersLastUsage(); + current_thread = view.runtime_stats.thread_status.get(); + } + + try + { + stage(); + } + catch (Exception & ex) + { + ex.addMessage(action + " " + view.table_id.getNameForLogs()); + view.setException(std::current_exception()); + } + catch (...) + { + view.setException(std::current_exception()); + } + + if (view.runtime_stats.thread_status) + view.runtime_stats.thread_status->updatePerformanceCounters(); + view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); +} void PushingToViewsBlockOutputStream::write(const Block & block) { @@ -169,39 +255,34 @@ void PushingToViewsBlockOutputStream::write(const Block & block) output->write(block); } - /// Don't process materialized views if this block is duplicate - if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate()) + if (views.empty()) return; - // Insert data into materialized views only after successful insert into main table + /// Don't process materialized views if this block is duplicate const Settings & settings = getContext()->getSettingsRef(); - if (settings.parallel_view_processing && views.size() > 1) + if (!settings.deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate()) + return; + + size_t max_threads = 1; + if (settings.parallel_view_processing) + max_threads = settings.max_threads ? std::min(static_cast(settings.max_threads), views.size()) : views.size(); + if (max_threads > 1) { - // Push to views concurrently if enabled and more than one view is attached - ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); + ThreadPool pool(max_threads); for (auto & view : views) { - auto thread_group = CurrentThread::getGroup(); - pool.scheduleOrThrowOnError([=, &view, this] - { + pool.scheduleOrThrowOnError([&] { setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - process(block, view); + runViewStage(view, "while pushing to view", [&]() { process(block, view); }); }); } - // Wait for concurrent view processing pool.wait(); } else { - // Process sequentially for (auto & view : views) { - process(block, view); - - if (view.exception) - std::rethrow_exception(view.exception); + runViewStage(view, "while pushing to view", [&]() { process(block, view); }); } } } @@ -213,14 +294,11 @@ void PushingToViewsBlockOutputStream::writePrefix() for (auto & view : views) { - try + runViewStage(view, "while writing prefix to view", [&] { view.out->writePrefix(); }); + if (view.exception) { - view.out->writePrefix(); - } - catch (Exception & ex) - { - ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); - throw; + logQueryViews(); + std::rethrow_exception(view.exception); } } } @@ -230,95 +308,92 @@ void PushingToViewsBlockOutputStream::writeSuffix() if (output) output->writeSuffix(); - std::exception_ptr first_exception; + if (views.empty()) + return; - const Settings & settings = getContext()->getSettingsRef(); - bool parallel_processing = false; + auto process_suffix = [](ViewRuntimeData & view) + { + view.out->writeSuffix(); + view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH); + }; + static std::string stage_step = "while writing suffix to view"; /// Run writeSuffix() for views in separate thread pool. /// In could have been done in PushingToViewsBlockOutputStream::process, however /// it is not good if insert into main table fail but into view succeed. - if (settings.parallel_view_processing && views.size() > 1) + const Settings & settings = getContext()->getSettingsRef(); + size_t max_threads = 1; + if (settings.parallel_view_processing) + max_threads = settings.max_threads ? std::min(static_cast(settings.max_threads), views.size()) : views.size(); + bool exception_happened = false; + if (max_threads > 1) { - parallel_processing = true; - - // Push to views concurrently if enabled and more than one view is attached - ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); - auto thread_group = CurrentThread::getGroup(); - + ThreadPool pool(max_threads); + std::atomic_uint8_t exception_count = 0; for (auto & view : views) { if (view.exception) - continue; - - pool.scheduleOrThrowOnError([thread_group, &view, this] { + exception_happened = true; + continue; + } + pool.scheduleOrThrowOnError([&] { setThreadName("PushingToViews"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - Stopwatch watch; - try + runViewStage(view, stage_step, [&] { process_suffix(view); }); + if (view.exception) { - view.out->writeSuffix(); + exception_count.fetch_add(1, std::memory_order_relaxed); } - catch (...) + else { - view.exception = std::current_exception(); + LOG_TRACE( + log, + "Pushing (parallel {}) from {} to {} took {} ms.", + max_threads, + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + view.runtime_stats.elapsed_ms); } - view.elapsed_ms += watch.elapsedMilliseconds(); - - LOG_TRACE(log, "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.elapsed_ms); }); } - // Wait for concurrent view processing pool.wait(); + exception_happened |= exception_count.load(std::memory_order_relaxed) != 0; } - - for (auto & view : views) + else { - if (view.exception) + for (auto & view : views) { - if (!first_exception) - first_exception = view.exception; - - continue; + if (view.exception) + { + exception_happened = true; + continue; + } + runViewStage(view, stage_step, [&] { process_suffix(view); }); + if (view.exception) + { + exception_happened = true; + } + else + { + LOG_TRACE( + log, + "Pushing (sequentially) from {} to {} took {} ms.", + storage->getStorageID().getNameForLogs(), + view.table_id.getNameForLogs(), + view.runtime_stats.elapsed_ms); + } } - - if (parallel_processing) - continue; - - Stopwatch watch; - try - { - view.out->writeSuffix(); - } - catch (Exception & ex) - { - ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs()); - throw; - } - view.elapsed_ms += watch.elapsedMilliseconds(); - - LOG_TRACE(log, "Pushing from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.elapsed_ms); } + if (exception_happened) + checkExceptionsInViews(); - if (first_exception) - std::rethrow_exception(first_exception); - - UInt64 milliseconds = main_watch.elapsedMilliseconds(); if (views.size() > 1) { - LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", - storage->getStorageID().getNameForLogs(), views.size(), - milliseconds); + UInt64 milliseconds = main_watch.elapsedMilliseconds(); + LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", storage->getStorageID().getNameForLogs(), views.size(), milliseconds); } + logQueryViews(); } void PushingToViewsBlockOutputStream::flush() @@ -330,70 +405,103 @@ void PushingToViewsBlockOutputStream::flush() view.out->flush(); } -void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view) +void PushingToViewsBlockOutputStream::process(const Block & block, ViewRuntimeData & view) { - Stopwatch watch; + BlockInputStreamPtr in; - try + /// We need keep InterpreterSelectQuery, until the processing will be finished, since: + /// + /// - We copy Context inside InterpreterSelectQuery to support + /// modification of context (Settings) for subqueries + /// - InterpreterSelectQuery lives shorter than query pipeline. + /// It's used just to build the query pipeline and no longer needed + /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, + /// **can** take a reference to Context from InterpreterSelectQuery + /// (the problem raises only when function uses context from the + /// execute*() method, like FunctionDictGet do) + /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. + std::optional select; + + if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED) { - BlockInputStreamPtr in; + /// We create a table with the same name as original table and the same alias columns, + /// but it will contain single block (that is INSERT-ed into main table). + /// InterpreterSelectQuery will do processing of alias columns. - /// We need keep InterpreterSelectQuery, until the processing will be finished, since: - /// - /// - We copy Context inside InterpreterSelectQuery to support - /// modification of context (Settings) for subqueries - /// - InterpreterSelectQuery lives shorter than query pipeline. - /// It's used just to build the query pipeline and no longer needed - /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, - /// **can** take a reference to Context from InterpreterSelectQuery - /// (the problem raises only when function uses context from the - /// execute*() method, like FunctionDictGet do) - /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. - std::optional select; + auto local_context = Context::createCopy(select_context); + local_context->addViewSource( + StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals())); + select.emplace(view.query, local_context, SelectQueryOptions()); + in = std::make_shared(select->execute().getInputStream()); - if (view.query) - { - /// We create a table with the same name as original table and the same alias columns, - /// but it will contain single block (that is INSERT-ed into main table). - /// InterpreterSelectQuery will do processing of alias columns. - - auto local_context = Context::createCopy(select_context); - local_context->addViewSource( - StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals())); - select.emplace(view.query, local_context, SelectQueryOptions()); - in = std::make_shared(select->execute().getInputStream()); - - /// Squashing is needed here because the materialized view query can generate a lot of blocks - /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY - /// and two-level aggregation is triggered). - in = std::make_shared( - in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes); - in = std::make_shared(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); - } - else - in = std::make_shared(block); - - in->readPrefix(); - - while (Block result_block = in->read()) - { - Nested::validateArraySizes(result_block); - view.out->write(result_block); - } - - in->readSuffix(); + /// Squashing is needed here because the materialized view query can generate a lot of blocks + /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY + /// and two-level aggregation is triggered). + in = std::make_shared( + in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes); + in = std::make_shared(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); } - catch (Exception & ex) + else + in = std::make_shared(block); + + in->setProgressCallback([this](const Progress & progress) { - ex.addMessage("while pushing to view " + view.table_id.getNameForLogs()); - view.exception = std::current_exception(); - } - catch (...) + CurrentThread::updateProgressIn(progress); + this->onProgress(progress); + }); + + in->readPrefix(); + + while (Block result_block = in->read()) { - view.exception = std::current_exception(); + Nested::validateArraySizes(result_block); + view.out->write(result_block); } - view.elapsed_ms += watch.elapsedMilliseconds(); + in->readSuffix(); } +void PushingToViewsBlockOutputStream::checkExceptionsInViews() +{ + for (auto & view : views) + { + if (view.exception) + { + logQueryViews(); + std::rethrow_exception(view.exception); + } + } +} + +void PushingToViewsBlockOutputStream::logQueryViews() +{ + const auto & settings = getContext()->getSettingsRef(); + const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds(); + const QueryViewsLogElement::ViewStatus min_status = settings.log_queries_min_type; + if (views.empty() || !settings.log_queries || !settings.log_query_views) + return; + + for (auto & view : views) + { + if ((min_query_duration && view.runtime_stats.elapsed_ms <= min_query_duration) || (view.runtime_stats.event_status < min_status)) + continue; + + try + { + if (view.runtime_stats.thread_status) + view.runtime_stats.thread_status->logToQueryViewsLog(view); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + + +void PushingToViewsBlockOutputStream::onProgress(const Progress & progress) +{ + if (getContext()->getProgressCallback()) + getContext()->getProgressCallback()(progress); +} } diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index db6b671ce2c..ba125e28829 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -8,13 +9,28 @@ namespace Poco { class Logger; -}; +} namespace DB { class ReplicatedMergeTreeSink; +struct ViewRuntimeData +{ + const ASTPtr query; + StorageID table_id; + BlockOutputStreamPtr out; + std::exception_ptr exception; + QueryViewsLogElement::ViewRuntimeStats runtime_stats; + + void setException(std::exception_ptr e) + { + exception = e; + runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::EXCEPTION_WHILE_PROCESSING); + } +}; + /** Writes data to the specified table and to all dependent materialized views. */ class PushingToViewsBlockOutputStream : public IBlockOutputStream, WithContext @@ -33,6 +49,7 @@ public: void flush() override; void writePrefix() override; void writeSuffix() override; + void onProgress(const Progress & progress) override; private: StoragePtr storage; @@ -44,20 +61,13 @@ private: ASTPtr query_ptr; Stopwatch main_watch; - struct ViewInfo - { - ASTPtr query; - StorageID table_id; - BlockOutputStreamPtr out; - std::exception_ptr exception; - UInt64 elapsed_ms = 0; - }; - - std::vector views; + std::vector views; ContextMutablePtr select_context; ContextMutablePtr insert_context; - void process(const Block & block, ViewInfo & view); + void process(const Block & block, ViewRuntimeData & view); + void checkExceptionsInViews(); + void logQueryViews(); }; diff --git a/src/Functions/registerFunctionsTuple.cpp b/src/Functions/registerFunctionsTuple.cpp index 12092e1e7e0..33f078675e9 100644 --- a/src/Functions/registerFunctionsTuple.cpp +++ b/src/Functions/registerFunctionsTuple.cpp @@ -5,11 +5,13 @@ class FunctionFactory; void registerFunctionTuple(FunctionFactory &); void registerFunctionTupleElement(FunctionFactory &); +void registerFunctionTupleToNameValuePairs(FunctionFactory &); void registerFunctionsTuple(FunctionFactory & factory) { registerFunctionTuple(factory); registerFunctionTupleElement(factory); + registerFunctionTupleToNameValuePairs(factory); } } diff --git a/src/Functions/tupleToNameValuePairs.cpp b/src/Functions/tupleToNameValuePairs.cpp new file mode 100644 index 00000000000..c3e5f28037b --- /dev/null +++ b/src/Functions/tupleToNameValuePairs.cpp @@ -0,0 +1,131 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +namespace +{ + +/** Transform a named tuple into an array of pairs, where the first element + * of the pair corresponds to the tuple field name and the second one to the + * tuple value. + */ +class FunctionTupleToNameValuePairs : public IFunction +{ +public: + static constexpr auto name = "tupleToNameValuePairs"; + static FunctionPtr create(ContextPtr) + { + return std::make_shared(); + } + + String getName() const override + { + return name; + } + + size_t getNumberOfArguments() const override + { + return 1; + } + + bool useDefaultImplementationForConstants() const override + { + return true; + } + + DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override + { + // get the type of all the fields in the tuple + const IDataType * col = arguments[0].type.get(); + const DataTypeTuple * tuple = checkAndGetDataType(col); + + if (!tuple) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "First argument for function {} must be a tuple.", + getName()); + + const auto & element_types = tuple->getElements(); + + if (element_types.empty()) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "The argument tuple for function {} must not be empty.", + getName()); + + const auto & first_element_type = element_types[0]; + + bool all_value_types_equal = std::all_of(element_types.begin() + 1, + element_types.end(), + [&](const auto &other) + { + return first_element_type->equals(*other); + }); + + if (!all_value_types_equal) + { + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "The argument tuple for function {} must contain just one type.", + getName()); + } + + DataTypePtr tuple_name_type = std::make_shared(); + DataTypes item_data_types = {tuple_name_type, + first_element_type}; + + auto item_data_type = std::make_shared(item_data_types); + + return std::make_shared(item_data_type); + } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override + { + const IColumn * tuple_col = arguments[0].column.get(); + const DataTypeTuple * tuple = checkAndGetDataType(arguments[0].type.get()); + const auto * tuple_col_concrete = assert_cast(tuple_col); + + auto keys = ColumnString::create(); + MutableColumnPtr values = tuple_col_concrete->getColumn(0).cloneEmpty(); + auto offsets = ColumnVector::create(); + for (size_t row = 0; row < tuple_col_concrete->size(); ++row) + { + for (size_t col = 0; col < tuple_col_concrete->tupleSize(); ++col) + { + const std::string & key = tuple->getElementNames()[col]; + const IColumn & value_column = tuple_col_concrete->getColumn(col); + + values->insertFrom(value_column, row); + keys->insertData(key.data(), key.size()); + } + offsets->insertValue(tuple_col_concrete->tupleSize() * (row + 1)); + } + + std::vector tuple_columns = { std::move(keys), std::move(values) }; + auto tuple_column = ColumnTuple::create(std::move(tuple_columns)); + return ColumnArray::create(std::move(tuple_column), std::move(offsets)); + } +}; + +} + +void registerFunctionTupleToNameValuePairs(FunctionFactory & factory) +{ + factory.registerFunction(); +} + +} diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index bda174776e0..bd15af76db0 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1088,7 +1088,11 @@ bool Context::hasScalar(const String & name) const void Context::addQueryAccessInfo( - const String & quoted_database_name, const String & full_quoted_table_name, const Names & column_names, const String & projection_name) + const String & quoted_database_name, + const String & full_quoted_table_name, + const Names & column_names, + const String & projection_name, + const String & view_name) { assert(!isGlobalContext() || getApplicationType() == ApplicationType::LOCAL); std::lock_guard lock(query_access_info.mutex); @@ -1098,6 +1102,8 @@ void Context::addQueryAccessInfo( query_access_info.columns.emplace(full_quoted_table_name + "." + backQuoteIfNeed(column_name)); if (!projection_name.empty()) query_access_info.projections.emplace(full_quoted_table_name + "." + backQuoteIfNeed(projection_name)); + if (!view_name.empty()) + query_access_info.views.emplace(view_name); } @@ -2118,7 +2124,6 @@ std::shared_ptr Context::getQueryLog() const return shared->system_logs->query_log; } - std::shared_ptr Context::getQueryThreadLog() const { auto lock = getLock(); @@ -2129,6 +2134,15 @@ std::shared_ptr Context::getQueryThreadLog() const return shared->system_logs->query_thread_log; } +std::shared_ptr Context::getQueryViewsLog() const +{ + auto lock = getLock(); + + if (!shared->system_logs) + return {}; + + return shared->system_logs->query_views_log; +} std::shared_ptr Context::getPartLog(const String & part_database) const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 66fac7e6e70..d3a77e0039b 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -70,6 +70,7 @@ struct Progress; class Clusters; class QueryLog; class QueryThreadLog; +class QueryViewsLog; class PartLog; class TextLog; class TraceLog; @@ -219,6 +220,7 @@ private: tables = rhs.tables; columns = rhs.columns; projections = rhs.projections; + views = rhs.views; } QueryAccessInfo(QueryAccessInfo && rhs) = delete; @@ -235,6 +237,7 @@ private: std::swap(tables, rhs.tables); std::swap(columns, rhs.columns); std::swap(projections, rhs.projections); + std::swap(views, rhs.views); } /// To prevent a race between copy-constructor and other uses of this structure. @@ -242,7 +245,8 @@ private: std::set databases{}; std::set tables{}; std::set columns{}; - std::set projections; + std::set projections{}; + std::set views{}; }; QueryAccessInfo query_access_info; @@ -469,7 +473,8 @@ public: const String & quoted_database_name, const String & full_quoted_table_name, const Names & column_names, - const String & projection_name = {}); + const String & projection_name = {}, + const String & view_name = {}); /// Supported factories for records in query_log enum class QueryLogFactories @@ -730,6 +735,7 @@ public: /// Nullptr if the query log is not ready for this moment. std::shared_ptr getQueryLog() const; std::shared_ptr getQueryThreadLog() const; + std::shared_ptr getQueryViewsLog() const; std::shared_ptr getTraceLog() const; std::shared_ptr getTextLog() const; std::shared_ptr getMetricLog() const; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 49ebd3d48b0..edcef191e73 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1965,12 +1965,14 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc if (context->hasQueryContext() && !options.is_internal) { + const String view_name{}; auto local_storage_id = storage->getStorageID(); context->getQueryContext()->addQueryAccessInfo( backQuoteIfNeed(local_storage_id.getDatabaseName()), local_storage_id.getFullTableName(), required_columns, - query_info.projection ? query_info.projection->desc->name : ""); + query_info.projection ? query_info.projection->desc->name : "", + view_name); } /// Create step which reads from empty source if storage has no data. diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index e1ca021deeb..d4ac555add0 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -418,6 +419,7 @@ BlockIO InterpreterSystemQuery::execute() [&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); }, [&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); }, [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }, + [&] { if (auto query_views_log = getContext()->getQueryViewsLog()) query_views_log->flush(true); }, [&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); } ); break; diff --git a/src/Interpreters/QueryLog.cpp b/src/Interpreters/QueryLog.cpp index 0f7ff579f5d..2cbb9634446 100644 --- a/src/Interpreters/QueryLog.cpp +++ b/src/Interpreters/QueryLog.cpp @@ -68,6 +68,8 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes() std::make_shared(std::make_shared()))}, {"projections", std::make_shared( std::make_shared(std::make_shared()))}, + {"views", std::make_shared( + std::make_shared(std::make_shared()))}, {"exception_code", std::make_shared()}, {"exception", std::make_shared()}, {"stack_trace", std::make_shared()}, @@ -161,6 +163,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const auto & column_tables = typeid_cast(*columns[i++]); auto & column_columns = typeid_cast(*columns[i++]); auto & column_projections = typeid_cast(*columns[i++]); + auto & column_views = typeid_cast(*columns[i++]); auto fill_column = [](const std::set & data, ColumnArray & column) { @@ -178,6 +181,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const fill_column(query_tables, column_tables); fill_column(query_columns, column_columns); fill_column(query_projections, column_projections); + fill_column(query_views, column_views); } columns[i++]->insert(exception_code); diff --git a/src/Interpreters/QueryLog.h b/src/Interpreters/QueryLog.h index aad3e56190b..2713febe1b6 100644 --- a/src/Interpreters/QueryLog.h +++ b/src/Interpreters/QueryLog.h @@ -59,6 +59,7 @@ struct QueryLogElement std::set query_tables; std::set query_columns; std::set query_projections; + std::set query_views; std::unordered_set used_aggregate_functions; std::unordered_set used_aggregate_function_combinators; diff --git a/src/Interpreters/QueryViewsLog.cpp b/src/Interpreters/QueryViewsLog.cpp new file mode 100644 index 00000000000..fa6fcf66a87 --- /dev/null +++ b/src/Interpreters/QueryViewsLog.cpp @@ -0,0 +1,104 @@ +#include "QueryViewsLog.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +NamesAndTypesList QueryViewsLogElement::getNamesAndTypes() +{ + auto view_status_datatype = std::make_shared(DataTypeEnum8::Values{ + {"QueryStart", static_cast(QUERY_START)}, + {"QueryFinish", static_cast(QUERY_FINISH)}, + {"ExceptionBeforeStart", static_cast(EXCEPTION_BEFORE_START)}, + {"ExceptionWhileProcessing", static_cast(EXCEPTION_WHILE_PROCESSING)}}); + + auto view_type_datatype = std::make_shared(DataTypeEnum8::Values{ + {"Default", static_cast(ViewType::DEFAULT)}, + {"Materialized", static_cast(ViewType::MATERIALIZED)}, + {"Live", static_cast(ViewType::LIVE)}}); + + return { + {"event_date", std::make_shared()}, + {"event_time", std::make_shared()}, + {"event_time_microseconds", std::make_shared(6)}, + {"view_duration_ms", std::make_shared()}, + + {"initial_query_id", std::make_shared()}, + {"view_name", std::make_shared()}, + {"view_uuid", std::make_shared()}, + {"view_type", std::move(view_type_datatype)}, + {"view_query", std::make_shared()}, + {"view_target", std::make_shared()}, + + {"read_rows", std::make_shared()}, + {"read_bytes", std::make_shared()}, + {"written_rows", std::make_shared()}, + {"written_bytes", std::make_shared()}, + {"peak_memory_usage", std::make_shared()}, + {"ProfileEvents", std::make_shared(std::make_shared(), std::make_shared())}, + + {"status", std::move(view_status_datatype)}, + {"exception_code", std::make_shared()}, + {"exception", std::make_shared()}, + {"stack_trace", std::make_shared()}}; +} + +NamesAndAliases QueryViewsLogElement::getNamesAndAliases() +{ + return { + {"ProfileEvents.Names", {std::make_shared(std::make_shared())}, "mapKeys(ProfileEvents)"}, + {"ProfileEvents.Values", {std::make_shared(std::make_shared())}, "mapValues(ProfileEvents)"}}; +} + +void QueryViewsLogElement::appendToBlock(MutableColumns & columns) const +{ + size_t i = 0; + + columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); // event_date + columns[i++]->insert(event_time); + columns[i++]->insert(event_time_microseconds); + columns[i++]->insert(view_duration_ms); + + columns[i++]->insertData(initial_query_id.data(), initial_query_id.size()); + columns[i++]->insertData(view_name.data(), view_name.size()); + columns[i++]->insert(view_uuid); + columns[i++]->insert(view_type); + columns[i++]->insertData(view_query.data(), view_query.size()); + columns[i++]->insertData(view_target.data(), view_target.size()); + + columns[i++]->insert(read_rows); + columns[i++]->insert(read_bytes); + columns[i++]->insert(written_rows); + columns[i++]->insert(written_bytes); + columns[i++]->insert(peak_memory_usage); + + if (profile_counters) + { + auto * column = columns[i++].get(); + ProfileEvents::dumpToMapColumn(*profile_counters, column, true); + } + else + { + columns[i++]->insertDefault(); + } + + columns[i++]->insert(status); + columns[i++]->insert(exception_code); + columns[i++]->insertData(exception.data(), exception.size()); + columns[i++]->insertData(stack_trace.data(), stack_trace.size()); +} + +} diff --git a/src/Interpreters/QueryViewsLog.h b/src/Interpreters/QueryViewsLog.h new file mode 100644 index 00000000000..e751224a51e --- /dev/null +++ b/src/Interpreters/QueryViewsLog.h @@ -0,0 +1,87 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace ProfileEvents +{ +class Counters; +} + +namespace DB +{ +class ThreadStatus; + +struct QueryViewsLogElement +{ + using ViewStatus = QueryLogElementType; + + enum class ViewType : int8_t + { + DEFAULT = 1, + MATERIALIZED = 2, + LIVE = 3 + }; + + struct ViewRuntimeStats + { + String target_name; + ViewType type = ViewType::DEFAULT; + std::unique_ptr thread_status = nullptr; + UInt64 elapsed_ms = 0; + std::chrono::time_point event_time; + ViewStatus event_status = ViewStatus::QUERY_START; + + void setStatus(ViewStatus s) + { + event_status = s; + event_time = std::chrono::system_clock::now(); + } + }; + + time_t event_time{}; + Decimal64 event_time_microseconds{}; + UInt64 view_duration_ms{}; + + String initial_query_id; + String view_name; + UUID view_uuid{UUIDHelpers::Nil}; + ViewType view_type{ViewType::DEFAULT}; + String view_query; + String view_target; + + UInt64 read_rows{}; + UInt64 read_bytes{}; + UInt64 written_rows{}; + UInt64 written_bytes{}; + Int64 peak_memory_usage{}; + std::shared_ptr profile_counters; + + ViewStatus status = ViewStatus::QUERY_START; + Int32 exception_code{}; + String exception; + String stack_trace; + + static std::string name() { return "QueryLog"; } + + static NamesAndTypesList getNamesAndTypes(); + static NamesAndAliases getNamesAndAliases(); + void appendToBlock(MutableColumns & columns) const; +}; + + +class QueryViewsLog : public SystemLog +{ + using SystemLog::SystemLog; +}; + +} diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index d3224a53ccd..5c5a6e1439a 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -1,13 +1,14 @@ -#include -#include -#include -#include -#include -#include +#include #include #include -#include #include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -104,6 +105,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf opentelemetry_span_log = createSystemLog( global_context, "system", "opentelemetry_span_log", config, "opentelemetry_span_log"); + query_views_log = createSystemLog(global_context, "system", "query_views_log", config, "query_views_log"); zookeeper_log = createSystemLog(global_context, "system", "zookeeper_log", config, "zookeeper_log"); if (query_log) @@ -124,6 +126,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf logs.emplace_back(asynchronous_metric_log.get()); if (opentelemetry_span_log) logs.emplace_back(opentelemetry_span_log.get()); + if (query_views_log) + logs.emplace_back(query_views_log.get()); if (zookeeper_log) logs.emplace_back(zookeeper_log.get()); diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index 176bf60908b..a332245439b 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -73,6 +73,7 @@ class CrashLog; class MetricLog; class AsynchronousMetricLog; class OpenTelemetrySpanLog; +class QueryViewsLog; class ZooKeeperLog; @@ -110,6 +111,8 @@ struct SystemLogs std::shared_ptr asynchronous_metric_log; /// OpenTelemetry trace spans. std::shared_ptr opentelemetry_span_log; + /// Used to log queries of materialized and live views + std::shared_ptr query_views_log; /// Used to log all actions of ZooKeeper client std::shared_ptr zookeeper_log; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 8590b3c94f3..2917a399906 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -1,12 +1,17 @@ #include +#include #include -#include #include +#include #include +#include +#include #include #include +#include #include +#include #include #include #include @@ -18,6 +23,14 @@ # include #endif +namespace ProfileEvents +{ +extern const Event SelectedRows; +extern const Event SelectedBytes; +extern const Event InsertedRows; +extern const Event InsertedBytes; +} + /// Implement some methods of ThreadStatus and CurrentThread here to avoid extra linking dependencies in clickhouse_common_io /// TODO It doesn't make sense. @@ -287,8 +300,18 @@ void ThreadStatus::finalizePerformanceCounters() } } +void ThreadStatus::resetPerformanceCountersLastUsage() +{ + *last_rusage = RUsageCounters::current(); + if (taskstats) + taskstats->reset(); +} + void ThreadStatus::initQueryProfiler() { + if (!query_profiled_enabled) + return; + /// query profilers are useless without trace collector auto global_context_ptr = global_context.lock(); if (!global_context_ptr || !global_context_ptr->hasTraceCollector()) @@ -455,6 +478,64 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String thread_log.add(elem); } +static String getCleanQueryAst(const ASTPtr q, ContextPtr context) +{ + String res = serializeAST(*q, true); + if (auto * masker = SensitiveDataMasker::getInstance()) + masker->wipeSensitiveData(res); + + res = res.substr(0, context->getSettingsRef().log_queries_cut_to_length); + + return res; +} + +void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo) +{ + auto query_context_ptr = query_context.lock(); + if (!query_context_ptr) + return; + auto views_log = query_context_ptr->getQueryViewsLog(); + if (!views_log) + return; + + QueryViewsLogElement element; + + element.event_time = time_in_seconds(vinfo.runtime_stats.event_time); + element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.event_time); + element.view_duration_ms = vinfo.runtime_stats.elapsed_ms; + + element.initial_query_id = query_id; + element.view_name = vinfo.table_id.getFullTableName(); + element.view_uuid = vinfo.table_id.uuid; + element.view_type = vinfo.runtime_stats.type; + if (vinfo.query) + element.view_query = getCleanQueryAst(vinfo.query, query_context_ptr); + element.view_target = vinfo.runtime_stats.target_name; + + auto events = std::make_shared(performance_counters.getPartiallyAtomicSnapshot()); + element.read_rows = progress_in.read_rows.load(std::memory_order_relaxed); + element.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed); + element.written_rows = (*events)[ProfileEvents::InsertedRows]; + element.written_bytes = (*events)[ProfileEvents::InsertedBytes]; + element.peak_memory_usage = memory_tracker.getPeak() > 0 ? memory_tracker.getPeak() : 0; + if (query_context_ptr->getSettingsRef().log_profile_events != 0) + { + element.profile_counters = events; + } + + element.status = vinfo.runtime_stats.event_status; + element.exception_code = 0; + if (vinfo.exception) + { + element.exception_code = getExceptionErrorCode(vinfo.exception); + element.exception = getExceptionMessage(vinfo.exception, false); + if (query_context_ptr->getSettingsRef().calculate_text_stack_trace) + element.stack_trace = getExceptionStackTraceString(vinfo.exception); + } + + views_log->add(element); +} + void CurrentThread::initializeQuery() { if (unlikely(!current_thread)) diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 1b59f3bc7df..3ebc2eb142c 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -663,6 +663,7 @@ static std::tuple executeQueryImpl( elem.query_tables = info.tables; elem.query_columns = info.columns; elem.query_projections = info.projections; + elem.query_views = info.views; } interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table); @@ -708,6 +709,15 @@ static std::tuple executeQueryImpl( element.thread_ids = std::move(info.thread_ids); element.profile_counters = std::move(info.profile_counters); + /// We need to refresh the access info since dependent views might have added extra information, either during + /// creation of the view (PushingToViewsBlockOutputStream) or while executing its internal SELECT + const auto & access_info = context_ptr->getQueryAccessInfo(); + element.query_databases.insert(access_info.databases.begin(), access_info.databases.end()); + element.query_tables.insert(access_info.tables.begin(), access_info.tables.end()); + element.query_columns.insert(access_info.columns.begin(), access_info.columns.end()); + element.query_projections.insert(access_info.projections.begin(), access_info.projections.end()); + element.query_views.insert(access_info.views.begin(), access_info.views.end()); + const auto & factories_info = context_ptr->getQueryFactoriesInfo(); element.used_aggregate_functions = factories_info.aggregate_functions; element.used_aggregate_function_combinators = factories_info.aggregate_function_combinators; diff --git a/src/Interpreters/ya.make b/src/Interpreters/ya.make index 462c778bf3d..c0816bb671c 100644 --- a/src/Interpreters/ya.make +++ b/src/Interpreters/ya.make @@ -131,6 +131,7 @@ SRCS( QueryNormalizer.cpp QueryParameterVisitor.cpp QueryThreadLog.cpp + QueryViewsLog.cpp RemoveInjectiveFunctionsVisitor.cpp RenameColumnVisitor.cpp ReplaceQueryParameterVisitor.cpp diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py index 9cbf8771ee5..b0b89fde41f 100644 --- a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py +++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py @@ -42,6 +42,7 @@ def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluste # max_memory_usage is the limit for the batch on the remote node # (local query should not be affected since 30MB is enough for 100K rows) 'max_memory_usage': '30Mi', + 'max_untracked_memory': '0' }) # "Received from" is mandatory, since the exception should be thrown on the remote node. with pytest.raises(QueryRuntimeException, match=r'DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv'): @@ -55,6 +56,7 @@ def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster # max_memory_usage is the limit for the batch on the remote node # (local query should not be affected since 30MB is enough for 100K rows) 'max_memory_usage': '30Mi', + 'max_untracked_memory': '0' }) node1.query('system flush distributed dist') assert int(node1.query('select count() from dist_data')) == 100000 diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 947b71b5f96..bd3a88403f9 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2122,7 +2122,7 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster): # we have 0.25 (sleepEachRow) * 20 ( Rows ) = 5 sec window after "Polled batch of 20 messages" # while materialized view is working to inject zookeeper failure pm.drop_instance_zk_connections(instance) - instance.wait_for_log_line("Error.*(session has been expired|Connection loss).*while write prefix to view") + instance.wait_for_log_line("Error.*(session has been expired|Connection loss).*while writing suffix to view") pm.heal_all() instance.wait_for_log_line("Committed offset 22") diff --git a/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.reference b/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.reference index e4872ddeddd..d21c1e74770 100644 --- a/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.reference +++ b/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.reference @@ -1,4 +1,8 @@ -0 -0 -800000 -1600000 +Should throw 1 + KO(241) +Should throw 2 + KO(241) +Should pass 1 + OK +Should pass 2 + OK diff --git a/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh b/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh index 80af1b2c17f..dde6b8ccadb 100755 --- a/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh +++ b/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh @@ -7,6 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # just in case set -o pipefail +# shellcheck disable=SC2120 function execute() { ${CLICKHOUSE_CLIENT} -n "$@" @@ -64,8 +65,7 @@ echo "create table out_01278 as data_01278 Engine=Merge('$CLICKHOUSE_DATABASE', # function execute_insert() { - { - cat < /dev/null 2>&1 + local ret_code=$? + if [[ $ret_code -eq 0 ]]; + then + echo " OK" + else + echo " KO($ret_code)" + fi } # fails +echo "Should throw 1" execute_insert --testmode +echo "Should throw 2" execute_insert --testmode --min_insert_block_size_rows=1 --min_insert_block_size_rows_for_materialized_views=$((1<<20)) # passes +echo "Should pass 1" execute_insert --min_insert_block_size_rows=1 +echo "Should pass 2" execute_insert --min_insert_block_size_rows_for_materialized_views=1 diff --git a/tests/queries/0_stateless/01927_query_views_log_current_database.reference b/tests/queries/0_stateless/01927_query_views_log_current_database.reference new file mode 100644 index 00000000000..ff9eca2d97f --- /dev/null +++ b/tests/queries/0_stateless/01927_query_views_log_current_database.reference @@ -0,0 +1,70 @@ +Row 1: +────── +stage: Query log rows +read_rows: 100 +written_rows: 201 +databases: ['_table_function','default'] +tables: ['_table_function.numbers','default.table_a','default.table_b','default.table_b_live_view','default.table_c'] +views: ['default.matview_a_to_b','default.matview_b_to_c','default.table_b_live_view'] +sleep_calls: 200 +sleep_us: 298 +Row 1: +────── +stage: Depending views +view_name: default.matview_a_to_b +view_type: Materialized +status: QueryFinish +view_target: default.table_b +view_query: SELECT toFloat64(a) AS a, b + sleepEachRow(0.000001) AS count FROM default.table_a +read_rows: 100 +written_rows: 100 +sleep_calls: 100 +sleep_us: 99 + +Row 2: +────── +stage: Depending views +view_name: default.matview_b_to_c +view_type: Materialized +status: QueryFinish +view_target: default.table_c +view_query: SELECT sum(a + sleepEachRow(0.000002)) AS a FROM default.table_b +read_rows: 100 +written_rows: 1 +sleep_calls: 100 +sleep_us: 199 + +Row 3: +────── +stage: Depending views +view_name: default.table_b_live_view +view_type: Live +status: QueryFinish +view_target: default.table_b_live_view +view_query: SELECT sum(a + b) FROM default.table_b +read_rows: 100 +written_rows: 0 +sleep_calls: 0 +sleep_us: 0 +Row 1: +────── +stage: Query log rows 2 +read_rows: 50 +written_rows: 100 +databases: ['_table_function','default'] +tables: ['_table_function.numbers','default.table_d','default.table_e','default.table_f'] +views: ['default.matview_join_d_e'] +sleep_calls: 50 +sleep_us: 150 +Row 1: +────── +stage: Depending views 2 +view_name: default.matview_join_d_e +view_type: Materialized +status: QueryFinish +view_target: default.table_f +view_query: SELECT table_d.a AS a, table_e.count + sleepEachRow(0.000003) AS count FROM default.table_d LEFT JOIN default.table_e ON table_d.a = table_e.a +read_rows: 50 +written_rows: 50 +sleep_calls: 50 +sleep_us: 150 diff --git a/tests/queries/0_stateless/01927_query_views_log_current_database.sql b/tests/queries/0_stateless/01927_query_views_log_current_database.sql new file mode 100644 index 00000000000..40ab8c8e16a --- /dev/null +++ b/tests/queries/0_stateless/01927_query_views_log_current_database.sql @@ -0,0 +1,129 @@ +SET allow_experimental_live_view = 1; +SET log_queries=0; +SET log_query_threads=0; + +-- SETUP TABLES +CREATE TABLE table_a (a String, b Int64) ENGINE = MergeTree ORDER BY b; +CREATE TABLE table_b (a Float64, b Int64) ENGINE = MergeTree ORDER BY tuple(); +CREATE TABLE table_c (a Float64) ENGINE = MergeTree ORDER BY a; + +CREATE TABLE table_d (a Float64, count Int64) ENGINE MergeTree ORDER BY a; +CREATE TABLE table_e (a Float64, count Int64) ENGINE MergeTree ORDER BY a; +CREATE TABLE table_f (a Float64, count Int64) ENGINE MergeTree ORDER BY a; + +-- SETUP MATERIALIZED VIEWS +CREATE MATERIALIZED VIEW matview_a_to_b TO table_b AS SELECT toFloat64(a) AS a, b + sleepEachRow(0.000001) AS count FROM table_a; +CREATE MATERIALIZED VIEW matview_b_to_c TO table_c AS SELECT SUM(a + sleepEachRow(0.000002)) as a FROM table_b; +CREATE MATERIALIZED VIEW matview_join_d_e TO table_f AS SELECT table_d.a as a, table_e.count + sleepEachRow(0.000003) as count FROM table_d LEFT JOIN table_e ON table_d.a = table_e.a; + +-- SETUP LIVE VIEW +---- table_b_live_view (Int64) +DROP TABLE IF EXISTS table_b_live_view; +CREATE LIVE VIEW table_b_live_view AS SELECT sum(a + b) FROM table_b; + +-- ENABLE LOGS +SET log_query_views=1; +SET log_queries_min_type='QUERY_FINISH'; +SET log_queries=1; + +-- INSERT 1 +INSERT INTO table_a SELECT '111', * FROM numbers(100); + +-- INSERT 2 +INSERT INTO table_d SELECT 0.5, * FROM numbers(50); + +SYSTEM FLUSH LOGS; + + +-- CHECK LOGS OF INSERT 1 +-- Note that live views currently don't report written rows +SELECT + 'Query log rows' as stage, + read_rows, + written_rows, + arraySort(databases) as databases, + arraySort(tables) as tables, + arraySort(views) as views, + ProfileEvents['SleepFunctionCalls'] as sleep_calls, + ProfileEvents['SleepFunctionMicroseconds'] as sleep_us +FROM system.query_log +WHERE query like '-- INSERT 1%INSERT INTO table_a%' + AND current_database = currentDatabase() + AND event_date >= yesterday() +FORMAT Vertical; + +SELECT + 'Depending views' as stage, + view_name, + view_type, + status, + view_target, + view_query, + read_rows, + written_rows, + ProfileEvents['SleepFunctionCalls'] as sleep_calls, + ProfileEvents['SleepFunctionMicroseconds'] as sleep_us +FROM system.query_views_log +WHERE initial_query_id = + ( + SELECT initial_query_id + FROM system.query_log + WHERE query like '-- INSERT 1%INSERT INTO table_a%' + AND current_database = currentDatabase() + AND event_date >= yesterday() + LIMIT 1 + ) +ORDER BY view_name +FORMAT Vertical; + +-- CHECK LOGS OF INSERT 2 +SELECT + 'Query log rows 2' as stage, + read_rows, + written_rows, + arraySort(databases) as databases, + arraySort(tables) as tables, + arraySort(views) as views, + ProfileEvents['SleepFunctionCalls'] as sleep_calls, + ProfileEvents['SleepFunctionMicroseconds'] as sleep_us +FROM system.query_log +WHERE query like '-- INSERT 2%INSERT INTO table_d%' + AND current_database = currentDatabase() + AND event_date >= yesterday() +FORMAT Vertical; + +SELECT + 'Depending views 2' as stage, + view_name, + view_type, + status, + view_target, + view_query, + read_rows, + written_rows, + ProfileEvents['SleepFunctionCalls'] as sleep_calls, + ProfileEvents['SleepFunctionMicroseconds'] as sleep_us +FROM system.query_views_log +WHERE initial_query_id = + ( + SELECT initial_query_id + FROM system.query_log + WHERE query like '-- INSERT 2%INSERT INTO table_d%' + AND current_database = currentDatabase() + AND event_date >= yesterday() + LIMIT 1 + ) +ORDER BY view_name +FORMAT Vertical; + +-- TEARDOWN +DROP TABLE table_b_live_view; +DROP TABLE matview_a_to_b; +DROP TABLE matview_b_to_c; +DROP TABLE matview_join_d_e; +DROP TABLE table_f; +DROP TABLE table_e; +DROP TABLE table_d; +DROP TABLE table_c; +DROP TABLE table_b; +DROP TABLE table_a; diff --git a/tests/queries/0_stateless/01927_query_views_log_matview_exceptions.reference b/tests/queries/0_stateless/01927_query_views_log_matview_exceptions.reference new file mode 100644 index 00000000000..b02dfc5c3d4 --- /dev/null +++ b/tests/queries/0_stateless/01927_query_views_log_matview_exceptions.reference @@ -0,0 +1,14 @@ +--parallel_view_processing 0 +table_exception_a 1 +table_exception_b 0 +table_exception_c 1 +Excep****WhileProcessing +default.matview_exception_a_to_b Excep****WhileProcessing 6 default.table_exception_b SELECT toFloat64(a) AS a, b FROM default.table_exception_a +default.matview_exception_a_to_c QueryFinish 0 default.table_exception_c SELECT b AS a FROM default.table_exception_a +--parallel_view_processing 1 +table_exception_a 2 +table_exception_b 0 +table_exception_c 2 +Excep****WhileProcessing +default.matview_exception_a_to_b Excep****WhileProcessing 6 default.table_exception_b SELECT toFloat64(a) AS a, b FROM default.table_exception_a +default.matview_exception_a_to_c QueryFinish 0 default.table_exception_c SELECT b AS a FROM default.table_exception_a diff --git a/tests/queries/0_stateless/01927_query_views_log_matview_exceptions.sh b/tests/queries/0_stateless/01927_query_views_log_matview_exceptions.sh new file mode 100755 index 00000000000..47d5e733480 --- /dev/null +++ b/tests/queries/0_stateless/01927_query_views_log_matview_exceptions.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +function cleanup() +{ + ${CLICKHOUSE_CLIENT} -n -q " + DROP TABLE IF EXISTS matview_exception_a_to_c; + DROP TABLE IF EXISTS matview_exception_a_to_b; + DROP TABLE IF EXISTS table_exception_c; + DROP TABLE IF EXISTS table_exception_b; + DROP TABLE IF EXISTS table_exception_a; + "; +} + +function setup() +{ + ${CLICKHOUSE_CLIENT} -n -q " + CREATE TABLE table_exception_a (a String, b Int64) ENGINE = MergeTree ORDER BY b; + CREATE TABLE table_exception_b (a Float64, b Int64) ENGINE = MergeTree ORDER BY tuple(); + CREATE TABLE table_exception_c (a Float64) ENGINE = MergeTree ORDER BY a; + + CREATE MATERIALIZED VIEW matview_exception_a_to_b TO table_exception_b AS SELECT toFloat64(a) AS a, b FROM table_exception_a; + CREATE MATERIALIZED VIEW matview_exception_a_to_c TO table_exception_c AS SELECT b AS a FROM table_exception_a; + "; +} + +function test() +{ + echo "$@"; + # We are going to insert an invalid number into table_exception_a. This will fail when inserting into + # table_exception_b via matview_exception_a_to_b, and will work ok when inserting into table_exception_c + ${CLICKHOUSE_CLIENT} "$@" --log_queries=1 --log_query_views=1 -q "INSERT INTO table_exception_a VALUES ('0.Aa234', 22)" > /dev/null 2>&1 || true; + ${CLICKHOUSE_CLIENT} -q " + SELECT * FROM + ( + SELECT 'table_exception_a' as name, count() AS c FROM table_exception_a UNION ALL + SELECT 'table_exception_b' as name, count() AS c FROM table_exception_b UNION ALL + SELECT 'table_exception_c' as name, count() AS c FROM table_exception_c + ) + ORDER BY name ASC + FORMAT TSV"; + + ${CLICKHOUSE_CLIENT} -q 'SYSTEM FLUSH LOGS'; + + ${CLICKHOUSE_CLIENT} -q " + SELECT + replaceOne(CAST(type AS String), 'ExceptionWhileProcessing', 'Excep****WhileProcessing') + exception_code + FROM system.query_log + WHERE + query LIKE 'INSERT INTO table_exception_a%' AND + type > 0 AND + event_date >= yesterday() AND + current_database = currentDatabase() + ORDER BY event_time_microseconds DESC + LIMIT 1 + FORMAT TSV"; + + ${CLICKHOUSE_CLIENT} -q " + SELECT + view_name, + replaceOne(CAST(status AS String), 'ExceptionWhileProcessing', 'Excep****WhileProcessing'), + exception_code, + view_target, + view_query + FROM system.query_views_log + WHERE initial_query_id = + ( + SELECT query_id + FROM system.query_log + WHERE + current_database = '${CLICKHOUSE_DATABASE}' AND + query LIKE 'INSERT INTO table_exception_a%' AND + type > 0 AND + event_date >= yesterday() AND + current_database = currentDatabase() + ORDER BY event_time_microseconds DESC + LIMIT 1 + ) + ORDER BY view_name ASC + "; +} + +trap cleanup EXIT; +cleanup; +setup; + +test --parallel_view_processing 0; +test --parallel_view_processing 1; + +exit 0 diff --git a/tests/queries/0_stateless/02008_tuple_to_name_value_pairs.reference b/tests/queries/0_stateless/02008_tuple_to_name_value_pairs.reference new file mode 100644 index 00000000000..90d15b16dcd --- /dev/null +++ b/tests/queries/0_stateless/02008_tuple_to_name_value_pairs.reference @@ -0,0 +1,6 @@ +[('1',1),('2',2),('3',3)] +[('a',(1,2)),('b',(3,4))] +[('a',(5,6)),('b',(7,8))] +CPU 3.3 +Memory 5.5 +Disk 6.6 diff --git a/tests/queries/0_stateless/02008_tuple_to_name_value_pairs.sql b/tests/queries/0_stateless/02008_tuple_to_name_value_pairs.sql new file mode 100644 index 00000000000..9204975b579 --- /dev/null +++ b/tests/queries/0_stateless/02008_tuple_to_name_value_pairs.sql @@ -0,0 +1,25 @@ +SELECT tupleToNameValuePairs(tuple(1, 2, 3)); + +DROP TABLE IF EXISTS test02008; +CREATE TABLE test02008 ( + col Tuple( + a Tuple(key1 int, key2 int), + b Tuple(key1 int, key3 int) + ) +) ENGINE=Memory(); +INSERT INTO test02008 VALUES (tuple(tuple(1, 2), tuple(3, 4))); +INSERT INTO test02008 VALUES (tuple(tuple(5, 6), tuple(7, 8))); +SELECT tupleToNameValuePairs(col) FROM test02008 ORDER BY col; + +DROP TABLE IF EXISTS test02008; +CREATE TABLE test02008 ( + col Tuple(CPU double, Memory double, Disk double) +) ENGINE=Memory(); +INSERT INTO test02008 VALUES (tuple(3.3, 5.5, 6.6)); +SELECT untuple(arrayJoin(tupleToNameValuePairs(col))) from test02008; + +DROP TABLE IF EXISTS test02008; +SELECT tupleToNameValuePairs(tuple(1, 1.3)); -- { serverError 43; } +SELECT tupleToNameValuePairs(tuple(1, [1,2])); -- { serverError 43; } +SELECT tupleToNameValuePairs(tuple(1, 'a')); -- { serverError 43; } +SELECT tupleToNameValuePairs(33); -- { serverError 43; } diff --git a/tests/testflows/rbac/configs/clickhouse/config.xml b/tests/testflows/rbac/configs/clickhouse/config.xml index 265bcd1882a..47d3ad3840a 100644 --- a/tests/testflows/rbac/configs/clickhouse/config.xml +++ b/tests/testflows/rbac/configs/clickhouse/config.xml @@ -328,6 +328,14 @@ 7500 + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+