Merge branch 'ClickHouse:master' into master

This commit is contained in:
Filatenkov Artur 2021-08-12 19:07:11 +03:00 committed by GitHub
commit d7511ea394
44 changed files with 1359 additions and 202 deletions

View File

@ -39,7 +39,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2,
...
PROJECTION projection_name_1 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]),
PROJECTION projection_name_2 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY])
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
@ -385,6 +388,24 @@ Functions with a constant argument that is less than ngram size can't be used
- `s != 1`
- `NOT startsWith(s, 'test')`
### Projections {#projections}
Projections are like materialized views, but defined at the part level. They provide consistency guarantees along with automatic usage in queries.
#### Query {#projection-query}
A projection query is what defines a projection. It has the following grammar:
`SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]`
It implicitly selects data from the parent table.
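For illustration, here is a minimal sketch (the `visits` table and its columns are hypothetical) of declaring a projection that pre-aggregates `duration` per user, following the `CREATE TABLE` grammar above:
``` sql
CREATE TABLE visits
(
    user_id UInt64,
    user_name String,
    duration UInt64,
    PROJECTION projection_duration_sum
    (
        SELECT user_id, sum(duration)
        GROUP BY user_id
    )
) ENGINE = MergeTree()
ORDER BY user_id;
```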
#### Storage {#projection-storage}
Projections are stored inside the part directory. Similar to a skip index, a projection occupies a subdirectory that stores an anonymous MergeTree table's part. The table is induced by the definition query of the projection. If there is a GROUP BY clause, the underlying storage engine becomes AggregatingMergeTree, and all aggregate functions are converted to AggregateFunction. If there is an ORDER BY clause, the MergeTree table uses it as its primary key expression. During the merge process, the projection part is merged via its storage's merge routine. The checksum of the parent table's part includes the projection's part. Other maintenance jobs are similar to those of skip indices.
#### Query Analysis {#projection-query-analysis}
1. Check if the projection can be used to answer the given query, that is, it generates the same answer as querying the base table.
2. Select the best feasible match, which contains the fewest granules to read; see the example after this list.
3. The query pipeline which uses projections will be different from the one that uses the original parts. If the projection is absent in some parts, we can add the pipeline to "project" it on the fly.
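Continuing the hypothetical `visits` example above, a query of the following shape could be answered from the projection's pre-aggregated parts instead of the base table, provided the analysis steps above select it:
``` sql
SELECT user_id, sum(duration)
FROM visits
GROUP BY user_id;
```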
## Concurrent Data Access {#concurrent-data-access}
For concurrent table access, we use multi-versioning. In other words, when a table is simultaneously read and updated, data is read from a set of parts that is current at the time of the query. There are no lengthy locks. Inserts do not get in the way of read operations.

View File

@ -892,6 +892,33 @@ If the table does not exist, ClickHouse will create it. If the structure of the
</query_thread_log>
```
## query_views_log {#server_configuration_parameters-query_views_log}
Setting for logging views (materialized or live) dependent on queries received with the [log_query_views=1](../../operations/settings/settings.md#settings-log-query-views) setting.
Queries are logged in the [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) table, not in a separate file. You can change the name of the table in the `table` parameter (see below).
Use the following parameters to configure logging:
- `database` — Name of the database.
- `table` — Name of the system table the queries will be logged in.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` is defined.
- `engine` — [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` is defined.
- `flush_interval_milliseconds` — Interval for flushing data from the buffer in memory to the table.
If the table does not exist, ClickHouse will create it. If the structure of the query views log changed when the ClickHouse server was updated, the table with the old structure is renamed, and a new table is created automatically.
**Example**
``` xml
<query_views_log>
<database>system</database>
<table>query_views_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_views_log>
```
## text_log {#server_configuration_parameters-text_log}
Settings for the [text_log](../../operations/system-tables/text_log.md#system_tables-text_log) system table for logging text messages.

View File

@ -890,7 +890,7 @@ log_queries_min_type='EXCEPTION_WHILE_PROCESSING'
Setting up query threads logging.
Queries threads run by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter.
Example:
@ -898,6 +898,19 @@ Example:
log_query_threads=1
```
## log_query_views {#settings-log-query-views}
Setting up query views logging.
When a query run by ClickHouse with this setting enabled has associated views (materialized or live views), they are logged in the table configured by the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server configuration parameter.
Example:
``` text
log_query_views=1
```
## log_comment {#settings-log-comment}
Specifies the value for the `log_comment` field of the [system.query_log](../system-tables/query_log.md) table and comment text for the server log.

View File

@ -50,6 +50,7 @@ Columns:
- `query_kind` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Type of the query.
- `databases` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the databases present in the query.
- `tables` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the tables present in the query.
- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the (materialized or live) views present in the query.
- `columns` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the columns present in the query.
- `projections` ([String](../../sql-reference/data-types/string.md)) — Names of the projections used during the query execution.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception.
@ -180,5 +181,6 @@ used_table_functions: []
**See Also**
- [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread.
- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — This table contains information about each view executed during a query.
[Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_log) <!--hide-->

View File

@ -112,5 +112,6 @@ ProfileEvents: {'Query':1,'SelectQuery':1,'ReadCompressedBytes':36,'Compr
**See Also**
- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — Description of the `query_log` system table which contains common information about queries execution.
- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — This table contains information about each view executed during a query.
[Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_thread_log) <!--hide-->

View File

@ -0,0 +1,81 @@
# system.query_views_log {#system_tables-query_views_log}
Contains information about the dependent views executed when running a query, for example, the view type or the execution time.
To start logging:
1. Configure parameters in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) section.
2. Set [log_query_views](../../operations/settings/settings.md#settings-log-query-views) to 1.
The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.
ClickHouse does not delete data from the table automatically. See [Introduction](../../operations/system-tables/index.md#system-tables-introduction) for more details.
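For example, assuming the server-side `query_views_log` section is configured as shown earlier, logging can be enabled per session and the buffer flushed on demand:
``` sql
SET log_query_views = 1;
-- Run INSERT queries that trigger materialized or live views here.
SYSTEM FLUSH LOGS;
```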
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — The date when the last event of the view happened.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the view finished execution.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — The date and time when the view finished execution, with microseconds precision.
- `view_duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Duration of view execution (sum of its stages) in milliseconds.
- `initial_query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the initial query (for distributed query execution).
- `view_name` ([String](../../sql-reference/data-types/string.md)) — Name of the view.
- `view_uuid` ([UUID](../../sql-reference/data-types/uuid.md)) — UUID of the view.
- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. Values:
- `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log.
- `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized).
- `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view).
- `view_query` ([String](../../sql-reference/data-types/string.md)) — The query executed by the view.
- `view_target` ([String](../../sql-reference/data-types/string.md)) — The name of the view target table.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of read rows.
- `read_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of read bytes.
- `written_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written rows.
- `written_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written bytes.
- `peak_memory_usage` ([Int64](../../sql-reference/data-types/int-uint.md)) — The maximum difference between the amount of allocated and freed memory in the context of this view.
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. Their descriptions can be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events).
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the view. Values:
- `'QueryStart' = 1` — Successful start of the view execution. Should not appear in this log.
- `'QueryFinish' = 2` — Successful end of the view execution.
- `'ExceptionBeforeStart' = 3` — Exception before the start of the view execution.
- `'ExceptionWhileProcessing' = 4` — Exception during the view execution.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception.
- `exception` ([String](../../sql-reference/data-types/string.md)) — Exception message.
- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [Stack trace](https://en.wikipedia.org/wiki/Stack_trace). An empty string, if the query was completed successfully.
**Example**
``` sql
SELECT * FROM system.query_views_log LIMIT 1 \G
```
``` text
Row 1:
──────
event_date: 2021-06-22
event_time: 2021-06-22 13:23:07
event_time_microseconds: 2021-06-22 13:23:07.738221
view_duration_ms: 0
initial_query_id: c3a1ac02-9cad-479b-af54-9e9c0a7afd70
view_name: default.matview_inner
view_uuid: 00000000-0000-0000-0000-000000000000
view_type: Materialized
view_query: SELECT * FROM default.table_b
view_target: default.`.inner.matview_inner`
read_rows: 4
read_bytes: 64
written_rows: 2
written_bytes: 32
peak_memory_usage: 4196188
ProfileEvents: {'FileOpen':2,'WriteBufferFromFileDescriptorWrite':2,'WriteBufferFromFileDescriptorWriteBytes':187,'IOBufferAllocs':3,'IOBufferAllocBytes':3145773,'FunctionExecute':3,'DiskWriteElapsedMicroseconds':13,'InsertedRows':2,'InsertedBytes':16,'SelectedRows':4,'SelectedBytes':48,'ContextLock':16,'RWLockAcquiredReadLocks':1,'RealTimeMicroseconds':698,'SoftPageFaults':4,'OSReadChars':463}
status: QueryFinish
exception_code: 0
exception:
stack_trace:
```
**See Also**
- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — Description of the `query_log` system table which contains common information about queries execution.
- [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — This table contains information about each query execution thread.
[Original article](https://clickhouse.tech/docs/en/operations/system-tables/query_views_log) <!--hide-->

View File

@ -320,7 +320,7 @@
The amount of data in mapped files can be monitored
in system.metrics, system.metric_log by the MMappedFiles, MMappedFileBytes metrics
and in system.asynchronous_metrics, system.asynchronous_metrics_log by the MMapCacheCells metric,
and also in system.events, system.processes, system.query_log, system.query_thread_log, system.query_views_log by the
CreatedReadBufferMMap, CreatedReadBufferMMapFailed, MMappedFileCacheHits, MMappedFileCacheMisses events.
Note that the amount of data in mapped files does not consume memory directly and is not accounted
in query or server memory usage - because this memory can be discarded similar to OS page cache.
@ -878,6 +878,15 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_thread_log>
<!-- Query views log. Has information about all dependent views associated with a query.
Used only for queries with setting log_query_views = 1. -->
<query_views_log>
<database>system</database>
<table>query_views_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_views_log>
<!-- Uncomment if use part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
<part_log>
@ -955,6 +964,7 @@
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</crash_log>
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
See https://clickhouse.yandex/docs/en/dicts/internal_dicts/
-->

View File

@ -271,7 +271,7 @@ mark_cache_size: 5368709120
# The amount of data in mapped files can be monitored
# in system.metrics, system.metric_log by the MMappedFiles, MMappedFileBytes metrics
# and in system.asynchronous_metrics, system.asynchronous_metrics_log by the MMapCacheCells metric,
# and also in system.events, system.processes, system.query_log, system.query_thread_log, system.query_views_log by the
# CreatedReadBufferMMap, CreatedReadBufferMMapFailed, MMappedFileCacheHits, MMappedFileCacheMisses events.
# Note that the amount of data in mapped files does not consume memory directly and is not accounted
# in query or server memory usage - because this memory can be discarded similar to OS page cache.
@ -731,6 +731,14 @@ query_thread_log:
partition_by: toYYYYMM(event_date)
flush_interval_milliseconds: 7500
# Query views log. Has information about all dependent views associated with a query.
# Used only for queries with setting log_query_views = 1.
query_views_log:
database: system
table: query_views_log
partition_by: toYYYYMM(event_date)
flush_interval_milliseconds: 7500
# Uncomment if use part log.
# Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
# part_log:

View File

@ -94,6 +94,22 @@ std::string getExceptionStackTraceString(const std::exception & e)
#endif
}
std::string getExceptionStackTraceString(std::exception_ptr e)
{
try
{
std::rethrow_exception(e);
}
catch (const std::exception & exception)
{
return getExceptionStackTraceString(exception);
}
catch (...)
{
return {};
}
}
std::string Exception::getStackTraceString() const
{
@ -380,6 +396,30 @@ int getCurrentExceptionCode()
}
}
int getExceptionErrorCode(std::exception_ptr e)
{
try
{
std::rethrow_exception(e);
}
catch (const Exception & exception)
{
return exception.code();
}
catch (const Poco::Exception &)
{
return ErrorCodes::POCO_EXCEPTION;
}
catch (const std::exception &)
{
return ErrorCodes::STD_EXCEPTION;
}
catch (...)
{
return ErrorCodes::UNKNOWN_EXCEPTION;
}
}
void rethrowFirstException(const Exceptions & exceptions)
{

View File

@ -82,6 +82,7 @@ private:
std::string getExceptionStackTraceString(const std::exception & e);
std::string getExceptionStackTraceString(std::exception_ptr e);
/// Contains an additional member `saved_errno`. See the throwFromErrno function.
@ -167,6 +168,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
/// Returns error code from ErrorCodes
int getCurrentExceptionCode();
int getExceptionErrorCode(std::exception_ptr e);
/// An execution status of any piece of code, contains return code and optional error

View File

@ -149,7 +149,11 @@ ThreadStatus::~ThreadStatus()
if (deleter)
deleter();
/// Only change current_thread if it's currently being used by this ThreadStatus
/// For example, PushingToViewsBlockOutputStream creates and deletes ThreadStatus instances while running in the main query thread
if (current_thread == this)
current_thread = nullptr;
}
void ThreadStatus::updatePerformanceCounters()

View File

@ -37,6 +37,8 @@ struct RUsageCounters;
struct PerfEventsCounters;
class TaskStatsInfoGetter;
class InternalTextLogsQueue;
struct ViewRuntimeData;
class QueryViewsLog;
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
@ -143,6 +145,7 @@ protected:
Poco::Logger * log = nullptr;
friend class CurrentThread;
friend class PushingToViewsBlockOutputStream;
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
@ -151,6 +154,9 @@ protected:
/// Is used to send logs from logs_queue to client in case of fatal errors.
std::function<void()> fatal_error_callback;
/// It is used to avoid enabling the query profiler when you have multiple ThreadStatus in the same thread
bool query_profiled_enabled = true;
public:
ThreadStatus();
~ThreadStatus();
@ -210,9 +216,13 @@ public:
/// Update ProfileEvents and dumps info to system.query_thread_log
void finalizePerformanceCounters();
/// Set the counters last usage to now
void resetPerformanceCountersLastUsage();
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
protected:
void applyQuerySettings();
@ -224,6 +234,8 @@ protected:
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now);
void logToQueryViewsLog(const ViewRuntimeData & vinfo);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;

View File

@ -173,7 +173,7 @@ class IColumn;
M(Bool, log_queries, 1, "Log requests and write the log to the system table.", 0) \
M(Bool, log_formatted_queries, 0, "Log formatted queries and write the log to the system table.", 0) \
M(LogQueriesType, log_queries_min_type, QueryLogElementType::QUERY_START, "Minimal type in query_log to log, possible values (from low to high): QUERY_START, QUERY_FINISH, EXCEPTION_BEFORE_START, EXCEPTION_WHILE_PROCESSING.", 0) \
M(Milliseconds, log_queries_min_query_duration_ms, 0, "Minimal time for the query to run, to get to the query_log/query_thread_log/query_views_log.", 0) \
M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \
\
M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \
@ -352,9 +352,10 @@ class IColumn;
M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \
\
M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \
M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \
M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting has effect only when 'log_queries' is true.", 0) \
M(Bool, log_query_views, true, "Log query dependent views into system.query_views_log table. This setting has effect only when 'log_queries' is true.", 0) \
M(String, log_comment, "", "Log comment into system.query_log table and server log. It can be set to arbitrary string no longer than max_query_size.", 0) \
M(LogsLevel, send_logs_level, LogsLevel::fatal, "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(Bool, enable_optimize_predicate_expression, 1, "If it is set to true, optimize predicates to subqueries.", 0) \

View File

@ -1,24 +1,31 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageValues.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/ThreadStatus.h>
#include <Common/checkStackSize.h>
#include <Common/setThreadName.h>
#include <common/logger_useful.h>
#include <common/scope_guard.h>
#include <atomic>
#include <chrono>
namespace DB
{
@ -79,9 +86,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
ASTPtr query;
BlockOutputStreamPtr out;
QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT;
String target_name = database_table.getFullTableName();
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
type = QueryViewsLogElement::ViewType::MATERIALIZED;
addTableLock(
materialized_view->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout));
@ -89,6 +99,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
auto inner_table_id = inner_table->getStorageID();
auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();
query = dependent_metadata_snapshot->getSelectQuery().inner_query;
target_name = inner_table_id.getFullTableName();
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->table_id = inner_table_id;
@ -114,14 +125,57 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
BlockIO io = interpreter.execute();
out = io.out;
}
else if (const auto * live_view = dynamic_cast<const StorageLiveView *>(dependent_table.get()))
{
type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = std::make_shared<PushingToViewsBlockOutputStream>(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true);
}
else
out = std::make_shared<PushingToViewsBlockOutputStream>(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr());
/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point in collecting metrics
std::unique_ptr<ThreadStatus> thread_status = nullptr;
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().thread_group;
if (running_group)
{
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
thread_status = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
thread_status->query_profiled_enabled = false;
thread_status->setupState(running_group);
}
QueryViewsLogElement::ViewRuntimeStats runtime_stats{
target_name,
type,
std::move(thread_status),
0,
std::chrono::system_clock::now(),
QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START};
views.emplace_back(ViewRuntimeData{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
/// Add the view to the query access info so it can appear in system.query_log
if (!no_destination)
{
getContext()->getQueryContext()->addQueryAccessInfo(
backQuoteIfNeed(database_table.getDatabaseName()), target_name, {}, "", database_table.getFullTableName());
}
}
/// Do not push to destination table if the flag is set
@ -136,7 +190,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
}
}
Block PushingToViewsBlockOutputStream::getHeader() const
{
/// If we don't write directly to the destination
@ -147,6 +200,39 @@ Block PushingToViewsBlockOutputStream::getHeader() const
return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals());
}
/// Auxiliary function to do the setup and teardown to run a view individually and collect its metrics inside the view ThreadStatus
void inline runViewStage(ViewRuntimeData & view, const std::string & action, std::function<void()> stage)
{
Stopwatch watch;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (view.runtime_stats.thread_status)
{
/// Change thread context to store individual metrics per view. Once the work is done, go back to the original thread
view.runtime_stats.thread_status->resetPerformanceCountersLastUsage();
current_thread = view.runtime_stats.thread_status.get();
}
try
{
stage();
}
catch (Exception & ex)
{
ex.addMessage(action + " " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
}
void PushingToViewsBlockOutputStream::write(const Block & block)
{
@ -169,39 +255,34 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
output->write(block);
}
if (views.empty())
return;
// Insert data into materialized views only after successful insert into main table
/// Don't process materialized views if this block is duplicate
const Settings & settings = getContext()->getSettingsRef();
if (!settings.deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate())
return;
size_t max_threads = 1;
if (settings.parallel_view_processing)
max_threads = settings.max_threads ? std::min(static_cast<size_t>(settings.max_threads), views.size()) : views.size();
if (max_threads > 1)
{
// Push to views concurrently if enabled and more than one view is attached
ThreadPool pool(max_threads);
for (auto & view : views)
{
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
runViewStage(view, "while pushing to view", [&]() { process(block, view); });
});
}
// Wait for concurrent view processing
pool.wait();
}
else
{
// Process sequentially
for (auto & view : views)
{
runViewStage(view, "while pushing to view", [&]() { process(block, view); });
}
}
}
@ -213,14 +294,11 @@ void PushingToViewsBlockOutputStream::writePrefix()
for (auto & view : views)
{
runViewStage(view, "while writing prefix to view", [&] { view.out->writePrefix(); });
if (view.exception)
{
logQueryViews();
std::rethrow_exception(view.exception);
}
}
}
@ -230,95 +308,92 @@ void PushingToViewsBlockOutputStream::writeSuffix()
if (output)
output->writeSuffix();
if (views.empty())
return;
auto process_suffix = [](ViewRuntimeData & view)
{
view.out->writeSuffix();
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH);
};
static std::string stage_step = "while writing suffix to view";
/// Run writeSuffix() for views in a separate thread pool.
/// It could have been done in PushingToViewsBlockOutputStream::process, however
/// it is not good if the insert into the main table fails but an insert into a view succeeds.
const Settings & settings = getContext()->getSettingsRef();
size_t max_threads = 1;
if (settings.parallel_view_processing)
max_threads = settings.max_threads ? std::min(static_cast<size_t>(settings.max_threads), views.size()) : views.size();
bool exception_happened = false;
if (max_threads > 1)
{
ThreadPool pool(max_threads);
std::atomic_uint8_t exception_count = 0;
for (auto & view : views)
{
if (view.exception)
{
exception_happened = true;
continue;
}
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
runViewStage(view, stage_step, [&] { process_suffix(view); });
if (view.exception)
{
exception_count.fetch_add(1, std::memory_order_relaxed);
}
else
{
LOG_TRACE(
log,
"Pushing (parallel {}) from {} to {} took {} ms.",
max_threads,
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
view.runtime_stats.elapsed_ms);
}
});
}
// Wait for concurrent view processing
pool.wait();
exception_happened |= exception_count.load(std::memory_order_relaxed) != 0;
}
else
{
for (auto & view : views)
{
if (view.exception)
{
exception_happened = true;
continue;
}
runViewStage(view, stage_step, [&] { process_suffix(view); });
if (view.exception)
{
exception_happened = true;
}
else
{
LOG_TRACE(
log,
"Pushing (sequentially) from {} to {} took {} ms.",
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
view.runtime_stats.elapsed_ms);
}
}
}
if (exception_happened)
checkExceptionsInViews();
if (views.size() > 1)
{
UInt64 milliseconds = main_watch.elapsedMilliseconds();
LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", storage->getStorageID().getNameForLogs(), views.size(), milliseconds);
}
logQueryViews();
}
void PushingToViewsBlockOutputStream::flush()
@ -330,70 +405,103 @@ void PushingToViewsBlockOutputStream::flush()
view.out->flush();
}
void PushingToViewsBlockOutputStream::process(const Block & block, ViewRuntimeData & view)
{
BlockInputStreamPtr in;
/// We need to keep InterpreterSelectQuery until the processing is finished, since:
///
/// - We copy Context inside InterpreterSelectQuery to support
/// modification of context (Settings) for subqueries
/// - InterpreterSelectQuery lives shorter than the query pipeline.
/// It's used just to build the query pipeline and is no longer needed
/// - ExpressionAnalyzer and then Functions created in InterpreterSelectQuery
/// **can** take a reference to the Context from InterpreterSelectQuery
/// (the problem arises only when a function uses the context from the
/// execute*() method, like FunctionDictGet does)
/// - These objects live inside the query pipeline (DataStreams) and the reference becomes dangling.
std::optional<InterpreterSelectQuery> select;
if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
/// We create a table with the same name as the original table and the same alias columns,
/// but it will contain a single block (the one INSERT-ed into the main table).
/// InterpreterSelectQuery will do the processing of the alias columns.
auto local_context = Context::createCopy(select_context);
local_context->addViewSource(
StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes);
in = std::make_shared<ConvertingBlockInputStream>(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
}
else
in = std::make_shared<OneBlockInputStream>(block);
in->setProgressCallback([this](const Progress & progress)
{
CurrentThread::updateProgressIn(progress);
this->onProgress(progress);
});
in->readPrefix();
while (Block result_block = in->read())
{
Nested::validateArraySizes(result_block);
view.out->write(result_block);
}
in->readSuffix();
}
void PushingToViewsBlockOutputStream::checkExceptionsInViews()
{
for (auto & view : views)
{
if (view.exception)
{
logQueryViews();
std::rethrow_exception(view.exception);
}
}
}
void PushingToViewsBlockOutputStream::logQueryViews()
{
const auto & settings = getContext()->getSettingsRef();
const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds();
const QueryViewsLogElement::ViewStatus min_status = settings.log_queries_min_type;
if (views.empty() || !settings.log_queries || !settings.log_query_views)
return;
for (auto & view : views)
{
if ((min_query_duration && view.runtime_stats.elapsed_ms <= min_query_duration) || (view.runtime_stats.event_status < min_status))
continue;
try
{
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->logToQueryViewsLog(view);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void PushingToViewsBlockOutputStream::onProgress(const Progress & progress)
{
if (getContext()->getProgressCallback())
getContext()->getProgressCallback()(progress);
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/QueryViewsLog.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
#include <Common/Stopwatch.h>
@ -8,13 +9,28 @@
namespace Poco
{
class Logger;
}
namespace DB
{
class ReplicatedMergeTreeSink;
struct ViewRuntimeData
{
const ASTPtr query;
StorageID table_id;
BlockOutputStreamPtr out;
std::exception_ptr exception;
QueryViewsLogElement::ViewRuntimeStats runtime_stats;
void setException(std::exception_ptr e)
{
exception = e;
runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::EXCEPTION_WHILE_PROCESSING);
}
};
/** Writes data to the specified table and to all dependent materialized views.
*/
class PushingToViewsBlockOutputStream : public IBlockOutputStream, WithContext
@ -33,6 +49,7 @@ public:
void flush() override;
void writePrefix() override;
void writeSuffix() override;
void onProgress(const Progress & progress) override;
private:
StoragePtr storage;
@ -44,20 +61,13 @@ private:
ASTPtr query_ptr;
Stopwatch main_watch;
std::vector<ViewRuntimeData> views;
ContextMutablePtr select_context;
ContextMutablePtr insert_context;
void process(const Block & block, ViewRuntimeData & view);
void checkExceptionsInViews();
void logQueryViews();
};

View File

@ -5,11 +5,13 @@ class FunctionFactory;
void registerFunctionTuple(FunctionFactory &);
void registerFunctionTupleElement(FunctionFactory &);
void registerFunctionTupleToNameValuePairs(FunctionFactory &);
void registerFunctionsTuple(FunctionFactory & factory)
{
registerFunctionTuple(factory);
registerFunctionTupleElement(factory);
registerFunctionTupleToNameValuePairs(factory);
}
}

View File

@ -0,0 +1,131 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
/** Transform a named tuple into an array of pairs, where the first element
* of the pair corresponds to the tuple field name and the second one to the
* tuple value.
*/
class FunctionTupleToNameValuePairs : public IFunction
{
public:
static constexpr auto name = "tupleToNameValuePairs";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionTupleToNameValuePairs>();
}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
bool useDefaultImplementationForConstants() const override
{
return true;
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
// get the type of all the fields in the tuple
const IDataType * col = arguments[0].type.get();
const DataTypeTuple * tuple = checkAndGetDataType<DataTypeTuple>(col);
if (!tuple)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"First argument for function {} must be a tuple.",
getName());
const auto & element_types = tuple->getElements();
if (element_types.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The argument tuple for function {} must not be empty.",
getName());
const auto & first_element_type = element_types[0];
bool all_value_types_equal = std::all_of(element_types.begin() + 1,
element_types.end(),
[&](const auto &other)
{
return first_element_type->equals(*other);
});
if (!all_value_types_equal)
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The argument tuple for function {} must contain just one type.",
getName());
}
DataTypePtr tuple_name_type = std::make_shared<DataTypeString>();
DataTypes item_data_types = {tuple_name_type,
first_element_type};
auto item_data_type = std::make_shared<DataTypeTuple>(item_data_types);
return std::make_shared<DataTypeArray>(item_data_type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const IColumn * tuple_col = arguments[0].column.get();
const DataTypeTuple * tuple = checkAndGetDataType<DataTypeTuple>(arguments[0].type.get());
const auto * tuple_col_concrete = assert_cast<const ColumnTuple*>(tuple_col);
auto keys = ColumnString::create();
MutableColumnPtr values = tuple_col_concrete->getColumn(0).cloneEmpty();
auto offsets = ColumnVector<UInt64>::create();
for (size_t row = 0; row < tuple_col_concrete->size(); ++row)
{
for (size_t col = 0; col < tuple_col_concrete->tupleSize(); ++col)
{
const std::string & key = tuple->getElementNames()[col];
const IColumn & value_column = tuple_col_concrete->getColumn(col);
values->insertFrom(value_column, row);
keys->insertData(key.data(), key.size());
}
offsets->insertValue(tuple_col_concrete->tupleSize() * (row + 1));
}
std::vector<ColumnPtr> tuple_columns = { std::move(keys), std::move(values) };
auto tuple_column = ColumnTuple::create(std::move(tuple_columns));
return ColumnArray::create(std::move(tuple_column), std::move(offsets));
}
};
}
void registerFunctionTupleToNameValuePairs(FunctionFactory & factory)
{
factory.registerFunction<FunctionTupleToNameValuePairs>();
}
}
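A usage sketch for the new function (the tuple literal is hypothetical; the result shape follows from `getReturnTypeImpl` and `executeImpl` above):
``` sql
SELECT tupleToNameValuePairs(CAST((3, 7), 'Tuple(user_id Int32, duration Int32)'));
-- Returns [('user_id', 3), ('duration', 7)] with type Array(Tuple(String, Int32)).
```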

View File

@ -1088,7 +1088,11 @@ bool Context::hasScalar(const String & name) const
void Context::addQueryAccessInfo(
const String & quoted_database_name,
const String & full_quoted_table_name,
const Names & column_names,
const String & projection_name,
const String & view_name)
{
assert(!isGlobalContext() || getApplicationType() == ApplicationType::LOCAL);
std::lock_guard<std::mutex> lock(query_access_info.mutex);
@ -1098,6 +1102,8 @@ void Context::addQueryAccessInfo(
query_access_info.columns.emplace(full_quoted_table_name + "." + backQuoteIfNeed(column_name));
if (!projection_name.empty())
query_access_info.projections.emplace(full_quoted_table_name + "." + backQuoteIfNeed(projection_name));
if (!view_name.empty())
query_access_info.views.emplace(view_name);
}
@ -2118,7 +2124,6 @@ std::shared_ptr<QueryLog> Context::getQueryLog() const
return shared->system_logs->query_log;
}
std::shared_ptr<QueryThreadLog> Context::getQueryThreadLog() const
{
auto lock = getLock();
@ -2129,6 +2134,15 @@ std::shared_ptr<QueryThreadLog> Context::getQueryThreadLog() const
return shared->system_logs->query_thread_log;
}
std::shared_ptr<QueryViewsLog> Context::getQueryViewsLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->query_views_log;
}
std::shared_ptr<PartLog> Context::getPartLog(const String & part_database) const
{

View File

@ -70,6 +70,7 @@ struct Progress;
class Clusters;
class QueryLog;
class QueryThreadLog;
class QueryViewsLog;
class PartLog;
class TextLog;
class TraceLog;
@ -219,6 +220,7 @@ private:
tables = rhs.tables;
columns = rhs.columns;
projections = rhs.projections;
views = rhs.views;
}
QueryAccessInfo(QueryAccessInfo && rhs) = delete;
@ -235,6 +237,7 @@ private:
std::swap(tables, rhs.tables);
std::swap(columns, rhs.columns);
std::swap(projections, rhs.projections);
std::swap(views, rhs.views);
}
/// To prevent a race between copy-constructor and other uses of this structure.
@ -242,7 +245,8 @@ private:
std::set<std::string> databases{};
std::set<std::string> tables{};
std::set<std::string> columns{};
std::set<std::string> projections{};
std::set<std::string> views{};
};
QueryAccessInfo query_access_info;
@ -469,7 +473,8 @@ public:
const String & quoted_database_name,
const String & full_quoted_table_name,
const Names & column_names,
const String & projection_name = {},
const String & view_name = {});
/// Supported factories for records in query_log
enum class QueryLogFactories
@ -730,6 +735,7 @@ public:
/// Nullptr if the query log is not ready for this moment.
std::shared_ptr<QueryLog> getQueryLog() const;
std::shared_ptr<QueryThreadLog> getQueryThreadLog() const;
std::shared_ptr<QueryViewsLog> getQueryViewsLog() const;
std::shared_ptr<TraceLog> getTraceLog() const;
std::shared_ptr<TextLog> getTextLog() const;
std::shared_ptr<MetricLog> getMetricLog() const;

View File

@ -1965,12 +1965,14 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (context->hasQueryContext() && !options.is_internal)
{
const String view_name{};
auto local_storage_id = storage->getStorageID();
context->getQueryContext()->addQueryAccessInfo(
backQuoteIfNeed(local_storage_id.getDatabaseName()),
local_storage_id.getFullTableName(),
required_columns,
query_info.projection ? query_info.projection->desc->name : "",
view_name);
}
/// Create step which reads from empty source if storage has no data.

View File

@ -20,6 +20,7 @@
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/MetricLog.h>
@ -418,6 +419,7 @@ BlockIO InterpreterSystemQuery::execute()
[&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
[&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); },
[&] { if (auto query_views_log = getContext()->getQueryViewsLog()) query_views_log->flush(true); },
[&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); }
);
break;

View File

@ -68,6 +68,8 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))},
{"projections", std::make_shared<DataTypeArray>(
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))},
{"views", std::make_shared<DataTypeArray>(
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))},
{"exception_code", std::make_shared<DataTypeInt32>()},
{"exception", std::make_shared<DataTypeString>()},
{"stack_trace", std::make_shared<DataTypeString>()},
@ -161,6 +163,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
auto & column_tables = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_columns = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_projections = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_views = typeid_cast<ColumnArray &>(*columns[i++]);
auto fill_column = [](const std::set<String> & data, ColumnArray & column)
{
@ -178,6 +181,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
fill_column(query_tables, column_tables);
fill_column(query_columns, column_columns);
fill_column(query_projections, column_projections);
fill_column(query_views, column_views);
}
columns[i++]->insert(exception_code);

View File

@ -59,6 +59,7 @@ struct QueryLogElement
std::set<String> query_tables;
std::set<String> query_columns;
std::set<String> query_projections;
std::set<String> query_views;
std::unordered_set<String> used_aggregate_functions;
std::unordered_set<String> used_aggregate_function_combinators;

View File

@ -0,0 +1,104 @@
#include "QueryViewsLog.h"
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ProfileEventsExt.h>
#include <common/DateLUT.h>
#include <common/types.h>
namespace DB
{
NamesAndTypesList QueryViewsLogElement::getNamesAndTypes()
{
auto view_status_datatype = std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"QueryStart", static_cast<Int8>(QUERY_START)},
{"QueryFinish", static_cast<Int8>(QUERY_FINISH)},
{"ExceptionBeforeStart", static_cast<Int8>(EXCEPTION_BEFORE_START)},
{"ExceptionWhileProcessing", static_cast<Int8>(EXCEPTION_WHILE_PROCESSING)}});
auto view_type_datatype = std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"Default", static_cast<Int8>(ViewType::DEFAULT)},
{"Materialized", static_cast<Int8>(ViewType::MATERIALIZED)},
{"Live", static_cast<Int8>(ViewType::LIVE)}});
return {
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"view_duration_ms", std::make_shared<DataTypeUInt64>()},
{"initial_query_id", std::make_shared<DataTypeString>()},
{"view_name", std::make_shared<DataTypeString>()},
{"view_uuid", std::make_shared<DataTypeUUID>()},
{"view_type", std::move(view_type_datatype)},
{"view_query", std::make_shared<DataTypeString>()},
{"view_target", std::make_shared<DataTypeString>()},
{"read_rows", std::make_shared<DataTypeUInt64>()},
{"read_bytes", std::make_shared<DataTypeUInt64>()},
{"written_rows", std::make_shared<DataTypeUInt64>()},
{"written_bytes", std::make_shared<DataTypeUInt64>()},
{"peak_memory_usage", std::make_shared<DataTypeInt64>()},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
{"status", std::move(view_status_datatype)},
{"exception_code", std::make_shared<DataTypeInt32>()},
{"exception", std::make_shared<DataTypeString>()},
{"stack_trace", std::make_shared<DataTypeString>()}};
}
NamesAndAliases QueryViewsLogElement::getNamesAndAliases()
{
return {
{"ProfileEvents.Names", {std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, "mapKeys(ProfileEvents)"},
{"ProfileEvents.Values", {std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())}, "mapValues(ProfileEvents)"}};
}
void QueryViewsLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); // event_date
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
columns[i++]->insert(view_duration_ms);
columns[i++]->insertData(initial_query_id.data(), initial_query_id.size());
columns[i++]->insertData(view_name.data(), view_name.size());
columns[i++]->insert(view_uuid);
columns[i++]->insert(view_type);
columns[i++]->insertData(view_query.data(), view_query.size());
columns[i++]->insertData(view_target.data(), view_target.size());
columns[i++]->insert(read_rows);
columns[i++]->insert(read_bytes);
columns[i++]->insert(written_rows);
columns[i++]->insert(written_bytes);
columns[i++]->insert(peak_memory_usage);
if (profile_counters)
{
auto * column = columns[i++].get();
ProfileEvents::dumpToMapColumn(*profile_counters, column, true);
}
else
{
columns[i++]->insertDefault();
}
columns[i++]->insert(status);
columns[i++]->insert(exception_code);
columns[i++]->insertData(exception.data(), exception.size());
columns[i++]->insertData(stack_trace.data(), stack_trace.size());
}
}
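
Taken together, getNamesAndTypes() and getNamesAndAliases() above define the schema of the system.query_views_log table, including the ProfileEvents map and its ProfileEvents.Names / ProfileEvents.Values array aliases. A minimal sketch of how the resulting table can be inspected (the filter and LIMIT are illustrative, not part of this change):

```sql
SELECT
    view_name,
    view_duration_ms,
    read_rows,
    written_rows,
    ProfileEvents['InsertedRows'] AS inserted_rows
FROM system.query_views_log
WHERE event_date = today()
ORDER BY view_duration_ms DESC
LIMIT 10;
```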

View File

@ -0,0 +1,87 @@
#pragma once
#include <chrono>
#include <memory>
#include <sys/types.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Core/SettingsEnums.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <Interpreters/SystemLog.h>
#include <common/types.h>
namespace ProfileEvents
{
class Counters;
}
namespace DB
{
class ThreadStatus;
struct QueryViewsLogElement
{
using ViewStatus = QueryLogElementType;
enum class ViewType : int8_t
{
DEFAULT = 1,
MATERIALIZED = 2,
LIVE = 3
};
struct ViewRuntimeStats
{
String target_name;
ViewType type = ViewType::DEFAULT;
std::unique_ptr<ThreadStatus> thread_status = nullptr;
UInt64 elapsed_ms = 0;
std::chrono::time_point<std::chrono::system_clock> event_time;
ViewStatus event_status = ViewStatus::QUERY_START;
void setStatus(ViewStatus s)
{
event_status = s;
event_time = std::chrono::system_clock::now();
}
};
time_t event_time{};
Decimal64 event_time_microseconds{};
UInt64 view_duration_ms{};
String initial_query_id;
String view_name;
UUID view_uuid{UUIDHelpers::Nil};
ViewType view_type{ViewType::DEFAULT};
String view_query;
String view_target;
UInt64 read_rows{};
UInt64 read_bytes{};
UInt64 written_rows{};
UInt64 written_bytes{};
Int64 peak_memory_usage{};
std::shared_ptr<ProfileEvents::Counters> profile_counters;
ViewStatus status = ViewStatus::QUERY_START;
Int32 exception_code{};
String exception;
String stack_trace;
static std::string name() { return "QueryViewsLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases();
void appendToBlock(MutableColumns & columns) const;
};
class QueryViewsLog : public SystemLog<QueryViewsLogElement>
{
using SystemLog<QueryViewsLogElement>::SystemLog;
};
}
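
The ViewType and ViewStatus values in this header are exposed as Enum8 columns, so log rows can be filtered by the string names registered in QueryViewsLog.cpp ('Materialized', 'Live', 'QueryFinish', and so on). A hedged example:

```sql
SELECT view_name, view_target, status, exception_code
FROM system.query_views_log
WHERE view_type = 'Materialized'
  AND status != 'QueryFinish';
```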

View File

@ -1,13 +1,14 @@
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/CrashLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -104,6 +105,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
global_context, "system", "opentelemetry_span_log", config,
"opentelemetry_span_log");
query_views_log = createSystemLog<QueryViewsLog>(global_context, "system", "query_views_log", config, "query_views_log");
zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, "system", "zookeeper_log", config, "zookeeper_log");
if (query_log)
@ -124,6 +126,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(asynchronous_metric_log.get());
if (opentelemetry_span_log)
logs.emplace_back(opentelemetry_span_log.get());
if (query_views_log)
logs.emplace_back(query_views_log.get());
if (zookeeper_log)
logs.emplace_back(zookeeper_log.get());

View File

@ -73,6 +73,7 @@ class CrashLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class QueryViewsLog;
class ZooKeeperLog;
@ -110,6 +111,8 @@ struct SystemLogs
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
/// Used to log queries of materialized and live views
std::shared_ptr<QueryViewsLog> query_views_log;
/// Used to log all actions of ZooKeeper client
std::shared_ptr<ZooKeeperLog> zookeeper_log;

View File

@ -1,12 +1,17 @@
#include <Common/ThreadStatus.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Parsers/formatAST.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/QueryProfiler.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TraceCollector.h>
#include <common/errnoToString.h>
@ -18,6 +23,14 @@
# include <sys/resource.h>
#endif
namespace ProfileEvents
{
extern const Event SelectedRows;
extern const Event SelectedBytes;
extern const Event InsertedRows;
extern const Event InsertedBytes;
}
/// Implement some methods of ThreadStatus and CurrentThread here to avoid extra linking dependencies in clickhouse_common_io
/// TODO It doesn't make sense.
@ -287,8 +300,18 @@ void ThreadStatus::finalizePerformanceCounters()
}
}
void ThreadStatus::resetPerformanceCountersLastUsage()
{
*last_rusage = RUsageCounters::current();
if (taskstats)
taskstats->reset();
}
void ThreadStatus::initQueryProfiler()
{
if (!query_profiled_enabled)
return;
/// query profilers are useless without trace collector
auto global_context_ptr = global_context.lock();
if (!global_context_ptr || !global_context_ptr->hasTraceCollector())
@ -455,6 +478,64 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String
thread_log.add(elem);
}
static String getCleanQueryAst(const ASTPtr q, ContextPtr context)
{
String res = serializeAST(*q, true);
if (auto * masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(res);
res = res.substr(0, context->getSettingsRef().log_queries_cut_to_length);
return res;
}
void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo)
{
auto query_context_ptr = query_context.lock();
if (!query_context_ptr)
return;
auto views_log = query_context_ptr->getQueryViewsLog();
if (!views_log)
return;
QueryViewsLogElement element;
element.event_time = time_in_seconds(vinfo.runtime_stats.event_time);
element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.event_time);
element.view_duration_ms = vinfo.runtime_stats.elapsed_ms;
element.initial_query_id = query_id;
element.view_name = vinfo.table_id.getFullTableName();
element.view_uuid = vinfo.table_id.uuid;
element.view_type = vinfo.runtime_stats.type;
if (vinfo.query)
element.view_query = getCleanQueryAst(vinfo.query, query_context_ptr);
element.view_target = vinfo.runtime_stats.target_name;
auto events = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot());
element.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
element.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
element.written_rows = (*events)[ProfileEvents::InsertedRows];
element.written_bytes = (*events)[ProfileEvents::InsertedBytes];
element.peak_memory_usage = memory_tracker.getPeak() > 0 ? memory_tracker.getPeak() : 0;
if (query_context_ptr->getSettingsRef().log_profile_events != 0)
{
element.profile_counters = events;
}
element.status = vinfo.runtime_stats.event_status;
element.exception_code = 0;
if (vinfo.exception)
{
element.exception_code = getExceptionErrorCode(vinfo.exception);
element.exception = getExceptionMessage(vinfo.exception, false);
if (query_context_ptr->getSettingsRef().calculate_text_stack_trace)
element.stack_trace = getExceptionStackTraceString(vinfo.exception);
}
views_log->add(element);
}
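
Since logToQueryViewsLog() copies the parent query's id into initial_query_id, a failing view can be traced back to the INSERT that triggered it. A sketch (the time window is illustrative):

```sql
SELECT view_name, exception_code, exception
FROM system.query_views_log
WHERE event_date >= yesterday()
  AND exception_code != 0
  AND initial_query_id IN
  (
      SELECT query_id
      FROM system.query_log
      WHERE event_date >= yesterday()
        AND exception_code != 0
  );
```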
void CurrentThread::initializeQuery()
{
if (unlikely(!current_thread))

View File

@ -663,6 +663,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_projections = info.projections;
elem.query_views = info.views;
}
interpreter->extendQueryLogElem(elem, ast, context, query_database, query_table);
@ -708,6 +709,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.thread_ids = std::move(info.thread_ids);
element.profile_counters = std::move(info.profile_counters);
/// We need to refresh the access info since dependent views might have added extra information, either during
/// creation of the view (PushingToViewsBlockOutputStream) or while executing its internal SELECT
const auto & access_info = context_ptr->getQueryAccessInfo();
element.query_databases.insert(access_info.databases.begin(), access_info.databases.end());
element.query_tables.insert(access_info.tables.begin(), access_info.tables.end());
element.query_columns.insert(access_info.columns.begin(), access_info.columns.end());
element.query_projections.insert(access_info.projections.begin(), access_info.projections.end());
element.query_views.insert(access_info.views.begin(), access_info.views.end());
const auto & factories_info = context_ptr->getQueryFactoriesInfo();
element.used_aggregate_functions = factories_info.aggregate_functions;
element.used_aggregate_function_combinators = factories_info.aggregate_function_combinators;
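
After this refresh, views that ran as part of an INSERT also appear in the query_log entry of the parent query. A minimal sketch of checking that, using the array columns populated from the access info above:

```sql
SELECT query, databases, tables, views
FROM system.query_log
WHERE type = 'QueryFinish'
  AND notEmpty(views)
ORDER BY event_time DESC
LIMIT 5;
```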

View File

@ -131,6 +131,7 @@ SRCS(
QueryNormalizer.cpp
QueryParameterVisitor.cpp
QueryThreadLog.cpp
QueryViewsLog.cpp
RemoveInjectiveFunctionsVisitor.cpp
RenameColumnVisitor.cpp
ReplaceQueryParameterVisitor.cpp

View File

@ -42,6 +42,7 @@ def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluste
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
'max_memory_usage': '30Mi',
'max_untracked_memory': '0'
})
# "Received from" is mandatory, since the exception should be thrown on the remote node.
with pytest.raises(QueryRuntimeException, match=r'DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv'):
@ -55,6 +56,7 @@ def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
'max_memory_usage': '30Mi',
'max_untracked_memory': '0'
})
node1.query('system flush distributed dist')
assert int(node1.query('select count() from dist_data')) == 100000

View File

@ -2122,7 +2122,7 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
# we have a 0.25s (sleepEachRow) * 20 (rows) = 5 second window after "Polled batch of 20 messages"
# while materialized view is working to inject zookeeper failure
pm.drop_instance_zk_connections(instance)
instance.wait_for_log_line("Error.*(session has been expired|Connection loss).*while write prefix to view")
instance.wait_for_log_line("Error.*(session has been expired|Connection loss).*while writing suffix to view")
pm.heal_all()
instance.wait_for_log_line("Committed offset 22")

View File

@ -1,4 +1,8 @@
0
0
800000
1600000
Should throw 1
KO(241)
Should throw 2
KO(241)
Should pass 1
OK
Should pass 2
OK

View File

@ -7,6 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# just in case
set -o pipefail
# shellcheck disable=SC2120
function execute()
{
${CLICKHOUSE_CLIENT} -n "$@"
@ -64,8 +65,7 @@ echo "create table out_01278 as data_01278 Engine=Merge('$CLICKHOUSE_DATABASE',
#
function execute_insert()
{
${CLICKHOUSE_CLIENT} --max_memory_usage=$TEST_01278_MEMORY --optimize_trivial_insert_select='false' "$@" -q "
insert into data_01278 select
number,
reinterpretAsString(number), // s1
@ -76,18 +76,24 @@ insert into data_01278 select
reinterpretAsString(number), // s6
reinterpretAsString(number), // s7
reinterpretAsString(number) // s8
from numbers(100000); -- { serverError 241; }" > /dev/null 2>&1
local ret_code=$?
if [[ $ret_code -eq 0 ]];
then
echo " OK"
else
echo " KO($ret_code)"
fi
}
# fails
echo "Should throw 1"
execute_insert --testmode
echo "Should throw 2"
execute_insert --testmode --min_insert_block_size_rows=1 --min_insert_block_size_rows_for_materialized_views=$((1<<20))
# passes
echo "Should pass 1"
execute_insert --min_insert_block_size_rows=1
echo "Should pass 2"
execute_insert --min_insert_block_size_rows_for_materialized_views=1
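
For context: min_insert_block_size_rows_for_materialized_views appears to control how blocks are squashed before being pushed into a materialized view, which is why the huge value above exceeds the memory cap while a value of 1 passes. A minimal sketch with hypothetical tables src, dst and view mv (the names are not part of this test):

```sql
CREATE TABLE src (n UInt64) ENGINE = MergeTree ORDER BY n;
CREATE TABLE dst (n UInt64) ENGINE = MergeTree ORDER BY n;
CREATE MATERIALIZED VIEW mv TO dst AS SELECT n FROM src;

-- keep blocks pushed to mv small so per-view memory stays low
SET min_insert_block_size_rows_for_materialized_views = 1;
INSERT INTO src SELECT number FROM numbers(1000000);
```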

View File

@ -0,0 +1,70 @@
Row 1:
──────
stage: Query log rows
read_rows: 100
written_rows: 201
databases: ['_table_function','default']
tables: ['_table_function.numbers','default.table_a','default.table_b','default.table_b_live_view','default.table_c']
views: ['default.matview_a_to_b','default.matview_b_to_c','default.table_b_live_view']
sleep_calls: 200
sleep_us: 298
Row 1:
──────
stage: Depending views
view_name: default.matview_a_to_b
view_type: Materialized
status: QueryFinish
view_target: default.table_b
view_query: SELECT toFloat64(a) AS a, b + sleepEachRow(0.000001) AS count FROM default.table_a
read_rows: 100
written_rows: 100
sleep_calls: 100
sleep_us: 99
Row 2:
──────
stage: Depending views
view_name: default.matview_b_to_c
view_type: Materialized
status: QueryFinish
view_target: default.table_c
view_query: SELECT sum(a + sleepEachRow(0.000002)) AS a FROM default.table_b
read_rows: 100
written_rows: 1
sleep_calls: 100
sleep_us: 199
Row 3:
──────
stage: Depending views
view_name: default.table_b_live_view
view_type: Live
status: QueryFinish
view_target: default.table_b_live_view
view_query: SELECT sum(a + b) FROM default.table_b
read_rows: 100
written_rows: 0
sleep_calls: 0
sleep_us: 0
Row 1:
──────
stage: Query log rows 2
read_rows: 50
written_rows: 100
databases: ['_table_function','default']
tables: ['_table_function.numbers','default.table_d','default.table_e','default.table_f']
views: ['default.matview_join_d_e']
sleep_calls: 50
sleep_us: 150
Row 1:
──────
stage: Depending views 2
view_name: default.matview_join_d_e
view_type: Materialized
status: QueryFinish
view_target: default.table_f
view_query: SELECT table_d.a AS a, table_e.count + sleepEachRow(0.000003) AS count FROM default.table_d LEFT JOIN default.table_e ON table_d.a = table_e.a
read_rows: 50
written_rows: 50
sleep_calls: 50
sleep_us: 150

View File

@ -0,0 +1,129 @@
SET allow_experimental_live_view = 1;
SET log_queries=0;
SET log_query_threads=0;
-- SETUP TABLES
CREATE TABLE table_a (a String, b Int64) ENGINE = MergeTree ORDER BY b;
CREATE TABLE table_b (a Float64, b Int64) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE table_c (a Float64) ENGINE = MergeTree ORDER BY a;
CREATE TABLE table_d (a Float64, count Int64) ENGINE MergeTree ORDER BY a;
CREATE TABLE table_e (a Float64, count Int64) ENGINE MergeTree ORDER BY a;
CREATE TABLE table_f (a Float64, count Int64) ENGINE MergeTree ORDER BY a;
-- SETUP MATERIALIZED VIEWS
CREATE MATERIALIZED VIEW matview_a_to_b TO table_b AS SELECT toFloat64(a) AS a, b + sleepEachRow(0.000001) AS count FROM table_a;
CREATE MATERIALIZED VIEW matview_b_to_c TO table_c AS SELECT SUM(a + sleepEachRow(0.000002)) as a FROM table_b;
CREATE MATERIALIZED VIEW matview_join_d_e TO table_f AS SELECT table_d.a as a, table_e.count + sleepEachRow(0.000003) as count FROM table_d LEFT JOIN table_e ON table_d.a = table_e.a;
-- SETUP LIVE VIEW
---- table_b_live_view (Int64)
DROP TABLE IF EXISTS table_b_live_view;
CREATE LIVE VIEW table_b_live_view AS SELECT sum(a + b) FROM table_b;
-- ENABLE LOGS
SET log_query_views=1;
SET log_queries_min_type='QUERY_FINISH';
SET log_queries=1;
-- INSERT 1
INSERT INTO table_a SELECT '111', * FROM numbers(100);
-- INSERT 2
INSERT INTO table_d SELECT 0.5, * FROM numbers(50);
SYSTEM FLUSH LOGS;
-- CHECK LOGS OF INSERT 1
-- Note that live views currently don't report written rows
SELECT
'Query log rows' as stage,
read_rows,
written_rows,
arraySort(databases) as databases,
arraySort(tables) as tables,
arraySort(views) as views,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
FROM system.query_log
WHERE query like '-- INSERT 1%INSERT INTO table_a%'
AND current_database = currentDatabase()
AND event_date >= yesterday()
FORMAT Vertical;
SELECT
'Depending views' as stage,
view_name,
view_type,
status,
view_target,
view_query,
read_rows,
written_rows,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
FROM system.query_views_log
WHERE initial_query_id =
(
SELECT initial_query_id
FROM system.query_log
WHERE query like '-- INSERT 1%INSERT INTO table_a%'
AND current_database = currentDatabase()
AND event_date >= yesterday()
LIMIT 1
)
ORDER BY view_name
FORMAT Vertical;
-- CHECK LOGS OF INSERT 2
SELECT
'Query log rows 2' as stage,
read_rows,
written_rows,
arraySort(databases) as databases,
arraySort(tables) as tables,
arraySort(views) as views,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
FROM system.query_log
WHERE query like '-- INSERT 2%INSERT INTO table_d%'
AND current_database = currentDatabase()
AND event_date >= yesterday()
FORMAT Vertical;
SELECT
'Depending views 2' as stage,
view_name,
view_type,
status,
view_target,
view_query,
read_rows,
written_rows,
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_us
FROM system.query_views_log
WHERE initial_query_id =
(
SELECT initial_query_id
FROM system.query_log
WHERE query like '-- INSERT 2%INSERT INTO table_d%'
AND current_database = currentDatabase()
AND event_date >= yesterday()
LIMIT 1
)
ORDER BY view_name
FORMAT Vertical;
-- TEARDOWN
DROP TABLE table_b_live_view;
DROP TABLE matview_a_to_b;
DROP TABLE matview_b_to_c;
DROP TABLE matview_join_d_e;
DROP TABLE table_f;
DROP TABLE table_e;
DROP TABLE table_d;
DROP TABLE table_c;
DROP TABLE table_b;
DROP TABLE table_a;

View File

@ -0,0 +1,14 @@
--parallel_view_processing 0
table_exception_a 1
table_exception_b 0
table_exception_c 1
Excep****WhileProcessing
default.matview_exception_a_to_b Excep****WhileProcessing 6 default.table_exception_b SELECT toFloat64(a) AS a, b FROM default.table_exception_a
default.matview_exception_a_to_c QueryFinish 0 default.table_exception_c SELECT b AS a FROM default.table_exception_a
--parallel_view_processing 1
table_exception_a 2
table_exception_b 0
table_exception_c 2
Excep****WhileProcessing
default.matview_exception_a_to_b Excep****WhileProcessing 6 default.table_exception_b SELECT toFloat64(a) AS a, b FROM default.table_exception_a
default.matview_exception_a_to_c QueryFinish 0 default.table_exception_c SELECT b AS a FROM default.table_exception_a

View File

@ -0,0 +1,94 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function cleanup()
{
${CLICKHOUSE_CLIENT} -n -q "
DROP TABLE IF EXISTS matview_exception_a_to_c;
DROP TABLE IF EXISTS matview_exception_a_to_b;
DROP TABLE IF EXISTS table_exception_c;
DROP TABLE IF EXISTS table_exception_b;
DROP TABLE IF EXISTS table_exception_a;
";
}
function setup()
{
${CLICKHOUSE_CLIENT} -n -q "
CREATE TABLE table_exception_a (a String, b Int64) ENGINE = MergeTree ORDER BY b;
CREATE TABLE table_exception_b (a Float64, b Int64) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE table_exception_c (a Float64) ENGINE = MergeTree ORDER BY a;
CREATE MATERIALIZED VIEW matview_exception_a_to_b TO table_exception_b AS SELECT toFloat64(a) AS a, b FROM table_exception_a;
CREATE MATERIALIZED VIEW matview_exception_a_to_c TO table_exception_c AS SELECT b AS a FROM table_exception_a;
";
}
function test()
{
echo "$@";
# We are going to insert an invalid number into table_exception_a. The push to table_exception_b via
# matview_exception_a_to_b will fail, while the push to table_exception_c via matview_exception_a_to_c will succeed.
${CLICKHOUSE_CLIENT} "$@" --log_queries=1 --log_query_views=1 -q "INSERT INTO table_exception_a VALUES ('0.Aa234', 22)" > /dev/null 2>&1 || true;
${CLICKHOUSE_CLIENT} -q "
SELECT * FROM
(
SELECT 'table_exception_a' as name, count() AS c FROM table_exception_a UNION ALL
SELECT 'table_exception_b' as name, count() AS c FROM table_exception_b UNION ALL
SELECT 'table_exception_c' as name, count() AS c FROM table_exception_c
)
ORDER BY name ASC
FORMAT TSV";
${CLICKHOUSE_CLIENT} -q 'SYSTEM FLUSH LOGS';
${CLICKHOUSE_CLIENT} -q "
SELECT
replaceOne(CAST(type AS String), 'ExceptionWhileProcessing', 'Excep****WhileProcessing') AS exception_code
FROM system.query_log
WHERE
query LIKE 'INSERT INTO table_exception_a%' AND
type > 0 AND
event_date >= yesterday() AND
current_database = currentDatabase()
ORDER BY event_time_microseconds DESC
LIMIT 1
FORMAT TSV";
${CLICKHOUSE_CLIENT} -q "
SELECT
view_name,
replaceOne(CAST(status AS String), 'ExceptionWhileProcessing', 'Excep****WhileProcessing'),
exception_code,
view_target,
view_query
FROM system.query_views_log
WHERE initial_query_id =
(
SELECT query_id
FROM system.query_log
WHERE
current_database = '${CLICKHOUSE_DATABASE}' AND
query LIKE 'INSERT INTO table_exception_a%' AND
type > 0 AND
event_date >= yesterday() AND
current_database = currentDatabase()
ORDER BY event_time_microseconds DESC
LIMIT 1
)
ORDER BY view_name ASC
";
}
trap cleanup EXIT;
cleanup;
setup;
test --parallel_view_processing 0;
test --parallel_view_processing 1;
exit 0

View File

@ -0,0 +1,6 @@
[('1',1),('2',2),('3',3)]
[('a',(1,2)),('b',(3,4))]
[('a',(5,6)),('b',(7,8))]
CPU 3.3
Memory 5.5
Disk 6.6

View File

@ -0,0 +1,25 @@
SELECT tupleToNameValuePairs(tuple(1, 2, 3));
DROP TABLE IF EXISTS test02008;
CREATE TABLE test02008 (
col Tuple(
a Tuple(key1 int, key2 int),
b Tuple(key1 int, key3 int)
)
) ENGINE=Memory();
INSERT INTO test02008 VALUES (tuple(tuple(1, 2), tuple(3, 4)));
INSERT INTO test02008 VALUES (tuple(tuple(5, 6), tuple(7, 8)));
SELECT tupleToNameValuePairs(col) FROM test02008 ORDER BY col;
DROP TABLE IF EXISTS test02008;
CREATE TABLE test02008 (
col Tuple(CPU double, Memory double, Disk double)
) ENGINE=Memory();
INSERT INTO test02008 VALUES (tuple(3.3, 5.5, 6.6));
SELECT untuple(arrayJoin(tupleToNameValuePairs(col))) from test02008;
DROP TABLE IF EXISTS test02008;
SELECT tupleToNameValuePairs(tuple(1, 1.3)); -- { serverError 43; }
SELECT tupleToNameValuePairs(tuple(1, [1,2])); -- { serverError 43; }
SELECT tupleToNameValuePairs(tuple(1, 'a')); -- { serverError 43; }
SELECT tupleToNameValuePairs(33); -- { serverError 43; }

View File

@ -328,6 +328,14 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_thread_log>
<!-- Logs materialized and live views associated with a query -->
<query_views_log>
<database>system</database>
<table>query_views_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_views_log>
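
With this server-side section in place, a session still has to opt in through the log_query_views setting before rows show up. An illustrative check (the INSERT in the middle stands for any query that pushes to materialized or live views):

```sql
SET log_query_views = 1;
-- ... run an INSERT that pushes to materialized or live views ...
SYSTEM FLUSH LOGS;
SELECT count() FROM system.query_views_log WHERE event_date = today();
```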
<!-- Uncomment to use the part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).
<part_log>

View File

@ -2897,6 +2897,7 @@ version: 1.0
* processes
* query_log
* query_thread_log
* query_views_log
* clusters
* events
* graphite_retentions

View File

@ -5350,6 +5350,7 @@ RQ_SRS_006_RBAC_Table_SensitiveTables = Requirement(
'* processes\n'
'* query_log\n'
'* query_thread_log\n'
'* query_views_log\n'
'* clusters\n'
'* events\n'
'* graphite_retentions\n'
@ -8854,11 +8855,11 @@ RQ_SRS_006_RBAC_Privileges_AdminOption = Requirement(
num='5.40')
SRS_006_ClickHouse_Role_Based_Access_Control = Specification(
name='SRS-006 ClickHouse Role Based Access Control',
description=None,
author=None,
date=None,
status=None,
approved_by=None,
approved_date=None,
approved_version=None,
@ -12877,6 +12878,7 @@ version: 1.0
* processes
* query_log
* query_thread_log
* query_views_log
* clusters
* events
* graphite_retentions

View File

@ -58,6 +58,10 @@ def sensitive_tables(self, node=None):
output = node.query("SELECT count(*) FROM system.query_thread_log", settings = [("user",user_name)]).output
assert output == 0, error()
with And("I select from query_views_log"):
output = node.query("SELECT count(*) FROM system.query_views_log", settings = [("user",user_name)]).output
assert output == 0, error()
with And("I select from clusters"):
output = node.query("SELECT count(*) FROM system.clusters", settings = [("user",user_name)]).output
assert output == 0, error()