Merge branch 'master' into tsv-csv-detect-header

This commit is contained in:
Kruglov Pavel 2023-01-23 21:47:17 +01:00 committed by GitHub
commit 478a552a0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
262 changed files with 5017 additions and 1407 deletions

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 0ab9bba7ccad3c8dacce04a35cb3b78218547ab4
Subproject commit 4b1c8dd9913d2a16db62df0e509fa598da5c8219

View File

@ -18,13 +18,25 @@ repo_dir=ch
BINARY_TO_DOWNLOAD=${BINARY_TO_DOWNLOAD:="clang-15_debug_none_unsplitted_disable_False_binary"}
BINARY_URL_TO_DOWNLOAD=${BINARY_URL_TO_DOWNLOAD:="https://clickhouse-builds.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/$BINARY_TO_DOWNLOAD/clickhouse"}
function git_clone_with_retry
{
for _ in 1 2 3 4; do
if git clone --depth 1 https://github.com/ClickHouse/ClickHouse.git -- "$1" 2>&1 | ts '%Y-%m-%d %H:%M:%S';then
return 0
else
sleep 0.5
fi
done
return 1
}
function clone
{
# For local runs, start directly from the "fuzz" stage.
rm -rf "$repo_dir" ||:
mkdir "$repo_dir" ||:
git clone --depth 1 https://github.com/ClickHouse/ClickHouse.git -- "$repo_dir" 2>&1 | ts '%Y-%m-%d %H:%M:%S'
git_clone_with_retry "$repo_dir"
(
cd "$repo_dir"
if [ "$PR_TO_TEST" != "0" ]; then

View File

@ -50,7 +50,7 @@ Action required for every item -- these are errors that must be fixed.
A query is supposed to run longer than 0.1 second. If your query runs faster, increase the amount of processed data to bring the run time above this threshold. You can use a bigger table (e.g. `hits_100m` instead of `hits_10m`), increase a `LIMIT`, make a query single-threaded, and so on. Queries that are too fast suffer from poor stability and precision.
#### Partial Queries
#### Backward-incompatible Queries
Action required for the cells marked in red.
Shows the queries we are unable to run on an old server -- probably because they contain a new function. You should see this table when you add a new function and a performance test for it. Check that the run time and variance are acceptable (run time between 0.1 and 1 seconds, variance below 10%). If not, they will be highlighted in red.

View File

@ -399,7 +399,7 @@ clickhouse-local --query "
create view query_runs as select * from file('analyze/query-runs.tsv', TSV,
'test text, query_index int, query_id text, version UInt8, time float');
-- Separately process 'partial' queries which we could only run on the new server
-- Separately process backward-incompatible ('partial') queries which we could only run on the new server
-- because they use new functions. We can't make normal stats for them, but still
-- have to show some stats so that the PR author can tweak them.
create view partial_queries as select test, query_index
@ -650,7 +650,7 @@ create view partial_query_times as select * from
'test text, query_index int, time_stddev float, time_median double')
;
-- Report for partial queries that we could only run on the new server (e.g.
-- Report for backward-incompatible ('partial') queries that we could only run on the new server (e.g.
-- queries with new functions added in the tested PR).
create table partial_queries_report engine File(TSV, 'report/partial-queries-report.tsv')
settings output_format_decimal_trailing_zeros = 1
@ -829,7 +829,7 @@ create view query_runs as select * from file('analyze/query-runs.tsv', TSV,
-- Guess the number of query runs used for this test. The number is required to
-- calculate and check the average query run time in the report.
-- We have to be careful, because we will encounter:
-- 1) partial queries which run only on one server
-- 1) backward-incompatible ('partial') queries which run only on one server
-- 3) some errors that make query run for a different number of times on a
-- particular server.
--

View File

@ -30,7 +30,7 @@ faster_queries = 0
slower_queries = 0
unstable_queries = 0
very_unstable_queries = 0
unstable_partial_queries = 0
unstable_backward_incompatible_queries = 0
# max seconds to run one query by itself, not counting preparation
allowed_single_run_time = 2
@ -378,13 +378,13 @@ if args.report == "main":
]
)
def add_partial():
def add_backward_incompatible():
rows = tsvRows("report/partial-queries-report.tsv")
if not rows:
return
global unstable_partial_queries, slow_average_tests, tables
text = tableStart("Partial Queries")
global unstable_backward_incompatible_queries, slow_average_tests, tables
text = tableStart("Backward-incompatible queries")
columns = ["Median time, s", "Relative time variance", "Test", "#", "Query"]
text += tableHeader(columns)
attrs = ["" for c in columns]
@ -392,7 +392,7 @@ if args.report == "main":
anchor = f"{currentTableAnchor()}.{row[2]}.{row[3]}"
if float(row[1]) > 0.10:
attrs[1] = f'style="background: {color_bad}"'
unstable_partial_queries += 1
unstable_backward_incompatible_queries += 1
errors_explained.append(
[
f"<a href=\"#{anchor}\">The query no. {row[3]} of test '{row[2]}' has excessive variance of run time. Keep it below 10%</a>"
@ -414,7 +414,7 @@ if args.report == "main":
text += tableEnd()
tables.append(text)
add_partial()
add_backward_incompatible()
def add_changes():
rows = tsvRows("report/changed-perf.tsv")
@ -630,8 +630,8 @@ if args.report == "main":
status = "failure"
message_array.append(str(slower_queries) + " slower")
if unstable_partial_queries:
very_unstable_queries += unstable_partial_queries
if unstable_backward_incompatible_queries:
very_unstable_queries += unstable_backward_incompatible_queries
status = "failure"
# Don't show mildly unstable queries, only the very unstable ones we

View File

@ -128,9 +128,10 @@ function run_tests()
if [[ "${HIGH_LEVEL_COVERAGE}" = "YES" ]]; then
ADDITIONAL_OPTIONS+=('--report-coverage')
ADDITIONAL_OPTIONS+=('--report-logs-stats')
fi
ADDITIONAL_OPTIONS+=('--report-logs-stats')
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \

View File

@ -40,8 +40,8 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2,
INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2],
...
PROJECTION projection_name_1 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]),
PROJECTION projection_name_2 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY])
@ -359,13 +359,15 @@ ClickHouse uses this logic not only for days of the month sequences, but for any
The index declaration is in the columns section of the `CREATE` query.
``` sql
INDEX index_name expr TYPE type(...) GRANULARITY granularity_value
INDEX index_name expr TYPE type(...) [GRANULARITY granularity_value]
```
For tables from the `*MergeTree` family, data skipping indices can be specified.
These indices aggregate some information about the specified expression on blocks, which consist of `granularity_value` granules (the size of the granule is specified using the `index_granularity` setting in the table engine). Then these aggregates are used in `SELECT` queries for reducing the amount of data to read from the disk by skipping big blocks of data where the `where` query cannot be satisfied.
The `GRANULARITY` clause can be omitted, the default value of `granularity_value` is 1.
**Example**
``` sql

View File

@ -22,8 +22,8 @@ functions in ClickHouse. The sample datasets include:
- The [Cell Towers dataset](../getting-started/example-datasets/cell-towers.md) imports a CSV into ClickHouse
- The [NYPD Complaint Data](../getting-started/example-datasets/nypd_complaint_data.md) demonstrates how to use data inference to simplify creating tables
- The ["What's on the Menu?" dataset](../getting-started/example-datasets/menus.md) has an example of denormalizing data
- The [Getting Data Into ClickHouse - Part 1](https://clickhouse.com/blog/getting-data-into-clickhouse-part-1) provides examples of defining a schema and loading a small Hacker News dataset
- The [Getting Data Into ClickHouse - Part 2 - A JSON detour](https://clickhouse.com/blog/getting-data-into-clickhouse-part-2-json) shows how JSON data can be loaded
- The [Getting Data Into ClickHouse - Part 3 - Using S3](https://clickhouse.com/blog/getting-data-into-clickhouse-part-3-s3) has examples of loading data from s3
- [Getting Data Into ClickHouse - Part 1](https://clickhouse.com/blog/getting-data-into-clickhouse-part-1) provides examples of defining a schema and loading a small Hacker News dataset
- [Getting Data Into ClickHouse - Part 3 - Using S3](https://clickhouse.com/blog/getting-data-into-clickhouse-part-3-s3) has examples of loading data from s3
- [Generating random data in ClickHouse](https://clickhouse.com/blog/generating-random-test-distribution-data-for-clickhouse) shows how to generate random data if none of the above fit your needs.
View the **Tutorials and Datasets** menu for a complete list of sample datasets.

View File

@ -22,5 +22,6 @@ Additional cache types:
- [Dictionaries](../sql-reference/dictionaries/index.md) data cache.
- Schema inference cache.
- [Filesystem cache](storing-data.md) over S3, Azure, Local and other disks.
- [(Experimental) Query result cache](query-result-cache.md).
To drop one of the caches, use [SYSTEM DROP ... CACHE](../sql-reference/statements/system.md#drop-mark-cache) statements.

View File

@ -0,0 +1,99 @@
---
slug: /en/operations/query-result-cache
sidebar_position: 65
sidebar_label: Query Result Cache [experimental]
---
# Query Result Cache [experimental]
The query result cache allows to compute SELECT queries just once and to serve further executions of the same query directly from the cache.
Depending on the type of the queries, this can dramatically reduce latency and resource consumption of the ClickHouse server.
## Background, Design and Limitations
Query result caches can generally be viewed as transactionally consistent or inconsistent.
- In transactionally consistent caches, the database invalidates (discards) cached query results if the result of the SELECT query changes
or potentially changes. In ClickHouse, operations which change the data include inserts/updates/deletes in/of/from tables or collapsing
merges. Transactionally consistent caching is especially suitable for OLTP databases, for example
[MySQL](https://dev.mysql.com/doc/refman/5.6/en/query-cache.html) (which removed query result cache after v8.0) and
[Oracle](https://docs.oracle.com/database/121/TGDBA/tune_result_cache.htm).
- In transactionally inconsistent caches, slight inaccuracies in query results are accepted under the assumption that all cache entries are
assigned a validity period after which they expire (e.g. 1 minute) and that the underlying data changes only little during this period.
This approach is overall more suitable for OLAP databases. As an example where transactionally inconsistent caching is sufficient,
consider an hourly sales report in a reporting tool which is simultaneously accessed by multiple users. Sales data changes typically
slowly enough that the database only needs to compute the report once (represented by the first SELECT query). Further queries can be
served directly from the query result cache. In this example, a reasonable validity period could be 30 min.
Transactionally inconsistent caching is traditionally provided by client tools or proxy packages interacting with the database. As a result,
the same caching logic and configuration is often duplicated. With ClickHouse's query result cache, the caching logic moves to the server
side. This reduces maintenance effort and avoids redundancy.
:::warning
The query result cache is an experimental feature that should not be used in production. There are known cases (e.g. in distributed query
processing) where wrong results are returned.
:::
## Configuration Settings and Usage
Parameter [enable_experimental_query_result_cache](settings/settings.md#enable-experimental-query-result-cache) controls whether query
results are inserted into / retrieved from the cache for the current query or session. For example, the first execution of query
``` sql
SELECT some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS enable_experimental_query_result_cache = true;
```
stores the query result into the query result cache. Subsequent executions of the same query (also with parameter
`enable_experimental_query_result_cache = true`) will read the computed result directly from the cache.
Sometimes, it is desirable to use the query result cache only passively, i.e. to allow reading from it but not writing into it (if the cache
result is not stored yet). Parameter [enable_experimental_query_result_cache_passive_usage](settings/settings.md#enable-experimental-query-result-cache-passive-usage)
instead of 'enable_experimental_query_result_cache' can be used for that.
For maximum control, it is generally recommended to provide settings "enable_experimental_query_result_cache" or
"enable_experimental_query_result_cache_passive_usage" only with specific queries. It is also possible to enable caching at user or profile
level but one should keep in mind that all SELECT queries may return a cached results, including monitoring or debugging queries to system
tables.
The query result cache can be cleared using statement `SYSTEM DROP QUERY RESULT CACHE`. The content of the query result cache is displayed
in system table `SYSTEM.QUERY_RESULT_CACHE`. The number of query result cache hits and misses are shown as events "QueryResultCacheHits" and
"QueryResultCacheMisses" in system table `SYSTEM.EVENTS`. Both counters are only updated for SELECT queries which run with settings
"enable_experimental_query_result_cache = true" or "enable_experimental_query_result_cache_passive_usage = true". Other queries do not
affect the cache miss counter.
The query result cache exists once per ClickHouse server process. However, cache results are by default not shared between users. This can
be changed (see below) but doing so is not recommended for security reasons.
Query results are referenced in the query result cache by the [Abstract Syntax Tree (AST)](https://en.wikipedia.org/wiki/Abstract_syntax_tree)
of their query. This means that caching is agnostic to upper/lowercase, for example `SELECT 1` and `select 1` are treated as the same query.
To make the matching more natural, all query-level settings related to the query result cache are removed from the AST.
If the query was aborted due to an exception or user cancellation, no entry is written into the query result cache.
The size of the query result cache, the maximum number of cache entries and the maximum size of cache entries (in bytes and in records) can
be configured using different [server configuration options](server-configuration-parameters/settings.md#server_configuration_parameters_query-result-cache).
To define how long a query must run at least such that its result can be cached, you can use setting
[query_result_cache_min_query_duration](settings/settings.md#query-result-cache-min-query-duration). For example, the result of query
``` sql
SELECT some_expensive_calculation(column_1, column_2)
FROM table
SETTINGS enable_experimental_query_result_cache = true, query_result_cache_min_query_duration = 5000;
```
is only cached if the query runs longer than 5 seconds. It is also possible to specify how often a query needs to run until its result is
cached - for that use setting [query_result_cache_min_query_runs](settings/settings.md#query-result-cache-min-query-runs).
Entries in the query result cache become stale after a certain time period (time-to-live). By default, this period is 60 seconds but a
different value can be specified at session, profile or query level using setting [query_result_cache_ttl](settings/settings.md#query-result-cache-ttl).
Also, results of queries with non-deterministic functions such as `rand()` and `now()` are not cached. This can be overruled using
setting [query_result_cache_store_results_of_queries_with_nondeterministic_functions](settings/settings.md#query-result-cache-store-results-of-queries-with-nondeterministic-functions).
Finally, entries in the query cache are not shared between users due to security reasons. For example, user A must not be able to bypass a
row policy on a table by running the same query as another user B for whom no such policy exists. However, if necessary, cache entries can
be marked accessible by other users (i.e. shared) by supplying setting
[query_result_cache_share_between_users]{settings/settings.md#query-result-cache-share-between-users}.

View File

@ -1270,6 +1270,32 @@ If the table does not exist, ClickHouse will create it. If the structure of the
</query_log>
```
## query_result_cache {#server_configuration_parameters_query-result-cache}
[Query result cache](../query-result-cache.md) configuration.
The following settings are available:
- `size`: The maximum cache size in bytes. 0 means the query result cache is disabled. Default value: `1073741824` (1 GiB).
- `max_entries`: The maximum number of SELECT query results stored in the cache. Default value: `1024`.
- `max_entry_size`: The maximum size in bytes SELECT query results may have to be saved in the cache. Default value: `1048576` (1 MiB).
- `max_entry_records`: The maximum number of records SELECT query results may have to be saved in the cache. Default value: `30000000` (30 mil).
:::warning
Data for the query result cache is allocated in DRAM. If memory is scarce, make sure to set a small value for `size` or disable the query result cache altogether.
:::
**Example**
```xml
<query_result_cache>
<size>1073741824</size>
<max_entries>1024</max_entries>
<max_entry_size>1048576</max_entry_size>
<max_entry_records>30000000</max_entry_records>
</query_result_cache>
```
## query_thread_log {#server_configuration_parameters-query_thread_log}
Setting for logging threads of queries received with the [log_query_threads=1](../../operations/settings/settings.md#settings-log-query-threads) setting.

View File

@ -176,6 +176,59 @@ Similar to [replicated_deduplication_window](#replicated-deduplication-window),
The time is relative to the time of the most recent record, not to the wall time. If it's the only record it will be stored forever.
## replicated_deduplication_window_for_async_inserts {#replicated-deduplication-window-for-async-inserts}
The number of most recently async inserted blocks for which ClickHouse Keeper stores hash sums to check for duplicates.
Possible values:
- Any positive integer.
- 0 (disable deduplication for async_inserts)
Default value: 10000.
The [Async Insert](./settings.md#async-insert) command will be cached in one or more blocks (parts). For [insert deduplication](../../engines/table-engines/mergetree-family/replication.md), when writing into replicated tables, ClickHouse writes the hash sums of each insert into ClickHouse Keeper. Hash sums are stored only for the most recent `replicated_deduplication_window_for_async_inserts` blocks. The oldest hash sums are removed from ClickHouse Keeper.
A large number of `replicated_deduplication_window_for_async_inserts` slows down `Async Inserts` because it needs to compare more entries.
The hash sum is calculated from the composition of the field names and types and the data of the insert (stream of bytes).
## replicated_deduplication_window_seconds_for_async_inserts {#replicated-deduplication-window-seconds-for-async_inserts}
The number of seconds after which the hash sums of the async inserts are removed from ClickHouse Keeper.
Possible values:
- Any positive integer.
Default value: 604800 (1 week).
Similar to [replicated_deduplication_window_for_async_inserts](#replicated-deduplication-window-for-async-inserts), `replicated_deduplication_window_seconds_for_async_inserts` specifies how long to store hash sums of blocks for async insert deduplication. Hash sums older than `replicated_deduplication_window_seconds_for_async_inserts` are removed from ClickHouse Keeper, even if they are less than ` replicated_deduplication_window_for_async_inserts`.
The time is relative to the time of the most recent record, not to the wall time. If it's the only record it will be stored forever.
## use_async_block_ids_cache {#use-async-block-ids-cache}
If true, we cache the hash sums of the async inserts.
Possible values:
- true, false
Default value: false.
A block bearing multiple async inserts will generate multiple hash sums. When some of the inserts are duplicated, keeper will only return one duplicated hash sum in one RPC, which will cause unnecessary RPC retries. This cache will watch the hash sums path in Keeper. If updates are watched in the Keeper, the cache will update as soon as possible, so that we are able to filter the duplicated inserts in the memory.
## async_block_ids_cache_min_update_interval_ms
The minimum interval (in milliseconds) to update the `use_async_block_ids_cache`
Possible values:
- Any positive integer.
Default value: 100.
Normally, the `use_async_block_ids_cache` updates as soon as there are updates in the watching keeper path. However, the cache updates might be too frequent and become a heavy burden. This minimum interval prevents the cache from updating too fast. Note that if we set this value too long, the block with duplicated inserts will have a longer retry time.
## max_replicated_logs_to_keep
How many records may be in the ClickHouse Keeper log if there is inactive replica. An inactive replica becomes lost when when this number exceed.
@ -745,4 +798,4 @@ You can see which parts of `s` were stored using the sparse serialization:
│ id │ Default │
│ s │ Sparse │
└────────┴────────────────────┘
```
```

View File

@ -408,51 +408,51 @@ Several algorithms can be specified, and an available one would be chosen for a
Possible values:
### `default`
- default
This is the equivalent of `hash` or `direct`, if possible (same as `direct,hash`)
This is the equivalent of `hash` or `direct`, if possible (same as `direct,hash`)
### `grace_hash`
- grace_hash
[Grace hash join](https://en.wikipedia.org/wiki/Hash_join#Grace_hash_join) is used. Grace hash provides an algorithm option that provides performant complex joins while limiting memory use.
[Grace hash join](https://en.wikipedia.org/wiki/Hash_join#Grace_hash_join) is used. Grace hash provides an algorithm option that provides performant complex joins while limiting memory use.
The first phase of a grace join reads the right table and splits it into N buckets depending on the hash value of key columns (initially, N is `grace_hash_join_initial_buckets`). This is done in a way to ensure that each bucket can be processed independently. Rows from the first bucket are added to an in-memory hash table while the others are saved to disk. If the hash table grows beyond the memory limit (e.g., as set by [`max_bytes_in_join`](/docs/en/operations/settings/query-complexity.md/#settings-max_bytes_in_join)), the number of buckets is increased and the assigned bucket for each row. Any rows which dont belong to the current bucket are flushed and reassigned.
The first phase of a grace join reads the right table and splits it into N buckets depending on the hash value of key columns (initially, N is `grace_hash_join_initial_buckets`). This is done in a way to ensure that each bucket can be processed independently. Rows from the first bucket are added to an in-memory hash table while the others are saved to disk. If the hash table grows beyond the memory limit (e.g., as set by [`max_bytes_in_join`](/docs/en/operations/settings/query-complexity.md/#settings-max_bytes_in_join)), the number of buckets is increased and the assigned bucket for each row. Any rows which dont belong to the current bucket are flushed and reassigned.
### `hash`
- hash
[Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
[Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
### `parallel_hash`
- parallel_hash
A variation of `hash` join that splits the data into buckets and builds several hashtables instead of one concurrently to speed up this process.
A variation of `hash` join that splits the data into buckets and builds several hashtables instead of one concurrently to speed up this process.
When using the `hash` algorithm, the right part of `JOIN` is uploaded into RAM.
When using the `hash` algorithm, the right part of `JOIN` is uploaded into RAM.
### `partial_merge`
- partial_merge
A variation of the [sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join), where only the right table is fully sorted.
A variation of the [sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join), where only the right table is fully sorted.
The `RIGHT JOIN` and `FULL JOIN` are supported only with `ALL` strictness (`SEMI`, `ANTI`, `ANY`, and `ASOF` are not supported).
The `RIGHT JOIN` and `FULL JOIN` are supported only with `ALL` strictness (`SEMI`, `ANTI`, `ANY`, and `ASOF` are not supported).
When using the `partial_merge` algorithm, ClickHouse sorts the data and dumps it to the disk. The `partial_merge` algorithm in ClickHouse differs slightly from the classic realization. First, ClickHouse sorts the right table by joining keys in blocks and creates a min-max index for sorted blocks. Then it sorts parts of the left table by the `join key` and joins them over the right table. The min-max index is also used to skip unneeded right table blocks.
When using the `partial_merge` algorithm, ClickHouse sorts the data and dumps it to the disk. The `partial_merge` algorithm in ClickHouse differs slightly from the classic realization. First, ClickHouse sorts the right table by joining keys in blocks and creates a min-max index for sorted blocks. Then it sorts parts of the left table by the `join key` and joins them over the right table. The min-max index is also used to skip unneeded right table blocks.
### `direct`
- direct
This algorithm can be applied when the storage for the right table supports key-value requests.
This algorithm can be applied when the storage for the right table supports key-value requests.
The `direct` algorithm performs a lookup in the right table using rows from the left table as keys. It's supported only by special storage such as [Dictionary](../../engines/table-engines/special/dictionary.md/#dictionary) or [EmbeddedRocksDB](../../engines/table-engines/integrations/embedded-rocksdb.md) and only the `LEFT` and `INNER` JOINs.
The `direct` algorithm performs a lookup in the right table using rows from the left table as keys. It's supported only by special storage such as [Dictionary](../../engines/table-engines/special/dictionary.md/#dictionary) or [EmbeddedRocksDB](../../engines/table-engines/integrations/embedded-rocksdb.md) and only the `LEFT` and `INNER` JOINs.
### `auto`
- auto
When set to `auto`, `hash` join is tried first, and the algorithm is switched on the fly to another algorithm if the memory limit is violated.
When set to `auto`, `hash` join is tried first, and the algorithm is switched on the fly to another algorithm if the memory limit is violated.
### `full_sorting_merge`
- full_sorting_merge
[Sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join) with full sorting joined tables before joining.
[Sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join) with full sorting joined tables before joining.
### `prefer_partial_merge`
- prefer_partial_merge
ClickHouse always tries to use `partial_merge` join if possible, otherwise, it uses `hash`. *Deprecated*, same as `partial_merge,hash`.
ClickHouse always tries to use `partial_merge` join if possible, otherwise, it uses `hash`. *Deprecated*, same as `partial_merge,hash`.
## join_any_take_last_row {#settings-join_any_take_last_row}
@ -1300,6 +1300,81 @@ Possible values:
Default value: `3`.
## enable_experimental_query_result_cache {#enable-experimental-query-result-cache}
If turned on, results of SELECT queries are stored in and (if available) retrieved from the [query result cache](../query-result-cache.md).
Possible values:
- 0 - Disabled
- 1 - Enabled
Default value: `0`.
## enable_experimental_query_result_cache_passive_usage {#enable-experimental-query-result-cache-passive-usage}
If turned on, results of SELECT queries are (if available) retrieved from the [query result cache](../query-result-cache.md).
Possible values:
- 0 - Disabled
- 1 - Enabled
Default value: `0`.
## query_result_cache_store_results_of_queries_with_nondeterministic_functions {#query-result-cache-store-results-of-queries-with-nondeterministic-functions}
If turned on, then results of SELECT queries with non-deterministic functions (e.g. `rand()`, `now()`) can be cached in the [query result cache](../query-result-cache.md).
Possible values:
- 0 - Disabled
- 1 - Enabled
Default value: `0`.
## query_result_cache_min_query_runs {#query-result-cache-min-query-runs}
Minimum number of times a SELECT query must run before its result is stored in the [query result cache](../query-result-cache.md).
Possible values:
- Positive integer >= 0.
Default value: `0`
## query_result_cache_min_query_duration {#query-result-cache-min-query-duration}
Minimum duration in milliseconds a query needs to run for its result to be stored in the [query result cache](../query-result-cache.md).
Possible values:
- Positive integer >= 0.
Default value: `0`
## query_result_cache_ttl {#query-result-cache-ttl}
After this time in seconds entries in the [query result cache](../query-result-cache.md) become stale.
Possible values:
- Positive integer >= 0.
Default value: `60`
## query_result_cache_share_between_users {#query-result-cache-share-between-users}
If turned on, the result of SELECT queries cached in the [query result cache](../query-result-cache.md) can be read by other users.
It is not recommended to enable this setting due to security reasons.
Possible values:
- 0 - Disabled
- 1 - Enabled
Default value: `0`.
## insert_quorum {#settings-insert_quorum}
Enables the quorum writes.
@ -1394,6 +1469,22 @@ By default, blocks inserted into replicated tables by the `INSERT` statement are
For the replicated tables by default the only 100 of the most recent blocks for each partition are deduplicated (see [replicated_deduplication_window](merge-tree-settings.md/#replicated-deduplication-window), [replicated_deduplication_window_seconds](merge-tree-settings.md/#replicated-deduplication-window-seconds)).
For not replicated tables see [non_replicated_deduplication_window](merge-tree-settings.md/#non-replicated-deduplication-window).
## async_insert_deduplicate {#settings-async-insert-deduplicate}
Enables or disables insert deduplication of `ASYNC INSERT` (for Replicated\* tables).
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 1.
By default, async inserts are inserted into replicated tables by the `INSERT` statement enabling [async_isnert](#async-insert) are deduplicated (see [Data Replication](../../engines/table-engines/mergetree-family/replication.md)).
For the replicated tables, by default, only 10000 of the most recent inserts for each partition are deduplicated (see [replicated_deduplication_window_for_async_inserts](merge-tree-settings.md/#replicated-deduplication-window-async-inserts), [replicated_deduplication_window_seconds_for_async_inserts](merge-tree-settings.md/#replicated-deduplication-window-seconds-async-inserts)).
We recommend enabling the [async_block_ids_cache](merge-tree-settings.md/#use-async-block-ids-cache) to increase the efficiency of deduplication.
This function does not work for non-replicated tables.
## deduplicate_blocks_in_dependent_materialized_views {#settings-deduplicate-blocks-in-dependent-materialized-views}
Enables or disables the deduplication check for materialized views that receive data from Replicated\* tables.

View File

@ -72,3 +72,10 @@ If procfs is supported and enabled on the system, ClickHouse server collects the
- `OSWriteChars`
- `OSReadBytes`
- `OSWriteBytes`
## Related content
- Blog: [System Tables and a window into the internals of ClickHouse](https://clickhouse.com/blog/clickhouse-debugging-issues-with-system-tables)
- Blog: [Essential monitoring queries - part 1 - INSERT queries](https://clickhouse.com/blog/monitoring-troubleshooting-insert-queries-clickhouse)
- Blog: [Essential monitoring queries - part 2 - SELECT queries](https://clickhouse.com/blog/monitoring-troubleshooting-select-queries-clickhouse)

View File

@ -54,7 +54,9 @@ Functions:
- [toLowCardinality](../../sql-reference/functions/type-conversion-functions.md#tolowcardinality)
## See Also
## Related content
- [Reducing ClickHouse Storage Cost with the Low Cardinality Type Lessons from an Instana Engineer](https://www.instana.com/blog/reducing-clickhouse-storage-cost-with-the-low-cardinality-type-lessons-from-an-instana-engineer/).
- [String Optimization (video presentation in Russian)](https://youtu.be/rqf-ILRgBdY?list=PL0Z2YDlm0b3iwXCpEFiOOYmwXzVmjJfEt). [Slides in English](https://github.com/ClickHouse/clickhouse-presentations/raw/master/meetup19/string_optimization.pdf).
- [Reducing ClickHouse Storage Cost with the Low Cardinality Type Lessons from an Instana Engineer](https://www.instana.com/blog/reducing-clickhouse-storage-cost-with-the-low-cardinality-type-lessons-from-an-instana-engineer/)
- [String Optimization (video presentation in Russian)](https://youtu.be/rqf-ILRgBdY?list=PL0Z2YDlm0b3iwXCpEFiOOYmwXzVmjJfEt). [Slides in English](https://github.com/ClickHouse/clickhouse-presentations/raw/master/meetup19/string_optimization.pdf)
- Blog: [Optimizing ClickHouse with Schemas and Codecs](https://clickhouse.com/blog/optimize-clickhouse-codecs-compression-schema)
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -1582,3 +1582,8 @@ Result:
│ 2020-01-01 │
└────────────────────────────────────┘
```
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -588,3 +588,6 @@ Result:
│ aeca2A │
└───────────────────────────────────────┘
```
## Related content
- Blog: [Generating random data in ClickHouse](https://clickhouse.com/blog/generating-random-test-distribution-data-for-clickhouse)

View File

@ -115,3 +115,7 @@ Returns the exclusive upper bound of the corresponding hopping window.
hopEnd(bounds_tuple);
hopEnd(time_attr, hop_interval, window_interval [, timezone]);
```
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -1402,6 +1402,8 @@ The output value is a timestamp in UTC, not in the timezone of `DateTime64`.
```sql
toUnixTimestamp64Milli(value)
toUnixTimestamp64Micro(value)
toUnixTimestamp64Nano(value)
```
**Arguments**
@ -1455,7 +1457,9 @@ Converts an `Int64` to a `DateTime64` value with fixed sub-second precision and
**Syntax**
``` sql
fromUnixTimestamp64Milli(value [, ti])
fromUnixTimestamp64Milli(value [, timezone])
fromUnixTimestamp64Micro(value [, timezone])
fromUnixTimestamp64Nano(value [, timezone])
```
**Arguments**

View File

@ -158,8 +158,6 @@ For examples of columns TTL modifying, see [Column TTL](/docs/en/engines/table-e
If the `IF EXISTS` clause is specified, the query wont return an error if the column does not exist.
The query also can change the order of the columns using `FIRST | AFTER` clause, see [ADD COLUMN](#alter_add-column) description.
When changing the type, values are converted as if the [toType](/docs/en/sql-reference/functions/type-conversion-functions.md) functions were applied to them. If only the default expression is changed, the query does not do anything complex, and is completed almost instantly.
Example:
@ -170,6 +168,40 @@ ALTER TABLE visits MODIFY COLUMN browser Array(String)
Changing the column type is the only complex action it changes the contents of files with data. For large tables, this may take a long time.
The query also can change the order of the columns using `FIRST | AFTER` clause, see [ADD COLUMN](#alter_add-column) description, but column type is mandatory in this case.
Example:
```sql
CREATE TABLE users (
c1 Int16,
c2 String
) ENGINE = MergeTree
ORDER BY c1;
DESCRIBE users;
┌─name─┬─type───┬
│ c1 │ Int16 │
│ c2 │ String │
└──────┴────────┴
ALTER TABLE users MODIFY COLUMN c2 String FIRST;
DESCRIBE users;
┌─name─┬─type───┬
│ c2 │ String │
│ c1 │ Int16 │
└──────┴────────┴
ALTER TABLE users ALTER COLUMN c2 TYPE String AFTER c1;
DESCRIBE users;
┌─name─┬─type───┬
│ c1 │ Int16 │
│ c2 │ String │
└──────┴────────┴
```
The `ALTER` query is atomic. For MergeTree tables it is also lock-free.
The `ALTER` query for changing columns is replicated. The instructions are saved in ZooKeeper, then each replica applies them. All `ALTER` queries are run in the same order. The query waits for the appropriate actions to be completed on the other replicas. However, a query to change columns in a replicated table can be interrupted, and all actions will be performed asynchronously.

View File

@ -502,3 +502,9 @@ Result:
│ t1 │ The temporary table │
└──────┴─────────────────────┘
```
## Related content
- Blog: [Optimizing ClickHouse with Schemas and Codecs](https://clickhouse.com/blog/optimize-clickhouse-codecs-compression-schema)
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -350,3 +350,7 @@ The window view is useful in the following scenarios:
* **Monitoring**: Aggregate and calculate the metrics logs by time, and output the results to a target table. The dashboard can use the target table as a source table.
* **Analyzing**: Automatically aggregate and preprocess data in the time window. This can be useful when analyzing a large number of logs. The preprocessing eliminates repeated calculations in multiple queries and reduces query latency.
## Related Content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -299,3 +299,8 @@ ARRAY JOIN nest AS n, arrayEnumerate(`nest.x`) AS num;
## Implementation Details
The query execution order is optimized when running `ARRAY JOIN`. Although `ARRAY JOIN` must always be specified before the [WHERE](../../../sql-reference/statements/select/where.md)/[PREWHERE](../../../sql-reference/statements/select/prewhere.md) clause in a query, technically they can be performed in any order, unless result of `ARRAY JOIN` is used for filtering. The processing order is controlled by the query optimizer.
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -543,3 +543,7 @@ Result:
│ 7 │ original │ 7 │
└─────┴──────────┴───────┘
```
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -72,7 +72,7 @@ For more convenient (automatic) cache management, see disable_internal_dns_cache
## DROP MARK CACHE
Resets the mark cache. Used in development of ClickHouse and performance tests.
Resets the mark cache.
## DROP REPLICA
@ -94,13 +94,18 @@ The fourth one is useful to remove metadata of dead replica when all other repli
## DROP UNCOMPRESSED CACHE
Reset the uncompressed data cache. Used in development of ClickHouse and performance tests.
For manage uncompressed data cache parameters use following server level settings [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) and query/user/profile level settings [use_uncompressed_cache](../../operations/settings/settings.md#setting-use_uncompressed_cache)
Reset the uncompressed data cache.
The uncompressed data cache is enabled/disabled with the query/user/profile-level setting [use_uncompressed_cache](../../operations/settings/settings.md#setting-use_uncompressed_cache).
Its size can be configured using the server-level setting [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size).
## DROP COMPILED EXPRESSION CACHE
Reset the compiled expression cache. Used in development of ClickHouse and performance tests.
Compiled expression cache used when query/user/profile enable option [compile-expressions](../../operations/settings/settings.md#compile-expressions)
Reset the compiled expression cache.
The compiled expression cache is enabled/disabled with the query/user/profile-level setting [compile_expressions](../../operations/settings/settings.md#compile-expressions).
## DROP QUERY RESULT CACHE
Resets the [query result cache](../../operations/query-result-cache.md).
## FLUSH LOGS

View File

@ -51,4 +51,7 @@ SELECT * FROM random;
│ [] │ 68091.8197 │ ('2037-10-02 12:44:23.368','039ecab7-81c2-45ee-208c-844e5c6c5652') │
│ [8,-83,0,-22,65,9,-30,28,64] │ -186233.4909 │ ('2062-01-11 00:06:04.124','69563ea1-5ad1-f870-16d8-67061da0df25') │
└──────────────────────────────┴──────────────┴────────────────────────────────────────────────────────────────────┘
```
```
## Related content
- Blog: [Generating random data in ClickHouse](https://clickhouse.com/blog/generating-random-test-distribution-data-for-clickhouse)

View File

@ -131,3 +131,6 @@ CREATE TABLE pg_table_schema_with_dots (a UInt32)
- [The PostgreSQL table engine](../../engines/table-engines/integrations/postgresql.md)
- [Using PostgreSQL as a dictionary source](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md#dicts-external_dicts_dict_sources-postgresql)
## Related content
- Blog: [ClickHouse and PostgreSQL - a match made in data heaven - part 1](https://clickhouse.com/blog/migrating-data-between-clickhouse-postgres)

View File

@ -590,5 +590,6 @@ ORDER BY
## Related Content
- [Window and array functions for Git commit sequences](https://clickhouse.com/blog/clickhouse-window-array-functions-git-commits)
- [Getting Data Into ClickHouse - Part 3 - Using S3](https://clickhouse.com/blog/getting-data-into-clickhouse-part-3-s3)
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)
- Blog: [Window and array functions for Git commit sequences](https://clickhouse.com/blog/clickhouse-window-array-functions-git-commits)
- Blog: [Getting Data Into ClickHouse - Part 3 - Using S3](https://clickhouse.com/blog/getting-data-into-clickhouse-part-3-s3)

View File

@ -1 +1 @@
See https://github.com/ClickHouse/ClickHouse/blob/master/docs/tools/README.md
See https://github.com/ClickHouse/clickhouse-docs/blob/main/contrib-writing-guide.md

View File

@ -1517,6 +1517,15 @@ try
if (mmap_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);
/// A cache for query results.
size_t query_result_cache_size = config().getUInt64("query_result_cache.size", 1_GiB);
if (query_result_cache_size)
global_context->setQueryResultCache(
query_result_cache_size,
config().getUInt64("query_result_cache.max_entries", 1024),
config().getUInt64("query_result_cache.max_entry_size", 1_MiB),
config().getUInt64("query_result_cache.max_entry_records", 30'000'000));
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;

View File

@ -1453,6 +1453,14 @@
</rocksdb>
-->
<!-- Configuration for the query result cache -->
<!-- <query_result_cache> -->
<!-- <size>1073741824</size> -->
<!-- <max_entries>1024</max_entries> -->
<!-- <max_entry_size>1048576</max_entry_size> -->
<!-- <max_entry_records>30000000</max_entry_records> -->
<!-- </query_result_cache> -->
<!-- Uncomment if enable merge tree metadata cache -->
<!--merge_tree_metadata_cache>
<lru_cache_size>268435456</lru_cache_size>

View File

@ -575,7 +575,9 @@ UUID AccessControl::authenticate(const Credentials & credentials, const Poco::Ne
/// We use the same message for all authentication failures because we don't want to give away any unnecessary information for security reasons,
/// only the log will show the exact reason.
throw Exception(message.str(), ErrorCodes::AUTHENTICATION_FAILED);
throw Exception(PreformattedMessage{message.str(),
"{}: Authentication failed: password is incorrect, or there is no user with such name.{}"},
ErrorCodes::AUTHENTICATION_FAILED);
}
}

View File

@ -142,6 +142,7 @@ enum class AccessType
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_QUERY_RESULT_CACHE, "SYSTEM DROP QUERY RESULT, DROP QUERY RESULT CACHE, DROP QUERY RESULT", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \

View File

@ -118,8 +118,7 @@ public:
const auto * y_arg = arguments.at(1).get();
if (!x_arg->isValueRepresentedByNumber() || !y_arg->isValueRepresentedByNumber())
throw Exception("Illegal types of arguments of aggregate function " + getName() + ", must have number representation.",
ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal types of arguments of aggregate function {}, must have number representation.", getName());
}
bool allocatesMemoryInArena() const override { return false; }

View File

@ -226,7 +226,7 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
if (!this->data(place).size_x || !this->data(place).size_y)
throw Exception("Aggregate function " + getName() + " require both samples to be non empty", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} require both samples to be non empty", getName());
auto [u_statistic, p_value] = this->data(place).getResult(alternative, continuity_correction);

View File

@ -0,0 +1,253 @@
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Core/ColumnNumbers.h>
#include <Functions/grouping.h>
#include <Interpreters/Context.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/HashUtils.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ColumnNode.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
{
enum class GroupByKind
{
ORDINARY,
ROLLUP,
CUBE,
GROUPING_SETS
};
class GroupingFunctionResolveVisitor : public InDepthQueryTreeVisitor<GroupingFunctionResolveVisitor>
{
public:
GroupingFunctionResolveVisitor(GroupByKind group_by_kind_,
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index_,
ColumnNumbersList grouping_sets_keys_indices_,
ContextPtr context_)
: group_by_kind(group_by_kind_)
, aggregation_key_to_index(std::move(aggregation_key_to_index_))
, grouping_sets_keys_indexes(std::move(grouping_sets_keys_indices_))
, context(std::move(context_))
{
}
void visitImpl(const QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node || function_node->getFunctionName() != "grouping")
return;
auto & function_arguments = function_node->getArguments().getNodes();
ColumnNumbers arguments_indexes;
arguments_indexes.reserve(function_arguments.size());
for (const auto & argument : function_arguments)
{
auto it = aggregation_key_to_index.find(argument);
if (it == aggregation_key_to_index.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Argument {} of GROUPING function is not a part of GROUP BY clause",
argument->formatASTForErrorMessage());
arguments_indexes.push_back(it->second);
}
FunctionOverloadResolverPtr grouping_function_resolver;
bool add_grouping_set_column = false;
bool force_grouping_standard_compatibility = context->getSettingsRef().force_grouping_standard_compatibility;
size_t aggregation_keys_size = aggregation_key_to_index.size();
switch (group_by_kind)
{
case GroupByKind::ORDINARY:
{
auto grouping_ordinary_function = std::make_shared<FunctionGroupingOrdinary>(arguments_indexes,
force_grouping_standard_compatibility);
grouping_function_resolver = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_ordinary_function));
break;
}
case GroupByKind::ROLLUP:
{
auto grouping_rollup_function = std::make_shared<FunctionGroupingForRollup>(arguments_indexes,
aggregation_keys_size,
force_grouping_standard_compatibility);
grouping_function_resolver = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_rollup_function));
add_grouping_set_column = true;
break;
}
case GroupByKind::CUBE:
{
auto grouping_cube_function = std::make_shared<FunctionGroupingForCube>(arguments_indexes,
aggregation_keys_size,
force_grouping_standard_compatibility);
grouping_function_resolver = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_cube_function));
add_grouping_set_column = true;
break;
}
case GroupByKind::GROUPING_SETS:
{
auto grouping_grouping_sets_function = std::make_shared<FunctionGroupingForGroupingSets>(arguments_indexes,
grouping_sets_keys_indexes,
force_grouping_standard_compatibility);
grouping_function_resolver = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_grouping_sets_function));
add_grouping_set_column = true;
break;
}
}
if (add_grouping_set_column)
{
QueryTreeNodeWeakPtr column_source;
auto grouping_set_column = NameAndTypePair{"__grouping_set", std::make_shared<DataTypeUInt64>()};
auto grouping_set_argument_column = std::make_shared<ColumnNode>(std::move(grouping_set_column), std::move(column_source));
function_arguments.insert(function_arguments.begin(), std::move(grouping_set_argument_column));
}
function_node->resolveAsFunction(grouping_function_resolver->build(function_node->getArgumentColumns()));
}
static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node)
{
return !(child_node->getNodeType() == QueryTreeNodeType::QUERY || child_node->getNodeType() == QueryTreeNodeType::UNION);
}
private:
GroupByKind group_by_kind;
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index;
ColumnNumbersList grouping_sets_keys_indexes;
ContextPtr context;
};
void resolveGroupingFunctions(QueryTreeNodePtr & query_node, ContextPtr context)
{
auto & query_node_typed = query_node->as<QueryNode &>();
size_t aggregation_node_index = 0;
QueryTreeNodePtrWithHashMap<size_t> aggregation_key_to_index;
std::vector<QueryTreeNodes> grouping_sets_used_aggregation_keys_list;
if (query_node_typed.hasGroupBy())
{
/// It is expected by execution layer that if there are only 1 grouping set it will be removed
if (query_node_typed.isGroupByWithGroupingSets() && query_node_typed.getGroupBy().getNodes().size() == 1)
{
auto & grouping_set_list_node = query_node_typed.getGroupBy().getNodes().front()->as<ListNode &>();
query_node_typed.getGroupBy().getNodes() = std::move(grouping_set_list_node.getNodes());
query_node_typed.setIsGroupByWithGroupingSets(false);
}
if (query_node_typed.isGroupByWithGroupingSets())
{
for (const auto & grouping_set_keys_list_node : query_node_typed.getGroupBy().getNodes())
{
auto & grouping_set_keys_list_node_typed = grouping_set_keys_list_node->as<ListNode &>();
grouping_sets_used_aggregation_keys_list.emplace_back();
auto & grouping_sets_used_aggregation_keys = grouping_sets_used_aggregation_keys_list.back();
for (auto & grouping_set_key_node : grouping_set_keys_list_node_typed.getNodes())
{
if (aggregation_key_to_index.contains(grouping_set_key_node))
continue;
grouping_sets_used_aggregation_keys.push_back(grouping_set_key_node);
aggregation_key_to_index.emplace(grouping_set_key_node, aggregation_node_index);
++aggregation_node_index;
}
}
}
else
{
for (auto & group_by_key_node : query_node_typed.getGroupBy().getNodes())
{
if (aggregation_key_to_index.contains(group_by_key_node))
continue;
aggregation_key_to_index.emplace(group_by_key_node, aggregation_node_index);
++aggregation_node_index;
}
}
}
/// Indexes of aggregation keys used in each grouping set (only for GROUP BY GROUPING SETS)
ColumnNumbersList grouping_sets_keys_indexes;
for (const auto & grouping_set_used_aggregation_keys : grouping_sets_used_aggregation_keys_list)
{
grouping_sets_keys_indexes.emplace_back();
auto & grouping_set_keys_indexes = grouping_sets_keys_indexes.back();
for (const auto & used_aggregation_key : grouping_set_used_aggregation_keys)
{
auto aggregation_node_index_it = aggregation_key_to_index.find(used_aggregation_key);
if (aggregation_node_index_it == aggregation_key_to_index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Aggregation key {} in GROUPING SETS is not found in GROUP BY keys",
used_aggregation_key->formatASTForErrorMessage());
grouping_set_keys_indexes.push_back(aggregation_node_index_it->second);
}
}
GroupByKind group_by_kind = GroupByKind::ORDINARY;
if (query_node_typed.isGroupByWithRollup())
group_by_kind = GroupByKind::ROLLUP;
else if (query_node_typed.isGroupByWithCube())
group_by_kind = GroupByKind::CUBE;
else if (query_node_typed.isGroupByWithGroupingSets())
group_by_kind = GroupByKind::GROUPING_SETS;
GroupingFunctionResolveVisitor visitor(group_by_kind,
std::move(aggregation_key_to_index),
std::move(grouping_sets_keys_indexes),
std::move(context));
visitor.visit(query_node);
}
class GroupingFunctionsResolveVisitor : public InDepthQueryTreeVisitor<GroupingFunctionsResolveVisitor>
{
public:
explicit GroupingFunctionsResolveVisitor(ContextPtr context_)
: context(std::move(context_))
{}
void visitImpl(QueryTreeNodePtr & node)
{
if (node->getNodeType() != QueryTreeNodeType::QUERY)
return;
resolveGroupingFunctions(node, context);
}
private:
ContextPtr context;
};
}
void GroupingFunctionsResolvePass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
GroupingFunctionsResolveVisitor visitor(std::move(context));
visitor.visit(query_tree_node);
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Resolve GROUPING functions in query node.
* GROUPING function is replaced with specialized GROUPING function based on GROUP BY modifiers.
* For ROLLUP, CUBE, GROUPING SETS specialized GROUPING function take special __grouping_set column as argument
* and previous GROUPING function arguments.
*
* Example: SELECT grouping(id) FROM test_table GROUP BY id;
* Result: SELECT groupingOrdinary(id) FROM test_table GROUP BY id;
*
* Example: SELECT grouping(id), grouping(value) FROM test_table GROUP BY GROUPING SETS ((id), (value));
* Result: SELECT groupingForGroupingSets(__grouping_set, id), groupingForGroupingSets(__grouping_set, value)
* FROM test_table GROUP BY GROUPING SETS ((id), (value));
*/
class GroupingFunctionsResolvePass final : public IQueryTreePass
{
public:
String getName() override { return "GroupingFunctionsResolvePass"; }
String getDescription() override { return "Resolve GROUPING functions based on GROUP BY modifiers"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}

View File

@ -4352,7 +4352,8 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
bool force_grouping_standard_compatibility = scope.context->getSettingsRef().force_grouping_standard_compatibility;
auto grouping_function = std::make_shared<FunctionGrouping>(force_grouping_standard_compatibility);
auto grouping_function_adaptor = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(grouping_function));
function_node.resolveAsFunction(grouping_function_adaptor->build({}));
function_node.resolveAsFunction(grouping_function_adaptor->build(argument_columns));
return result_projection_names;
}
}

View File

@ -32,6 +32,7 @@
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
namespace DB
{
@ -67,7 +68,7 @@ public:
private:
void visitColumn(ColumnNode * column) const
{
if (column->getColumnSourceOrNull() == nullptr)
if (column->getColumnSourceOrNull() == nullptr && column->getColumnName() != "__grouping_set")
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} {} query tree node does not have valid source node after running {} pass",
column->getColumnName(), column->getColumnType(), pass_name);
@ -258,6 +259,8 @@ void addQueryTreePasses(QueryTreePassManager & manager)
manager.addPass(std::make_unique<IfTransformStringsToEnumPass>());
manager.addPass(std::make_unique<ConvertOrLikeChainPass>());
manager.addPass(std::make_unique<GroupingFunctionsResolvePass>());
}
}

View File

@ -130,7 +130,7 @@ BackupEntries BackupEntriesCollector::run()
Strings BackupEntriesCollector::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, "{}", toUpperFirst(new_stage));
LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage)));
current_stage = new_stage;
backup_coordination->setStage(backup_settings.host_id, new_stage, message);
@ -215,7 +215,7 @@ void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
if (std::chrono::steady_clock::now() > consistent_metadata_snapshot_end_time)
inconsistency_error->rethrow();
else
LOG_WARNING(log, "{}", inconsistency_error->displayText());
LOG_WARNING(log, getExceptionMessageAndPattern(*inconsistency_error, /* with_stacktrace */ false));
}
auto sleep_time = getSleepTimeAfterInconsistencyError(pass);

View File

@ -7,6 +7,7 @@
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <filesystem>
#include <queue>
namespace DB

View File

@ -19,7 +19,7 @@ BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
std::unique_ptr<SeekableReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
{
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), limit);
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), 0, limit);
}
}

View File

@ -12,16 +12,19 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void IBackupWriter::copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name)
void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
auto write_buffer = writeFile(file_name);
copyData(*source, *write_buffer);
auto read_buffer = create_read_buffer();
if (offset)
read_buffer->seek(offset, SEEK_SET);
auto write_buffer = writeFile(dest_file_name);
copyData(*read_buffer, *write_buffer, size);
write_buffer->finalize();
}
void IBackupWriter::copyFileNative(DiskPtr /* from_disk */, const String & /* file_name_from */, const String & /* file_name_to */)
void IBackupWriter::copyFileNative(
DiskPtr /* src_disk */, const String & /* src_file_name */, UInt64 /* src_offset */, UInt64 /* src_size */, const String & /* dest_file_name */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Native copy not implemented for backup writer");
}
}

View File

@ -24,6 +24,8 @@ public:
class IBackupWriter /// BackupWriterFile, BackupWriterDisk
{
public:
using CreateReadBufferFunction = std::function<std::unique_ptr<SeekableReadBuffer>()>;
virtual ~IBackupWriter() = default;
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
@ -32,14 +34,9 @@ public:
virtual void removeFile(const String & file_name) = 0;
virtual void removeFiles(const Strings & file_names) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual void copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name);
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const
{
return false;
}
virtual void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to);
virtual void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name);
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const { return false; }
virtual void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name);
};
}

View File

@ -105,13 +105,21 @@ bool BackupWriterDisk::supportNativeCopy(DataSourceDescription data_source_descr
return data_source_description == disk->getDataSourceDescription();
}
void BackupWriterDisk::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
void BackupWriterDisk::copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
{
if (!from_disk)
if (!src_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");
auto file_path = path / file_name_to;
if ((src_offset != 0) || (src_size != src_disk->getFileSize(src_file_name)))
{
auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
return;
}
auto file_path = path / dest_file_name;
disk->createDirectories(file_path.parent_path());
from_disk->copyFile(file_name_from, *disk, file_path);
src_disk->copyFile(src_file_name, *disk, file_path);
}
}

View File

@ -39,8 +39,8 @@ public:
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name) override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
DiskPtr disk;
std::filesystem::path path;

View File

@ -125,17 +125,24 @@ bool BackupWriterFile::supportNativeCopy(DataSourceDescription data_source_descr
return data_source_description == getDataSourceDescription();
}
void BackupWriterFile::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
void BackupWriterFile::copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
{
auto file_path = path / file_name_to;
fs::create_directories(file_path.parent_path());
std::string abs_source_path;
if (from_disk)
abs_source_path = fullPath(from_disk, file_name_from);
if (src_disk)
abs_source_path = fullPath(src_disk, src_file_name);
else
abs_source_path = fs::absolute(file_name_from);
abs_source_path = fs::absolute(src_file_name);
fs::copy(abs_source_path, file_path, fs::copy_options::recursive | fs::copy_options::overwrite_existing);
if ((src_offset != 0) || (src_size != fs::file_size(abs_source_path)))
{
auto create_read_buffer = [abs_source_path] { return createReadBufferFromFileBase(abs_source_path, {}); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
return;
}
auto file_path = path / dest_file_name;
fs::create_directories(file_path.parent_path());
fs::copy(abs_source_path, file_path, fs::copy_options::overwrite_existing);
}
}

View File

@ -35,8 +35,7 @@ public:
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name) override;
private:
std::filesystem::path path;

View File

@ -4,17 +4,19 @@
#include <Common/quoteString.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <Storages/StorageS3Settings.h>
#include <IO/IOThreadPool.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/copyDataToS3.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <filesystem>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -24,7 +26,6 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
extern const int LOGICAL_ERROR;
}
@ -150,177 +151,33 @@ bool BackupWriterS3::supportNativeCopy(DataSourceDescription data_source_descrip
return getDataSourceDescription() == data_source_description;
}
void BackupWriterS3::copyObjectImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
size_t size,
const std::optional<ObjectAttributes> & metadata) const
void BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
{
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
if (metadata)
{
request.SetMetadata(*metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
}
auto outcome = client->CopyObject(request);
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
return;
}
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void BackupWriterS3::copyObjectMultipartImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
size_t size,
const std::optional<ObjectAttributes> & metadata) const
{
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
String multipart_upload_id;
{
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
if (metadata)
request.SetMetadata(*metadata);
auto outcome = client->CreateMultipartUpload(request);
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
multipart_upload_id = outcome.GetResult().GetUploadId();
}
std::vector<String> part_tags;
size_t position = 0;
const auto & settings = request_settings.getUploadSettings();
size_t upload_part_size = settings.min_upload_part_size;
for (size_t part_number = 1; position < size; ++part_number)
{
/// Check that part number is not too big.
if (part_number > settings.max_part_number)
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_operation_copy_size = {}",
settings.max_part_number, size, settings.min_upload_part_size, settings.max_upload_part_size,
settings.upload_part_size_multiply_factor, settings.upload_part_size_multiply_parts_count_threshold,
settings.max_single_operation_copy_size);
}
size_t next_position = std::min(position + upload_part_size, size);
/// Make a copy request to copy a part.
Aws::S3::Model::UploadPartCopyRequest part_request;
part_request.SetCopySource(src_bucket + "/" + src_key);
part_request.SetBucket(dst_bucket);
part_request.SetKey(dst_key);
part_request.SetUploadId(multipart_upload_id);
part_request.SetPartNumber(static_cast<int>(part_number));
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, next_position - 1));
auto outcome = client->UploadPartCopy(part_request);
if (!outcome.IsSuccess())
{
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dst_bucket);
abort_request.SetKey(dst_key);
abort_request.SetUploadId(multipart_upload_id);
client->AbortMultipartUpload(abort_request);
// In error case we throw exception later with first error from UploadPartCopy
}
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
part_tags.push_back(etag);
position = next_position;
/// Maybe increase `upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`).
if (part_number % settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, settings.max_upload_part_size);
}
}
{
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(dst_bucket);
req.SetKey(dst_key);
req.SetUploadId(multipart_upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(static_cast<int>(i) + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = client->CompleteMultipartUpload(req);
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
}
void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
{
if (!from_disk)
if (!src_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");
auto objects = from_disk->getStorageObjects(file_name_from);
auto objects = src_disk->getStorageObjects(src_file_name);
if (objects.size() > 1)
{
copyFileThroughBuffer(from_disk->readFile(file_name_from), file_name_to);
auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
}
else
{
auto object_storage = from_disk->getObjectStorage();
std::string source_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto size = S3::getObjectSize(*client, source_bucket, objects[0].absolute_path);
if (size < request_settings.getUploadSettings().max_single_operation_copy_size)
{
copyObjectImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
else
{
copyObjectMultipartImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
auto object_storage = src_disk->getObjectStorage();
std::string src_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / dest_file_name;
copyFileS3ToS3(client, src_bucket, objects[0].absolute_path, src_offset, src_size, s3_uri.bucket, file_path, request_settings, {},
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
}
}
void BackupWriterS3::copyDataToFile(
const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
copyDataToS3(create_read_buffer, offset, size, client, s3_uri.bucket, fs::path(s3_uri.key) / dest_file_name, request_settings, {},
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
}
BackupWriterS3::~BackupWriterS3() = default;

View File

@ -4,22 +4,11 @@
#if USE_AWS_S3
#include <Backups/BackupIO.h>
#include <IO/S3Common.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartCopyRequest.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Result.h>
namespace DB
{
@ -54,12 +43,15 @@ public:
UInt64 getFileSize(const String & file_name) override;
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name) override;
void removeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name) override;
private:
void copyObjectImpl(

View File

@ -874,23 +874,18 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
/// We need to copy whole file without archive, we can do it faster
/// if source and destination are compatible
if (!use_archives && info.base_size == 0 && writer->supportNativeCopy(reader_description))
if (!use_archives && writer->supportNativeCopy(reader_description))
{
/// Should be much faster than writing data through server.
LOG_TRACE(log, "Will copy file {} using native copy", adjusted_path);
/// NOTE: `mutex` must be unlocked here otherwise writing will be in one thread maximum and hence slow.
writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.data_file_name);
writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.base_size, info.size - info.base_size, info.data_file_name);
}
else
{
LOG_TRACE(log, "Will copy file {} through memory buffers", adjusted_path);
auto read_buffer = entry->getReadBuffer();
/// If we have prefix in base we will seek to the start of the suffix which differs
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
LOG_TRACE(log, "Will copy file {}", adjusted_path);
if (!num_files_written)
checkLockFile(true);
@ -919,13 +914,18 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
coordination->updateFileInfo(info);
}
auto out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
auto read_buffer = entry->getReadBuffer();
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
copyData(*read_buffer, *out);
out->finalize();
}
else
{
auto create_read_buffer = [entry] { return entry->getReadBuffer(); };
/// NOTE: `mutex` must be unlocked here otherwise writing will be in one thread maximum and hence slow.
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
writer->copyDataToFile(create_read_buffer, info.base_size, info.size - info.base_size, info.data_file_name);
}
}

View File

@ -145,7 +145,7 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
void RestorerFromBackup::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, "{}", toUpperFirst(new_stage));
LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage)));
current_stage = new_stage;
if (restore_coordination)

View File

@ -58,9 +58,8 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
if (table_status_it == status_response.table_states_by_id.end())
{
fail_message = fmt::format("There is no table {}.{} on server: {}",
backQuote(table_to_check->database), backQuote(table_to_check->table), result.entry->getDescription());
LOG_WARNING(log, fmt::runtime(fail_message));
LOG_WARNING(LogToStr(fail_message, log), "There is no table {}.{} on server: {}",
backQuote(table_to_check->database), backQuote(table_to_check->table), result.entry->getDescription());
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
return;
}

View File

@ -5,6 +5,7 @@
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/Protocol.h>

View File

@ -37,14 +37,12 @@ public:
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
CacheBase(size_t max_size, size_t max_elements_size = 0, String cache_policy_name = "", double size_ratio = 0.5)
explicit CacheBase(size_t max_size, size_t max_elements_size = 0, String cache_policy_name = "", double size_ratio = 0.5)
{
auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); };
if (cache_policy_name.empty())
{
cache_policy_name = default_cache_policy_name;
}
if (cache_policy_name == "LRU")
{

View File

@ -36,6 +36,7 @@
M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
M(TemporaryFilesUnknown, "Number of temporary files created without known purpose") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
M(RemoteRead, "Number of read with remote reader in fly") \
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \
M(NetworkReceive, "Number of threads receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSend, "Number of threads sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \

View File

@ -71,6 +71,13 @@ Exception::MessageMasked::MessageMasked(const std::string & msg_)
masker->wipeSensitiveData(msg);
}
Exception::MessageMasked::MessageMasked(std::string && msg_)
: msg(std::move(msg_))
{
if (auto * masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(msg);
}
Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
: Poco::Exception(msg_masked.msg, code)
, remote(remote_)
@ -78,6 +85,13 @@ Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_)
handle_error_code(msg_masked.msg, code, remote, getStackFramePointers());
}
Exception::Exception(MessageMasked && msg_masked, int code, bool remote_)
: Poco::Exception(msg_masked.msg, code)
, remote(remote_)
{
handle_error_code(message(), code, remote, getStackFramePointers());
}
Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
: Poco::Exception(exc.displayText(), ErrorCodes::POCO_EXCEPTION)
{
@ -172,10 +186,11 @@ static void tryLogCurrentExceptionImpl(Poco::Logger * logger, const std::string
{
try
{
if (start_of_message.empty())
LOG_ERROR(logger, "{}", getCurrentExceptionMessage(true));
else
LOG_ERROR(logger, "{}: {}", start_of_message, getCurrentExceptionMessage(true));
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
if (!start_of_message.empty())
message.message = fmt::format("{}: {}", start_of_message, message.message);
LOG_ERROR(logger, message);
}
catch (...)
{
@ -323,8 +338,14 @@ std::string getExtraExceptionInfo(const std::exception & e)
}
std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded_stacktrace /*= false*/, bool with_extra_info /*= true*/)
{
return getCurrentExceptionMessageAndPattern(with_stacktrace, check_embedded_stacktrace, with_extra_info).message;
}
PreformattedMessage getCurrentExceptionMessageAndPattern(bool with_stacktrace, bool check_embedded_stacktrace /*= false*/, bool with_extra_info /*= true*/)
{
WriteBufferFromOwnString stream;
std::string_view message_format_string;
try
{
@ -335,6 +356,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
stream << getExceptionMessage(e, with_stacktrace, check_embedded_stacktrace)
<< (with_extra_info ? getExtraExceptionInfo(e) : "")
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
message_format_string = e.tryGetMessageFormatString();
}
catch (const Poco::Exception & e)
{
@ -380,7 +402,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
catch (...) {}
}
return stream.str();
return PreformattedMessage{stream.str(), message_format_string};
}
@ -433,14 +455,6 @@ int getExceptionErrorCode(std::exception_ptr e)
}
void rethrowFirstException(const Exceptions & exceptions)
{
for (const auto & exception : exceptions)
if (exception)
std::rethrow_exception(exception);
}
void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message)
{
try
@ -466,6 +480,11 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str
}
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace)
{
return getExceptionMessageAndPattern(e, with_stacktrace, check_embedded_stacktrace).message;
}
PreformattedMessage getExceptionMessageAndPattern(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace)
{
WriteBufferFromOwnString stream;
@ -497,7 +516,7 @@ std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool
}
catch (...) {}
return stream.str();
return PreformattedMessage{stream.str(), e.tryGetMessageFormatString()};
}
std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace)

View File

@ -9,12 +9,32 @@
#include <base/defines.h>
#include <Common/StackTrace.h>
#include <Common/LoggingFormatStringHelpers.h>
#include <fmt/format.h>
namespace Poco { class Logger; }
/// Extract format string from a string literal and constructs consteval fmt::format_string
template <typename... Args>
struct FormatStringHelperImpl
{
std::string_view message_format_string;
fmt::format_string<Args...> fmt_str;
template<typename T>
consteval FormatStringHelperImpl(T && str) : message_format_string(tryGetStaticFormatString(str)), fmt_str(std::forward<T>(str)) {}
template<typename T>
FormatStringHelperImpl(fmt::basic_runtime<T> && str) : message_format_string(), fmt_str(std::forward<fmt::basic_runtime<T>>(str)) {}
PreformattedMessage format(Args && ...args) const
{
return PreformattedMessage{fmt::format(fmt_str, std::forward<Args...>(args)...), message_format_string};
}
};
template <typename... Args>
using FormatStringHelper = FormatStringHelperImpl<std::type_identity_t<Args>...>;
namespace DB
{
@ -33,22 +53,37 @@ public:
{
std::string msg;
MessageMasked(const std::string & msg_);
MessageMasked(std::string && msg_);
};
Exception(const MessageMasked & msg_masked, int code, bool remote_);
Exception(MessageMasked && msg_masked, int code, bool remote_);
// delegating constructor to mask sensitive information from the message
Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_)
{}
Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_) {}
Exception(std::string && msg, int code, bool remote_ = false): Exception(MessageMasked(std::move(msg)), code, remote_) {}
Exception(PreformattedMessage && msg, int code): Exception(std::move(msg.message), code)
{
message_format_string = msg.format_string;
}
Exception(int code, const std::string & message)
template<typename T, typename = std::enable_if_t<std::is_convertible_v<T, String>>>
Exception(int code, T && message)
: Exception(message, code)
{}
{
message_format_string = tryGetStaticFormatString(message);
}
template<> Exception(int code, const String & message) : Exception(message, code) {}
template<> Exception(int code, String & message) : Exception(message, code) {}
template<> Exception(int code, String && message) : Exception(std::move(message), code) {}
// Format message with fmt::format, like the logging functions.
template <typename... Args>
Exception(int code, fmt::format_string<Args...> fmt, Args &&... args) : Exception(fmt::format(fmt, std::forward<Args>(args)...), code)
Exception(int code, FormatStringHelper<Args...> fmt, Args &&... args)
: Exception(fmt::format(fmt.fmt_str, std::forward<Args>(args)...), code)
{
message_format_string = fmt.message_format_string;
}
struct CreateFromPocoTag {};
@ -87,6 +122,8 @@ public:
/// Used for system.errors
FramePointers getStackFramePointers() const;
std::string_view tryGetMessageFormatString() const { return message_format_string; }
private:
#ifndef STD_EXCEPTION_HAS_STACK_TRACE
StackTrace trace;
@ -94,6 +131,9 @@ private:
bool remote = false;
const char * className() const noexcept override { return "DB::Exception"; }
protected:
std::string_view message_format_string;
};
@ -131,14 +171,15 @@ public:
ParsingException();
ParsingException(const std::string & msg, int code);
ParsingException(int code, const std::string & message);
ParsingException(int code, std::string && message) : Exception(message, code) {}
// Format message with fmt::format, like the logging functions.
template <typename... Args>
ParsingException(int code, fmt::format_string<Args...> fmt, Args &&... args) : Exception(code, fmt, std::forward<Args>(args)...)
ParsingException(int code, FormatStringHelper<Args...> fmt, Args &&... args) : Exception(fmt::format(fmt.fmt_str, std::forward<Args>(args)...), code)
{
message_format_string = fmt.message_format_string;
}
std::string displayText() const override;
ssize_t getLineNumber() const { return line_number; }
@ -184,6 +225,8 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
*/
std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded_stacktrace = false,
bool with_extra_info = true);
PreformattedMessage getCurrentExceptionMessageAndPattern(bool with_stacktrace, bool check_embedded_stacktrace = false,
bool with_extra_info = true);
/// Returns error code from ErrorCodes
int getCurrentExceptionCode();
@ -219,12 +262,10 @@ void tryLogException(std::exception_ptr e, const char * log_name, const std::str
void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message = "");
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);
PreformattedMessage getExceptionMessageAndPattern(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);
std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace);
void rethrowFirstException(const Exceptions & exceptions);
template <typename T>
requires std::is_pointer_v<T>
T exception_cast(std::exception_ptr e)

View File

@ -0,0 +1,55 @@
#pragma once
#include <base/defines.h>
#include <fmt/format.h>
/// Saves a format string for already formatted message
struct PreformattedMessage
{
String message;
std::string_view format_string;
operator const String & () const { return message; }
operator String () && { return std::move(message); }
operator fmt::format_string<> () const { UNREACHABLE(); }
};
template<typename T> struct is_fmt_runtime : std::false_type {};
template<typename T> struct is_fmt_runtime<fmt::basic_runtime<T>> : std::true_type {};
template <typename T> constexpr std::string_view tryGetStaticFormatString(T && x)
{
/// Failure of this asserting indicates that something went wrong during type deduction.
/// For example, a string literal was implicitly converted to std::string. It should not happen.
static_assert(!std::is_same_v<std::string, std::decay_t<T>>);
if constexpr (is_fmt_runtime<std::decay_t<T>>::value)
{
/// It definitely was fmt::runtime(something).
/// We are not sure about a lifetime of the string, so return empty view.
/// Also it can be arbitrary string, not a formatting pattern.
/// So returning empty pattern will not pollute the set of patterns.
return std::string_view();
}
else
{
if constexpr (std::is_same_v<PreformattedMessage, std::decay_t<T>>)
{
return x.format_string;
}
else
{
/// Most likely it was a string literal.
/// Unfortunately, there's no good way to check if something is a string literal.
/// But fmtlib requires a format string to be compile-time constant unless fmt::runtime is used.
static_assert(std::is_nothrow_convertible<T, const char * const>::value);
static_assert(!std::is_pointer<T>::value);
return std::string_view(x);
}
}
}
template <typename... Ts> constexpr size_t numArgs(Ts &&...) { return sizeof...(Ts); }
template <typename T, typename... Ts> constexpr auto firstArg(T && x, Ts &&...) { return std::forward<T>(x); }
/// For implicit conversion of fmt::basic_runtime<> to char* for std::string ctor
template <typename T, typename... Ts> constexpr auto firstArg(fmt::basic_runtime<T> && data, Ts &&...) { return data.str.data(); }

View File

@ -53,6 +53,8 @@
M(TableFunctionExecute, "Number of table function calls.") \
M(MarkCacheHits, "Number of times an entry has been found in the mark cache, so we didn't have to load a mark file.") \
M(MarkCacheMisses, "Number of times an entry has not been found in the mark cache, so we had to load a mark file in memory, which is a costly operation, adding to query latency.") \
M(QueryResultCacheHits, "Number of times a query result has been found in the query result cache (and query computation was avoided).") \
M(QueryResultCacheMisses, "Number of times a query result has not been found in the query result cache (and required query computation).") \
M(CreatedReadBufferOrdinary, "Number of times ordinary read buffer was created for reading data (while choosing among other read methods).") \
M(CreatedReadBufferDirectIO, "Number of times a read buffer with O_DIRECT was created for reading data (while choosing among other read methods).") \
M(CreatedReadBufferDirectIOFailed, "Number of times a read buffer with O_DIRECT was attempted to be created for reading data (while choosing among other read methods), but the OS did not allow it (due to lack of filesystem support or other reasons) and we fallen back to the ordinary reading method.") \
@ -64,7 +66,20 @@
M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries..") \
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_network_bandwidth' and other throttling settings.") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
M(DiskS3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 PUT, COPY, POST and LIST request throttling.") \
M(S3GetRequestThrottlerCount, "Number of S3 GET and SELECT requests passed through throttler.") \
M(S3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 GET and SELECT request throttling.") \
M(S3PutRequestThrottlerCount, "Number of S3 PUT, COPY, POST and LIST requests passed through throttler.") \
M(S3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 PUT, COPY, POST and LIST request throttling.") \
M(RemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth_for_server' throttler.") \
M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server' throttling.") \
M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server' throttler.") \
M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server' throttling.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \
\
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
\
@ -356,7 +371,9 @@ The server successfully detected this situation and will download merged part fr
M(RemoteFSCancelledPrefetches, "Number of cancelled prefecthes (because of seek)") \
M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \
M(RemoteFSPrefetchedReads, "Number of reads from prefecthed buffer") \
M(RemoteFSPrefetchedBytes, "Number of bytes from prefecthed buffer") \
M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
M(RemoteFSUnprefetchedBytes, "Number of bytes from unprefetched buffer") \
M(RemoteFSLazySeeks, "Number of lazy seeks") \
M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
@ -386,6 +403,7 @@ The server successfully detected this situation and will download merged part fr
\
M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
M(AsynchronousRemoteReadWaitMicroseconds, "Time spent in waiting for asynchronous remote reads.") \
M(SynchronousRemoteReadWaitMicroseconds, "Time spent in waiting for synchronous remote reads.") \
\
M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\
\

View File

@ -6,8 +6,6 @@
namespace DB
{
class IColumn;
struct SettingChange
{
String name;

View File

@ -212,7 +212,7 @@ bool checkPermissionsImpl()
{
/// This error happens all the time when running inside Docker - consider it ok,
/// don't create noise with this error.
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "{}", getCurrentExceptionMessage(false));
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false));
}
else
{

View File

@ -1,5 +1,6 @@
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <base/errnoToString.h>

View File

@ -6,7 +6,6 @@
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <base/StringRef.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <boost/noncopyable.hpp>
@ -24,6 +23,9 @@ namespace Poco
}
template <class T>
class ConcurrentBoundedQueue;
namespace DB
{

View File

@ -38,7 +38,7 @@ Throttler::Throttler(size_t max_speed_, size_t limit_, const char * limit_exceed
, parent(parent_)
{}
void Throttler::add(size_t amount)
UInt64 Throttler::add(size_t amount)
{
// Values obtained under lock to be checked after release
size_t count_value;
@ -61,9 +61,10 @@ void Throttler::add(size_t amount)
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
/// Wait unless there is positive amount of tokens - throttling
Int64 sleep_time = 0;
if (max_speed && tokens_value < 0)
{
int64_t sleep_time = static_cast<int64_t>(-tokens_value / max_speed * NS);
sleep_time = static_cast<Int64>(-tokens_value / max_speed * NS);
accumulated_sleep += sleep_time;
sleepForNanoseconds(sleep_time);
accumulated_sleep -= sleep_time;
@ -71,7 +72,9 @@ void Throttler::add(size_t amount)
}
if (parent)
parent->add(amount);
sleep_time += parent->add(amount);
return static_cast<UInt64>(sleep_time);
}
void Throttler::reset()

View File

@ -1,10 +1,12 @@
#pragma once
#include <Common/Throttler_fwd.h>
#include <Common/ProfileEvents.h>
#include <mutex>
#include <memory>
#include <base/sleep.h>
#include <base/types.h>
#include <atomic>
namespace DB
@ -32,7 +34,16 @@ public:
const std::shared_ptr<Throttler> & parent_ = nullptr);
/// Use `amount` tokens, sleeps if required or throws exception on limit overflow.
void add(size_t amount);
/// Returns duration of sleep in microseconds (to distinguish sleeping on different kinds of throttlers for metrics)
UInt64 add(size_t amount);
UInt64 add(size_t amount, ProfileEvents::Event event_amount, ProfileEvents::Event event_sleep_us)
{
UInt64 sleep_us = add(amount);
ProfileEvents::increment(event_amount, amount);
ProfileEvents::increment(event_sleep_us, sleep_us);
return sleep_us;
}
/// Not thread safe
void setParent(const std::shared_ptr<Throttler> & parent_)
@ -50,12 +61,12 @@ private:
size_t count{0};
const size_t max_speed{0}; /// in tokens per second.
const size_t max_burst{0}; /// in tokens.
const uint64_t limit{0}; /// 0 - not limited.
const UInt64 limit{0}; /// 0 - not limited.
const char * limit_exceeded_exception_message = nullptr;
std::mutex mutex;
std::atomic<uint64_t> accumulated_sleep{0}; // Accumulated sleep time over all waiting threads
std::atomic<UInt64> accumulated_sleep{0}; // Accumulated sleep time over all waiting threads
double tokens{0}; /// Amount of tokens available in token bucket. Updated in `add` method.
uint64_t prev_ns{0}; /// Previous `add` call time (in nanoseconds).
UInt64 prev_ns{0}; /// Previous `add` call time (in nanoseconds).
/// Used to implement a hierarchy of throttlers
std::shared_ptr<Throttler> parent;

View File

@ -6,6 +6,9 @@
#include <Poco/Logger.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/LoggingFormatStringHelpers.h>
namespace Poco { class Logger; }
/// This wrapper is useful to save formatted message into a String before sending it to a logger
class LogToStrImpl
@ -33,40 +36,9 @@ public:
namespace
{
template <typename... Ts> constexpr size_t numArgs(Ts &&...) { return sizeof...(Ts); }
template <typename T, typename... Ts> constexpr auto firstArg(T && x, Ts &&...) { return std::forward<T>(x); }
/// For implicit conversion of fmt::basic_runtime<> to char* for std::string ctor
template <typename T, typename... Ts> constexpr auto firstArg(fmt::basic_runtime<T> && data, Ts &&...) { return data.str.data(); }
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; };
template<typename T> struct is_fmt_runtime : std::false_type {};
template<typename T> struct is_fmt_runtime<fmt::basic_runtime<T>> : std::true_type {};
/// Usually we use LOG_*(...) macros with either string literals or fmt::runtime(whatever) as a format string.
/// This function is useful to get a string_view to a static format string passed to LOG_* macro.
template <typename T> constexpr std::string_view tryGetStaticFormatString(T && x)
{
if constexpr (is_fmt_runtime<T>::value)
{
/// It definitely was fmt::runtime(something).
/// We are not sure about a lifetime of the string, so return empty view.
/// Also it can be arbitrary string, not a formatting pattern.
/// So returning empty pattern will not pollute the set of patterns.
return std::string_view();
}
else
{
/// Most likely it was a string literal.
/// Unfortunately, there's no good way to check if something is a string literal.
/// But fmtlib requires a format string to be compile-time constant unless fmt::runtime is used.
static_assert(std::is_nothrow_convertible<T, const char * const>::value);
static_assert(!std::is_pointer<T>::value);
return std::string_view(x);
}
}
}
#define LOG_IMPL_FIRST_ARG(X, ...) X

View File

@ -357,16 +357,12 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
if (read_checksum != hash.get64())
{
const auto error_string = fmt::format(
"Invalid checksum while reading state from {}. Got {}, expected {}",
path.generic_string(),
hash.get64(),
read_checksum);
constexpr auto error_format = "Invalid checksum while reading state from {}. Got {}, expected {}";
#ifdef NDEBUG
LOG_ERROR(logger, fmt::runtime(error_string));
LOG_ERROR(logger, error_format, path.generic_string(), hash.get64(), read_checksum);
return nullptr;
#else
throw Exception(ErrorCodes::CORRUPTED_DATA, error_string);
throw Exception(ErrorCodes::CORRUPTED_DATA, error_format, path.generic_string(), hash.get64(), read_checksum);
#endif
}

View File

@ -5,6 +5,7 @@
#include <Core/SettingsEnums.h>
#include <Core/Defines.h>
#include <IO/ReadSettings.h>
#include <base/unit.h>
namespace Poco::Util
@ -22,11 +23,6 @@ namespace DB
{
class IColumn;
static constexpr UInt64 operator""_GiB(unsigned long long value)
{
return value * 1024 * 1024 * 1024;
}
/** List of settings: type, name, default value, description, flags
*
* This looks rather unconvenient. It is done that way to avoid repeating settings in different places.
@ -95,6 +91,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, s3_max_get_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_get_rps`", 0) \
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
@ -678,6 +675,13 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \
M(Bool, enable_experimental_query_result_cache, false, "Store and retrieve results of SELECT queries in/from the query result cache", 0) \
M(Bool, enable_experimental_query_result_cache_passive_usage, false, "Retrieve results of SELECT queries from the query result cache", 0) \
M(Bool, query_result_cache_store_results_of_queries_with_nondeterministic_functions, false, "Store results of queries with non-deterministic functions (e.g. rand(), now()) in the query result cache", 0) \
M(UInt64, query_result_cache_min_query_runs, 0, "Minimum number a SELECT query must run before its result is stored in the query result cache", 0) \
M(Milliseconds, query_result_cache_min_query_duration, 0, "Minimum time in milliseconds for a query to run for its result to be stored in the query result cache.", 0) \
M(Seconds, query_result_cache_ttl, 60, "After this time in seconds entries in the query result cache become stale", 0) \
M(Bool, query_result_cache_share_between_users, false, "Allow other users to read entry in the query result cache", 0) \
M(UInt64, insert_keeper_max_retries, 0, "Max retries for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_max_backoff_ms, 10000, "Max backoff timeout for keeper operations during insert", 0) \

View File

@ -278,7 +278,7 @@ private:
if (next_pos != std::string_view::npos)
size = next_pos - pos;
LOG_FATAL(log, "{}", message.substr(pos, size));
LOG_FATAL(log, fmt::runtime(message.substr(pos, size)));
pos = next_pos;
}
}

View File

@ -82,7 +82,7 @@ void DatabaseAtomic::drop(ContextPtr)
}
catch (...)
{
LOG_WARNING(log, fmt::runtime(getCurrentExceptionMessage(true)));
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
}
fs::remove_all(getMetadataPath());
}
@ -477,7 +477,7 @@ void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String &
}
catch (...)
{
LOG_WARNING(log, fmt::runtime(getCurrentExceptionMessage(true)));
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
}
}
@ -490,7 +490,7 @@ void DatabaseAtomic::tryRemoveSymlink(const String & table_name)
}
catch (...)
{
LOG_WARNING(log, fmt::runtime(getCurrentExceptionMessage(true)));
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
}
}
@ -535,7 +535,7 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
}
catch (...)
{
LOG_WARNING(log, fmt::runtime(getCurrentExceptionMessage(true)));
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
}
auto new_name_escaped = escapeForFileName(new_name);

View File

@ -100,7 +100,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, Co
if (!load_result.config)
{
if (throw_on_error)
throw Exception{"Dictionary " + backQuote(table_name) + " doesn't exist", ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY};
throw Exception(ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY, "Dictionary {} doesn't exist", backQuote(table_name));
return {};
}

View File

@ -102,7 +102,7 @@ time_t DatabaseLazy::getObjectMetadataModificationTime(const String & table_name
auto it = tables_cache.find(table_name);
if (it != tables_cache.end())
return it->second.metadata_modification_time;
throw Exception("Table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist.", backQuote(database_name), backQuote(table_name));
}
void DatabaseLazy::alterTable(
@ -185,7 +185,7 @@ StoragePtr DatabaseLazy::detachTable(ContextPtr /* context */, const String & ta
std::lock_guard lock(mutex);
auto it = tables_cache.find(table_name);
if (it == tables_cache.end())
throw Exception("Table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist.", backQuote(database_name), backQuote(table_name));
res = it->second.table;
if (it->second.expiration_iterator != cache_expiration_queue.end())
cache_expiration_queue.erase(it->second.expiration_iterator);

View File

@ -312,7 +312,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
}
catch (...)
{
LOG_WARNING(log, fmt::runtime(getCurrentExceptionMessage(__PRETTY_FUNCTION__)));
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path);
@ -377,14 +377,14 @@ void DatabaseOnDisk::renameTable(
if (dictionary && table && !table->isDictionary())
throw Exception("Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables", ErrorCodes::INCORRECT_QUERY);
table_lock = table->lockExclusively(
local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
detachTable(local_context, table_name);
UUID prev_uuid = UUIDHelpers::Nil;
try
{
table_lock = table->lockExclusively(
local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
table_metadata_path = getObjectMetadataPath(table_name);
attach_query = parseQueryFromMetadata(log, local_context, table_metadata_path);
auto & create = attach_query->as<ASTCreateQuery &>();
@ -463,8 +463,7 @@ ASTPtr DatabaseOnDisk::getCreateTableQueryImpl(const String & table_name, Contex
catch (const Exception & e)
{
if (!has_table && e.code() == ErrorCodes::FILE_DOESNT_EXIST && throw_on_error)
throw Exception{"Table " + backQuote(table_name) + " doesn't exist",
ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY};
throw Exception(ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, "Table {} doesn't exist", backQuote(table_name));
else if (!is_system_storage && throw_on_error)
throw;
}

View File

@ -233,6 +233,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
backQuote(database_name), backQuote(table_name));
res = it->second;
tables.erase(it);
res->is_detached = true;
auto table_id = res->getStorageID();
if (table_id.hasUUID())
@ -269,6 +270,10 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already exists.", table_id.getFullTableName());
}
/// It is important to reset is_detached here since in case of RENAME in
/// non-Atomic database the is_detached is set to true before RENAME.
table->is_detached = false;
}
void DatabaseWithOwnTablesBase::shutdown()

View File

@ -136,8 +136,7 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context
if (local_tables_cache.find(table_name) == local_tables_cache.end())
{
if (throw_on_error)
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist..",
ErrorCodes::UNKNOWN_TABLE);
throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.", database_name_in_mysql, table_name);
return nullptr;
}
@ -181,7 +180,7 @@ time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_nam
fetchTablesIntoLocalCache(getContext());
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.", database_name_in_mysql, table_name);
return time_t(local_tables_cache[table_name].first);
}
@ -449,7 +448,7 @@ void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name
remove_or_detach_tables.erase(table_name);
throw;
}
table_iter->second.second->is_dropped = true;
table_iter->second.second->is_detached = true;
}
void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)

View File

@ -113,8 +113,8 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
writeBackQuotedString(name, warning);
}
writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning);
LOG_WARNING(&Poco::Logger::get("DiskSelector"), fmt::runtime(warning.str()));
LOG_WARNING(&Poco::Logger::get("DiskSelector"), "{} disappeared from configuration, "
"this change will be applied after restart of ClickHouse", warning.str());
}
return result;

View File

@ -15,12 +15,15 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event AsynchronousRemoteReadWaitMicroseconds;
extern const Event SynchronousRemoteReadWaitMicroseconds;
extern const Event RemoteFSSeeks;
extern const Event RemoteFSPrefetches;
extern const Event RemoteFSCancelledPrefetches;
extern const Event RemoteFSUnusedPrefetches;
extern const Event RemoteFSPrefetchedReads;
extern const Event RemoteFSUnprefetchedReads;
extern const Event RemoteFSPrefetchedBytes;
extern const Event RemoteFSUnprefetchedBytes;
extern const Event RemoteFSLazySeeks;
extern const Event RemoteFSSeeksWithReset;
extern const Event RemoteFSBuffers;
@ -131,7 +134,6 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
{
/// Do not reinitialize internal state in case the new end of range is already included.
@ -141,19 +143,14 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
/// reading and ignoring some data.
if (!read_until_position || position > *read_until_position)
{
read_until_position = position;
/// We must wait on future and reset the prefetch here, because otherwise there might be
/// a race between reading the data in the threadpool and impl->setReadUntilPosition()
/// which reinitializes internal remote read buffer (because if we have a new read range
/// then we need a new range request) and in case of reading from cache we need to request
/// and hold more file segment ranges from cache.
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
read_until_position = position;
resetPrefetch(FilesystemPrefetchState::CANCELLED_WITH_RANGE_CHANGE);
impl->setReadUntilPosition(*read_until_position);
}
}
@ -184,16 +181,18 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
prefetch_buffer.swap(memory);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
}
else
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
chassert(memory.size() == read_settings.remote_fs_buffer_size);
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
bytes_to_ignore = 0;
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, size);
}
chassert(size >= offset);
@ -257,13 +256,11 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else if (prefetch_future.valid())
{
/// Read from prefetch buffer and recheck if the new position is valid inside.
read_from_prefetch = true;
/// Read from prefetch buffer and recheck if the new position is valid inside.
if (nextImpl())
{
read_from_prefetch = true;
continue;
}
}
/// Prefetch is cancelled because of seek.
@ -301,8 +298,11 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
if (impl->initialized())
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
}
file_offset_of_buffer_end = new_pos;
}
@ -312,18 +312,44 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
}
AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromRemoteFS()
{
finalize();
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetchState state)
{
if (!prefetch_future.valid())
return;
auto [size, _] = prefetch_future.get();
prefetch_future = {};
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
switch (state)
{
case FilesystemPrefetchState::UNNEEDED:
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedPrefetches);
break;
case FilesystemPrefetchState::CANCELLED_WITH_SEEK:
case FilesystemPrefetchState::CANCELLED_WITH_RANGE_CHANGE:
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of prefetch: {}", magic_enum::enum_name(state));
}
}
}

View File

@ -64,6 +64,15 @@ private:
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size);
enum class FilesystemPrefetchState
{
USED,
CANCELLED_WITH_SEEK,
CANCELLED_WITH_RANGE_CHANGE,
UNNEEDED,
};
void resetPrefetch(FilesystemPrefetchState state);
ReadSettings read_settings;
IAsynchronousReader & reader;

View File

@ -5,6 +5,7 @@
#include <Common/logger_useful.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>
#include <IO/SwapHelper.h>
namespace ProfileEvents
@ -21,21 +22,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
class SwapHelper
{
public:
SwapHelper(WriteBuffer & b1_, WriteBuffer & b2_) : b1(b1_), b2(b2_) { b1.swap(b2); }
~SwapHelper() { b1.swap(b2); }
private:
WriteBuffer & b1;
WriteBuffer & b2;
};
}
FileSegmentRangeWriter::FileSegmentRangeWriter(
FileCache * cache_,
const FileSegment::Key & key_,

View File

@ -7,8 +7,15 @@
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
#include <base/sleep.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event RemoteReadThrottlerBytes;
extern const Event RemoteReadThrottlerSleepMicroseconds;
}
namespace DB
{
@ -91,7 +98,7 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
{
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_read);
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
break;
}
catch (const Azure::Storage::StorageException & e)

View File

@ -38,31 +38,31 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (!current_file_path.empty() && !with_cache && enable_cache_log)
if (current_object && !with_cache && enable_cache_log)
{
appendFilesystemCacheLog();
}
current_file_path = path;
current_file_size = file_size;
current_object = object;
total_bytes_read_from_current_file = 0;
const auto & object_path = object.absolute_path;
size_t current_read_until_position = read_until_position ? read_until_position : file_size;
auto current_read_buffer_creator = [path, current_read_until_position, this]() { return read_buffer_creator(path, current_read_until_position); };
size_t current_read_until_position = read_until_position ? read_until_position : object.bytes_size;
auto current_read_buffer_creator = [=, this]() { return read_buffer_creator(object_path, current_read_until_position); };
if (with_cache)
{
auto cache_key = settings.remote_fs_cache->hash(path);
auto cache_key = settings.remote_fs_cache->hash(object_path);
return std::make_shared<CachedOnDiskReadBufferFromFile>(
path,
object_path,
cache_key,
settings.remote_fs_cache,
std::move(current_read_buffer_creator),
settings,
query_id,
file_size,
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
@ -73,12 +73,15 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
if (!current_object)
return;
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = current_file_path,
.file_segment_range = { 0, current_file_size },
.source_file_path = current_object->absolute_path,
.file_segment_range = { 0, current_object->bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
@ -123,7 +126,7 @@ void ReadBufferFromRemoteFSGather::initialize()
if (!current_buf || current_buf_idx != i)
{
current_buf_idx = i;
current_buf = createImplementationBuffer(object.absolute_path, object.bytes_size);
current_buf = createImplementationBuffer(object);
}
current_buf->seek(current_buf_offset, SEEK_SET);
@ -170,7 +173,7 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
++current_buf_idx;
const auto & object = blobs_to_read[current_buf_idx];
current_buf = createImplementationBuffer(object.absolute_path, object.bytes_size);
current_buf = createImplementationBuffer(object);
return true;
}
@ -242,7 +245,9 @@ void ReadBufferFromRemoteFSGather::reset()
String ReadBufferFromRemoteFSGather::getFileName() const
{
return current_file_path;
if (current_object)
return current_object->absolute_path;
return blobs_to_read[0].absolute_path;
}
size_t ReadBufferFromRemoteFSGather::getFileSize() const

View File

@ -48,7 +48,7 @@ public:
size_t getImplementationBufferOffset() const;
private:
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size);
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
bool nextImpl() override;
@ -71,6 +71,8 @@ private:
String current_file_path;
size_t current_file_size = 0;
std::optional<StoredObject> current_object;
bool with_cache;
String query_id;

View File

@ -22,7 +22,7 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric Read;
extern const Metric RemoteRead;
}
namespace DB
@ -42,14 +42,11 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu
std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Request request)
{
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderSubmit);
auto schedule = threadPoolCallbackRunner<Result>(pool, "VFSRead");
return schedule([request]() -> Result
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
Stopwatch watch(CLOCK_MONOTONIC);
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
@ -57,10 +54,10 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
watch.stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.offset ? result.size - result.offset : result.size);
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size);
return Result{ .size = result.size, .offset = result.offset };
}, request.priority);
}, pool, "VFSRead", request.priority);
}
}

View File

@ -8,6 +8,12 @@
#include <Common/Throttler.h>
namespace ProfileEvents
{
extern const Event RemoteWriteThrottlerBytes;
extern const Event RemoteWriteThrottlerSleepMicroseconds;
}
namespace DB
{
@ -119,7 +125,7 @@ void WriteBufferFromAzureBlobStorage::nextImpl()
uploadBlock(tmp_buffer->data(), tmp_buffer->size());
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(size_to_upload);
write_settings.remote_throttler->add(size_to_upload, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
}
}

View File

@ -105,7 +105,7 @@ StoredObjects FakeMetadataStorageFromDisk::getStorageObjects(const std::string &
std::string object_path = fs::path(object_storage_root_path) / blob_name;
size_t object_size = getFileSize(path);
auto object = StoredObject::create(*object_storage, object_path, object_size, /* exists */true);
auto object = StoredObject::create(*object_storage, object_path, object_size, path, /* exists */true);
return {std::move(object)};
}

View File

@ -145,7 +145,7 @@ StoredObjects MetadataStorageFromDisk::getStorageObjects(const std::string & pat
for (auto & [object_relative_path, size] : object_storage_relative_paths)
{
auto object_path = fs::path(metadata->getBlobsCommonPrefix()) / object_relative_path;
StoredObject object{ object_path, size, [](const String & path_){ return path_; }};
StoredObject object{ object_path, size, path, [](const String & path_){ return path_; }};
object_storage_paths.push_back(object);
}

View File

@ -108,7 +108,7 @@ StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std
{
std::string blob_name = object_storage->generateBlobNameForPath(path);
size_t object_size = getFileSize(blob_name);
auto object = StoredObject::create(*object_storage, getAbsolutePath(blob_name), object_size, /* exists */true);
auto object = StoredObject::create(*object_storage, getAbsolutePath(blob_name), object_size, path, /* exists */true);
return {std::move(object)};
}

View File

@ -11,8 +11,10 @@ namespace DB
StoredObject::StoredObject(
const std::string & absolute_path_,
uint64_t bytes_size_,
const std::string & mapped_path_,
PathKeyForCacheCreator && path_key_for_cache_creator_)
: absolute_path(absolute_path_)
, mapped_path(mapped_path_)
, bytes_size(bytes_size_)
, path_key_for_cache_creator(std::move(path_key_for_cache_creator_))
{
@ -26,8 +28,18 @@ std::string StoredObject::getPathKeyForCache() const
return path_key_for_cache_creator(absolute_path);
}
const std::string & StoredObject::getMappedPath() const
{
return mapped_path;
}
StoredObject StoredObject::create(
const IObjectStorage & object_storage, const std::string & object_path, size_t object_size, bool exists, bool object_bypasses_cache)
const IObjectStorage & object_storage,
const std::string & object_path,
size_t object_size,
const std::string & mapped_path,
bool exists,
bool object_bypasses_cache)
{
if (object_bypasses_cache)
return StoredObject(object_path, object_size, {});
@ -54,7 +66,7 @@ StoredObject StoredObject::create(
path_key_for_cache_creator = [path = path_key_for_cache_creator(object_path)](const std::string &) { return path; };
}
return StoredObject(object_path, object_size, std::move(path_key_for_cache_creator));
return StoredObject(object_path, object_size, mapped_path, std::move(path_key_for_cache_creator));
}
}

View File

@ -10,17 +10,23 @@ namespace DB
/// Object metadata: path, size, path_key_for_cache.
struct StoredObject
{
/// Absolute path of the blob in object storage.
std::string absolute_path;
/// A map which is mapped to current blob (for example, a corresponding local path as clickhouse sees it).
std::string mapped_path;
uint64_t bytes_size;
uint64_t bytes_size = 0;
std::string getPathKeyForCache() const;
const std::string & getMappedPath() const;
/// Create `StoredObject` based on metadata storage and blob name of the object.
static StoredObject create(
const IObjectStorage & object_storage,
const std::string & object_path,
size_t object_size = 0,
const std::string & mapped_path = "",
bool exists = false,
bool object_bypasses_cache = false);
@ -32,6 +38,7 @@ struct StoredObject
explicit StoredObject(
const std::string & absolute_path_,
uint64_t bytes_size_ = 0,
const std::string & mapped_path_ = "",
PathKeyForCacheCreator && path_key_for_cache_creator_ = {});
};

View File

@ -108,7 +108,7 @@ StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const s
auto fs_path = fs::path(object_storage.url) / path;
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.substr(object_storage.url.size());
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, path, true)};
}
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const

View File

@ -26,7 +26,7 @@ inline void throwIfDivisionLeadsToFPE(A a, B b)
/// Is it better to use siglongjmp instead of checks?
if (unlikely(b == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
throw Exception(ErrorCodes::ILLEGAL_DIVISION, "Division by zero");
/// http://avva.livejournal.com/2548306.html
if (unlikely(is_signed_v<A> && is_signed_v<B> && a == std::numeric_limits<A>::min() && b == -1))

View File

@ -184,15 +184,11 @@ void validateFunctionArgumentTypes(const IFunction & func,
return result;
};
throw Exception("Incorrect number of arguments for function " + func.getName()
+ " provided " + std::to_string(arguments.size())
+ (!arguments.empty() ? " (" + join_argument_types(arguments) + ")" : String{})
+ ", expected " + std::to_string(mandatory_args.size())
+ (!optional_args.empty() ? " to " + std::to_string(mandatory_args.size() + optional_args.size()) : "")
+ " (" + join_argument_types(mandatory_args)
+ (!optional_args.empty() ? ", [" + join_argument_types(optional_args) + "]" : "")
+ ")",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of arguments for function {} provided {}{}, expected {}{} ({}{})",
func.getName(), arguments.size(), (!arguments.empty() ? " (" + join_argument_types(arguments) + ")" : String{}),
mandatory_args.size(), (!optional_args.empty() ? " to " + std::to_string(mandatory_args.size() + optional_args.size()) : ""),
join_argument_types(mandatory_args), (!optional_args.empty() ? ", [" + join_argument_types(optional_args) + "]" : ""));
}
validateArgumentsImpl(func, arguments, 0, mandatory_args);

View File

@ -1166,8 +1166,8 @@ public:
}
catch (const Exception &)
{
throw Exception("Illegal types of arguments (" + arguments[0]->getName() + ", " + arguments[1]->getName() + ")"
" of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal types of arguments ({}, {})"
" of function {}", arguments[0]->getName(), arguments[1]->getName(), getName());
}
}

View File

@ -1116,7 +1116,7 @@ inline bool tryParseImpl<DataTypeIPv6>(DataTypeIPv6::FieldType & x, ReadBuffer &
if (isNativeNumber(result_type) && !(result_type.getName() == "IPv4" || result_type.getName() == "IPv6"))
message_buf << ". Note: there are to" << result_type.getName() << "OrZero and to" << result_type.getName() << "OrNull functions, which returns zero/NULL instead of throwing exception.";
throw Exception(message_buf.str(), ErrorCodes::CANNOT_PARSE_TEXT);
throw Exception(PreformattedMessage{message_buf.str(), "Cannot parse string {} as {}: syntax error {}"}, ErrorCodes::CANNOT_PARSE_TEXT);
}

View File

@ -7,7 +7,7 @@ namespace DB
REGISTER_FUNCTION(ExternalDictionaries)
{
const std::string dict_get_description { R"(
constexpr auto dict_get_description { R"(
Retrieves values from a dictionary.
Accepts 3 parameters:
@ -20,7 +20,7 @@ Returned value: value of the dictionary attribute parsed in the {} if key is fou
Throws an exception if cannot parse the value of the attribute or the value does not match the attribute data type.
)" };
const std::string dict_get_or_default_description { R"(
constexpr auto dict_get_or_default_description { R"(
Retrieves values from a dictionary.
Accepts 4 parameters:
@ -34,7 +34,7 @@ Returned value: value of the dictionary attribute parsed in the {} if key is fou
Throws an exception if cannot parse the value of the attribute or the value does not match the attribute data type.
)" };
const std::string dict_get_or_null_description { R"(
constexpr auto dict_get_or_null_description { R"(
Retrieves values from a dictionary.
Accepts 3 parameters:
@ -47,43 +47,43 @@ Returned value: value of the dictionary attribute parsed in the attributes da
Throws an exception if cannot parse the value of the attribute or the value does not match the attribute data type.
)" };
factory.registerFunction<FunctionDictGetNoType<DictionaryGetFunctionType::get>>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "attributes data type") });
factory.registerFunction<FunctionDictGetNoType<DictionaryGetFunctionType::getOrDefault>>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "attributes data type") });
factory.registerFunction<FunctionDictGetNoType<DictionaryGetFunctionType::get>>(Documentation{ fmt::format(dict_get_description, "attributes data type") });
factory.registerFunction<FunctionDictGetNoType<DictionaryGetFunctionType::getOrDefault>>(Documentation{ fmt::format(dict_get_or_default_description, "attributes data type") });
factory.registerFunction<FunctionDictGetOrNull>(Documentation{ dict_get_or_null_description });
factory.registerFunction<FunctionDictGetUInt8>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "UInt8") });
factory.registerFunction<FunctionDictGetUInt16>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "UInt16") });
factory.registerFunction<FunctionDictGetUInt32>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "UInt32") });
factory.registerFunction<FunctionDictGetUInt64>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "UInt64") });
factory.registerFunction<FunctionDictGetInt8>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Int8") });
factory.registerFunction<FunctionDictGetInt16>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Int16") });
factory.registerFunction<FunctionDictGetInt32>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Int32") });
factory.registerFunction<FunctionDictGetInt64>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Int64") });
factory.registerFunction<FunctionDictGetFloat32>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Float32") });
factory.registerFunction<FunctionDictGetFloat64>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Float64") });
factory.registerFunction<FunctionDictGetDate>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "Date") });
factory.registerFunction<FunctionDictGetDateTime>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "DateTime") });
factory.registerFunction<FunctionDictGetUUID>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "UUID") });
factory.registerFunction<FunctionDictGetIPv4>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "IPv4") });
factory.registerFunction<FunctionDictGetIPv6>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "IPv6") });
factory.registerFunction<FunctionDictGetString>(Documentation{ fmt::format(fmt::runtime(dict_get_description), "String") });
factory.registerFunction<FunctionDictGetUInt8>(Documentation{ fmt::format(dict_get_description, "UInt8") });
factory.registerFunction<FunctionDictGetUInt16>(Documentation{ fmt::format(dict_get_description, "UInt16") });
factory.registerFunction<FunctionDictGetUInt32>(Documentation{ fmt::format(dict_get_description, "UInt32") });
factory.registerFunction<FunctionDictGetUInt64>(Documentation{ fmt::format(dict_get_description, "UInt64") });
factory.registerFunction<FunctionDictGetInt8>(Documentation{ fmt::format(dict_get_description, "Int8") });
factory.registerFunction<FunctionDictGetInt16>(Documentation{ fmt::format(dict_get_description, "Int16") });
factory.registerFunction<FunctionDictGetInt32>(Documentation{ fmt::format(dict_get_description, "Int32") });
factory.registerFunction<FunctionDictGetInt64>(Documentation{ fmt::format(dict_get_description, "Int64") });
factory.registerFunction<FunctionDictGetFloat32>(Documentation{ fmt::format(dict_get_description, "Float32") });
factory.registerFunction<FunctionDictGetFloat64>(Documentation{ fmt::format(dict_get_description, "Float64") });
factory.registerFunction<FunctionDictGetDate>(Documentation{ fmt::format(dict_get_description, "Date") });
factory.registerFunction<FunctionDictGetDateTime>(Documentation{ fmt::format(dict_get_description, "DateTime") });
factory.registerFunction<FunctionDictGetUUID>(Documentation{ fmt::format(dict_get_description, "UUID") });
factory.registerFunction<FunctionDictGetIPv4>(Documentation{ fmt::format(dict_get_description, "IPv4") });
factory.registerFunction<FunctionDictGetIPv6>(Documentation{ fmt::format(dict_get_description, "IPv6") });
factory.registerFunction<FunctionDictGetString>(Documentation{ fmt::format(dict_get_description, "String") });
factory.registerFunction<FunctionDictGetUInt8OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "UInt8") });
factory.registerFunction<FunctionDictGetUInt16OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "UInt16") });
factory.registerFunction<FunctionDictGetUInt32OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "UInt32") });
factory.registerFunction<FunctionDictGetUInt64OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "UInt64") });
factory.registerFunction<FunctionDictGetInt8OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Int8") });
factory.registerFunction<FunctionDictGetInt16OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Int16") });
factory.registerFunction<FunctionDictGetInt32OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Int32") });
factory.registerFunction<FunctionDictGetInt64OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Int64") });
factory.registerFunction<FunctionDictGetFloat32OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Float32") });
factory.registerFunction<FunctionDictGetFloat64OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Float64") });
factory.registerFunction<FunctionDictGetDateOrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "Date") });
factory.registerFunction<FunctionDictGetDateTimeOrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "DateTime") });
factory.registerFunction<FunctionDictGetUUIDOrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "UUID") });
factory.registerFunction<FunctionDictGetIPv4OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "IPv4") });
factory.registerFunction<FunctionDictGetIPv6OrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "IPv6") });
factory.registerFunction<FunctionDictGetStringOrDefault>(Documentation{ fmt::format(fmt::runtime(dict_get_or_default_description), "String") });
factory.registerFunction<FunctionDictGetUInt8OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "UInt8") });
factory.registerFunction<FunctionDictGetUInt16OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "UInt16") });
factory.registerFunction<FunctionDictGetUInt32OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "UInt32") });
factory.registerFunction<FunctionDictGetUInt64OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "UInt64") });
factory.registerFunction<FunctionDictGetInt8OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Int8") });
factory.registerFunction<FunctionDictGetInt16OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Int16") });
factory.registerFunction<FunctionDictGetInt32OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Int32") });
factory.registerFunction<FunctionDictGetInt64OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Int64") });
factory.registerFunction<FunctionDictGetFloat32OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Float32") });
factory.registerFunction<FunctionDictGetFloat64OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Float64") });
factory.registerFunction<FunctionDictGetDateOrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "Date") });
factory.registerFunction<FunctionDictGetDateTimeOrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "DateTime") });
factory.registerFunction<FunctionDictGetUUIDOrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "UUID") });
factory.registerFunction<FunctionDictGetIPv4OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "IPv4") });
factory.registerFunction<FunctionDictGetIPv6OrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "IPv6") });
factory.registerFunction<FunctionDictGetStringOrDefault>(Documentation{ fmt::format(dict_get_or_default_description, "String") });
factory.registerFunction<FunctionDictHas>(Documentation{ R"(
Checks whether a key is present in a dictionary.

View File

@ -207,7 +207,7 @@ public:
if (arguments.size() < 2)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "{}", arguments.size());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect number of arguments: {}", arguments.size());
}
const auto * first_array_type = checkAndGetDataType<typename Impl::data_type>(arguments[1].type.get());

View File

@ -22,7 +22,7 @@ struct DivideDecimalsImpl
execute(FirstType a, SecondType b, UInt16 scale_a, UInt16 scale_b, UInt16 result_scale)
{
if (b.value == 0)
throw DB::Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
throw DB::Exception(ErrorCodes::ILLEGAL_DIVISION, "Division by zero");
if (a.value == 0)
return Decimal256(0);

View File

@ -78,7 +78,7 @@ struct DivideIntegralByConstantImpl
#pragma GCC diagnostic pop
if (unlikely(static_cast<A>(b) == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
throw Exception(ErrorCodes::ILLEGAL_DIVISION, "Division by zero");
divideImpl(a_pos, b, c_pos, size);
}

View File

@ -78,7 +78,7 @@ struct ModuloByConstantImpl
#pragma GCC diagnostic pop
if (unlikely(static_cast<A>(b) == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
throw Exception(ErrorCodes::ILLEGAL_DIVISION, "Division by zero");
/// Division by min negative value.
if (std::is_signed_v<B> && b == std::numeric_limits<B>::lowest())

Some files were not shown because too many files have changed in this diff Show More