Merge branch 'master' into space

This commit is contained in:
Robert Schulze 2023-05-25 19:56:20 +02:00 committed by GitHub
commit c4f91a1c45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
191 changed files with 4207 additions and 1773 deletions

2
.gitmodules vendored
View File

@ -267,7 +267,7 @@
url = https://github.com/ClickHouse/nats.c
[submodule "contrib/vectorscan"]
path = contrib/vectorscan
url = https://github.com/ClickHouse/vectorscan.git
url = https://github.com/VectorCamp/vectorscan.git
[submodule "contrib/c-ares"]
path = contrib/c-ares
url = https://github.com/ClickHouse/c-ares

View File

@ -23,7 +23,6 @@ curl https://clickhouse.com/ | sh
## Upcoming Events
* [**v23.5 Release Webinar**](https://clickhouse.com/company/events/v23-5-release-webinar?utm_source=github&utm_medium=social&utm_campaign=release-webinar-2023-05) - May 31 - 23.5 is rapidly approaching. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
* [**ClickHouse Meetup in Berlin**](https://www.meetup.com/clickhouse-berlin-user-group/events/292892466) - May 16
* [**ClickHouse Meetup in Barcelona**](https://www.meetup.com/clickhouse-barcelona-user-group/events/292892669) - May 25
* [**ClickHouse Meetup in London**](https://www.meetup.com/clickhouse-london-user-group/events/292892824) - May 25
* [**ClickHouse Meetup in San Francisco**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/293426725/) - Jun 7

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit ecccfc026a42b30023289410a67024d561f4bf3e
Subproject commit ca02358dcc7ce3ab733dd4cbcc32734eecfa4ee3

2
contrib/aws-c-auth vendored

@ -1 +1 @@
Subproject commit 30df6c407e2df43bd244e2c34c9b4a4b87372bfb
Subproject commit 97133a2b5dbca1ccdf88cd6f44f39d0531d27d12

@ -1 +1 @@
Subproject commit 324fd1d973ccb25c813aa747bf1759cfde5121c5
Subproject commit 45dcb2849c891dba2100b270b4676765c92949ff

@ -1 +1 @@
Subproject commit 39bfa94a14b7126bf0c1330286ef8db452d87e66
Subproject commit 2f9b60c42f90840ec11822acda3d8cdfa97a773d

2
contrib/aws-c-http vendored

@ -1 +1 @@
Subproject commit 2c5a2a7d5556600b9782ffa6c9d7e09964df1abc
Subproject commit dd34461987947672444d0bc872c5a733dfdb9711

2
contrib/aws-c-io vendored

@ -1 +1 @@
Subproject commit 5d32c453560d0823df521a686bf7fbacde7f9be3
Subproject commit d58ed4f272b1cb4f89ac9196526ceebe5f2b0d89

2
contrib/aws-c-mqtt vendored

@ -1 +1 @@
Subproject commit 882c689561a3db1466330ccfe3b63637e0a575d3
Subproject commit 33c3455cec82b16feb940e12006cefd7b3ef4194

2
contrib/aws-c-s3 vendored

@ -1 +1 @@
Subproject commit a41255ece72a7c887bba7f9d998ca3e14f4c8a1b
Subproject commit d7bfe602d6925948f1fff95784e3613cca6a3900

@ -1 +1 @@
Subproject commit 25bf5cf225f977c3accc6a05a0a7a181ef2a4a30
Subproject commit 208a701fa01e99c7c8cc3dcebc8317da71362972

@ -1 +1 @@
Subproject commit 48e7c0e01479232f225c8044d76c84e74192889d
Subproject commit ad53be196a25bbefa3700a01187fdce573a7d2d0

View File

@ -52,8 +52,8 @@ endif()
# Directories.
SET(AWS_SDK_DIR "${ClickHouse_SOURCE_DIR}/contrib/aws")
SET(AWS_SDK_CORE_DIR "${AWS_SDK_DIR}/aws-cpp-sdk-core")
SET(AWS_SDK_S3_DIR "${AWS_SDK_DIR}/aws-cpp-sdk-s3")
SET(AWS_SDK_CORE_DIR "${AWS_SDK_DIR}/src/aws-cpp-sdk-core")
SET(AWS_SDK_S3_DIR "${AWS_SDK_DIR}/generated/src/aws-cpp-sdk-s3")
SET(AWS_AUTH_DIR "${ClickHouse_SOURCE_DIR}/contrib/aws-c-auth")
SET(AWS_CAL_DIR "${ClickHouse_SOURCE_DIR}/contrib/aws-c-cal")
@ -118,7 +118,7 @@ configure_file("${AWS_SDK_CORE_DIR}/include/aws/core/SDKConfig.h.in"
list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MAJOR=1")
list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MINOR=10")
list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_PATCH=36")
list(APPEND AWS_SOURCES ${AWS_SDK_CORE_SRC} ${AWS_SDK_CORE_NET_SRC} ${AWS_SDK_CORE_PLATFORM_SRC})
list(APPEND AWS_PUBLIC_INCLUDES

2
contrib/aws-crt-cpp vendored

@ -1 +1 @@
Subproject commit ec0bea288f451d884c0d80d534bc5c66241c39a4
Subproject commit 8a301b7e842f1daed478090c869207300972379f

2
contrib/aws-s2n-tls vendored

@ -1 +1 @@
Subproject commit 0f1ba9e5c4a67cb3898de0c0b4f911d4194dc8de
Subproject commit 71f4794b7580cf780eb4aca77d69eded5d3c7bb4

2
contrib/libpqxx vendored

@ -1 +1 @@
Subproject commit a4e834839270a8c1f7ff1db351ba85afced3f0e2
Subproject commit bdd6540fb95ff56c813691ceb5da5a3266cf235d

View File

@ -5,8 +5,8 @@ echo "Using sparse checkout for aws"
FILES_TO_CHECKOUT=$(git rev-parse --git-dir)/info/sparse-checkout
echo '/*' > $FILES_TO_CHECKOUT
echo '!/*/*' >> $FILES_TO_CHECKOUT
echo '/aws-cpp-sdk-core/*' >> $FILES_TO_CHECKOUT
echo '/aws-cpp-sdk-s3/*' >> $FILES_TO_CHECKOUT
echo '/src/aws-cpp-sdk-core/*' >> $FILES_TO_CHECKOUT
echo '/generated/src/aws-cpp-sdk-s3/*' >> $FILES_TO_CHECKOUT
git config core.sparsecheckout true
git checkout $1

2
contrib/vectorscan vendored

@ -1 +1 @@
Subproject commit 1f4d448314e581473103187765e4c949d01b4259
Subproject commit 38431d111781843741a781a57a6381a527d900a4

View File

@ -132,6 +132,9 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--report-logs-stats')
clickhouse-test "00001_select_1" > /dev/null ||:
clickhouse-client -q "insert into system.zookeeper (name, path, value) values ('auxiliary_zookeeper2', '/test/chroot/', '')" ||:
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \

View File

@ -65,6 +65,9 @@ sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
@ -94,6 +97,9 @@ sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
start
clickhouse-client --query="SELECT 'Server version: ', version()"

View File

@ -177,11 +177,11 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--user, -u` The username. Default value: default.
- `--password` The password. Default value: empty string.
- `--ask-password` - Prompt the user to enter a password.
- `--query, -q` The query to process when using non-interactive mode. You must specify either `query` or `queries-file` option.
- `--queries-file` file path with queries to execute. You must specify either `query` or `queries-file` option.
- `--database, -d` Select the current default database. Default value: the current database from the server settings (default by default).
- `--multiline, -m` If specified, allow multiline queries (do not send the query on Enter).
- `--query, -q` The query to process when using non-interactive mode. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` file path with queries to execute. Cannot be used simultaneously with `--query`.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `--multiline, -m` If specified, allow multiline queries (do not send the query on Enter).
- `--database, -d` Select the current default database. Default value: the current database from the server settings (default by default).
- `--format, -f` Use the specified default format to output the result.
- `--vertical, -E` If specified, use the [Vertical format](../interfaces/formats.md#vertical) by default to output the result. This is the same as `format=Vertical`. In this format, each value is printed on a separate line, which is helpful when displaying wide tables.
- `--time, -t` If specified, print the query execution time to stderr in non-interactive mode.

View File

@ -4220,3 +4220,12 @@ Possible values:
- false — Disallow.
Default value: `false`.
## zstd_window_log_max
Allows you to select the max window log of ZSTD (it will not be used for MergeTree family)
Type: Int64
Default: 0

View File

@ -5,16 +5,18 @@ This table contains profiling on processors level (that you can find in [`EXPLAI
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — The date when the event happened.
- `event_time` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — The date and time when the event happened.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — The date and time when the event happened.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — The date and time with microseconds precision when the event happened.
- `id` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of processor
- `parent_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Parent processors IDs
- `plan_step` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of the query plan step which created this processor. The value is zero if the processor was not added from any step.
- `plan_group` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Group of the processor if it was created by query plan step. A group is a logical partitioning of processors added from the same query plan step. Group is used only for beautifying the result of EXPLAIN PIPELINE result.
- `initial_query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the initial query (for distributed query execution).
- `query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the query
- `name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the processor.
- `elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was executed.
- `input_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting for data (from other processor).
- `output_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting because output port was full.
- `plan_step` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of the query plan step which created this processor. The value is zero if the processor was not added from any step.
- `plan_group` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Group of the processor if it was created by query plan step. A group is a logical partitioning of processors added from the same query plan step. Group is used only for beautifying the result of EXPLAIN PIPELINE result.
- `input_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of rows consumed by processor.
- `input_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of bytes consumed by processor.
- `output_rows` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of rows generated by processor.

View File

@ -59,9 +59,10 @@ Columns:
- `query_kind` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Type of the query.
- `databases` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the databases present in the query.
- `tables` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the tables present in the query.
- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the (materialized or live) views present in the query.
- `columns` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the columns present in the query.
- `partitions` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the partitions present in the query.
- `projections` ([String](../../sql-reference/data-types/string.md)) — Names of the projections used during the query execution.
- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — Names of the (materialized or live) views present in the query.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception.
- `exception` ([String](../../sql-reference/data-types/string.md)) — Exception message.
- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [Stack trace](https://en.wikipedia.org/wiki/Stack_trace). An empty string, if the query was completed successfully.

View File

@ -183,12 +183,12 @@ Arguments:
- `-S`, `--structure` — table structure for input data.
- `--input-format` — input format, `TSV` by default.
- `-f`, `--file` — path to data, `stdin` by default.
- `-q`, `--query` — queries to execute with `;` as delimiter. You must specify either `query` or `queries-file` option.
- `--queries-file` - file path with queries to execute. You must specify either `query` or `queries-file` option.
- `-q`, `--query` — queries to execute with `;` as delimiter. Cannot be used simultaneously with `--queries-file`.
- `--queries-file` - file path with queries to execute. Cannot be used simultaneously with `--query`.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `-N`, `--table` — table name where to put output data, `table` by default.
- `--format`, `--output-format` — output format, `TSV` by default.
- `-d`, `--database` — default database, `_local` by default.
- `--multiquery, -n` If specified, multiple queries separated by semicolons can be listed after the `--query` option. For convenience, it is also possible to omit `--query` and pass the queries directly after `--multiquery`.
- `--stacktrace` — whether to dump debug output in case of exception.
- `--echo` — print query before execution.
- `--verbose` — more details on query execution.

View File

@ -1,48 +0,0 @@
---
slug: /en/sql-reference/aggregate-functions/reference/greatest
title: greatest
---
Aggregate function that returns the greatest across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT
toTypeName(greatest(toUInt8(1), 2, toUInt8(3), 3.)),
greatest(1, 2, toUInt8(3), 3.)
```
```response
┌─toTypeName(greatest(toUInt8(1), 2, toUInt8(3), 3.))─┬─greatest(1, 2, toUInt8(3), 3.)─┐
│ Float64 │ 3 │
└─────────────────────────────────────────────────────┴────────────────────────────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT greatest(['hello'], ['there'], ['world'])
```
```response
┌─greatest(['hello'], ['there'], ['world'])─┐
│ ['world'] │
└───────────────────────────────────────────┘
```
```sql
SELECT greatest(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─greatest(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└──---──────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DataTime32 must be promoted to 64 bit for the comparison.
:::
Also see [least](/docs/en/sql-reference/aggregate-functions/reference/least.md).

View File

@ -3,7 +3,7 @@ slug: /en/sql-reference/aggregate-functions/reference/last_value
sidebar_position: 8
---
# first_value
# last_value
Selects the last encountered value, similar to `anyLast`, but could accept NULL.

View File

@ -1,48 +0,0 @@
---
slug: /en/sql-reference/aggregate-functions/reference/least
title: least
---
Aggregate function that returns the least across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT
toTypeName(least(toUInt8(1), 2, toUInt8(3), 3.)),
least(1, 2, toUInt8(3), 3.)
```
```response
┌─toTypeName(least(toUInt8(1), 2, toUInt8(3), 3.))─┬─least(1, 2, toUInt8(3), 3.)─┐
│ Float64 │ 1 │
└──────────────────────────────────────────────────┴─────────────────────────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT least(['hello'], ['there'], ['world'])
```
```response
┌─least(['hello'], ['there'], ['world'])─┐
│ ['hello'] │
└────────────────────────────────────────┘
```
```sql
SELECT least(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─least(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└────────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DataTime32 must be promoted to 64 bit for the comparison.
:::
Also see [greatest](/docs/en/sql-reference/aggregate-functions/reference/greatest.md).

View File

@ -865,16 +865,34 @@ LIFETIME(3600);
The key must have only one `String` type attribute that contains an allowed IP prefix. Other types are not supported yet.
For queries, you must use the same functions (`dictGetT` with a tuple) as for dictionaries with composite keys. The syntax is:
The syntax is:
``` sql
dictGetT('dict_name', 'attr_name', tuple(ip))
dictGetT('dict_name', 'attr_name', ip)
```
The function takes either `UInt32` for IPv4, or `FixedString(16)` for IPv6. For example:
``` sql
select dictGet('my_ip_trie_dictionary', 'asn', tuple(IPv6StringToNum('2001:db8::1')))
SELECT dictGet('my_ip_trie_dictionary', 'cca2', toIPv4('202.79.32.10')) AS result;
┌─result─┐
│ NP │
└────────┘
SELECT dictGet('my_ip_trie_dictionary', 'asn', IPv6StringToNum('2001:db8::1')) AS result;
┌─result─┐
│ 65536 │
└────────┘
SELECT dictGet('my_ip_trie_dictionary', ('asn', 'cca2'), IPv6StringToNum('2001:db8::1')) AS result;
┌─result───────┐
│ (65536,'ZZ') │
└──────────────┘
```
Other types are not supported yet. The function returns the attribute for the prefix that corresponds to this IP address. If there are overlapping prefixes, the most specific one is returned.

View File

@ -152,3 +152,85 @@ FROM LEFT_RIGHT
│ 4 │ ᴺᵁᴸᴸ │ Both equal │
└──────┴───────┴──────────────────┘
```
## greatest
Returns the greatest across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT greatest(1, 2, toUInt8(3), 3.) result, toTypeName(result) type;
```
```response
┌─result─┬─type────┐
│ 3 │ Float64 │
└────────┴─────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT greatest(['hello'], ['there'], ['world'])
```
```response
┌─greatest(['hello'], ['there'], ['world'])─┐
│ ['world'] │
└───────────────────────────────────────────┘
```
```sql
SELECT greatest(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─greatest(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└──---──────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DataTime32 must be promoted to 64 bit for the comparison.
:::
## least
Returns the least across a list of values. All of the list members must be of comparable types.
Examples:
```sql
SELECT least(1, 2, toUInt8(3), 3.) result, toTypeName(result) type;
```
```response
┌─result─┬─type────┐
│ 1 │ Float64 │
└────────┴─────────┘
```
:::note
The type returned is a Float64 as the UInt8 must be promoted to 64 bit for the comparison.
:::
```sql
SELECT least(['hello'], ['there'], ['world'])
```
```response
┌─least(['hello'], ['there'], ['world'])─┐
│ ['hello'] │
└────────────────────────────────────────┘
```
```sql
SELECT least(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
```
```response
┌─least(toDateTime32(plus(now(), toIntervalDay(1))), toDateTime64(now(), 3))─┐
│ 2023-05-12 01:16:59.000 │
└────────────────────────────────────────────────────────────────────────────┘
```
:::note
The type returned is a DateTime64 as the DataTime32 must be promoted to 64 bit for the comparison.
:::

File diff suppressed because one or more lines are too long

View File

@ -2,11 +2,10 @@
slug: /en/sql-reference/statements/create/function
sidebar_position: 38
sidebar_label: FUNCTION
title: "CREATE FUNCTION -user defined function (UDF)"
---
# CREATE FUNCTION - user defined function (UDF)
Creates a user defined function from a lambda expression. The expression must consist of function parameters, constants, operators, or other function calls.
Creates a user defined function (UDF) from a lambda expression. The expression must consist of function parameters, constants, operators, or other function calls.
**Syntax**

View File

@ -544,6 +544,54 @@ Result:
└─────┴──────────┴───────┘
```
##Filling grouped by sorting prefix
It can be useful to fill rows which have the same values in particular columns independently, - a good example is filling missing values in time series.
Assume there is the following time series table
``` sql
CREATE TABLE timeseries
(
`sensor_id` UInt64,
`timestamp` DateTime64(3, 'UTC'),
`value` Float64
)
ENGINE = Memory;
SELECT * FROM timeseries;
┌─sensor_id─┬───────────────timestamp─┬─value─┐
│ 234 │ 2021-12-01 00:00:03.000 │ 3 │
│ 432 │ 2021-12-01 00:00:01.000 │ 1 │
│ 234 │ 2021-12-01 00:00:07.000 │ 7 │
│ 432 │ 2021-12-01 00:00:05.000 │ 5 │
└───────────┴─────────────────────────┴───────┘
```
And we'd like to fill missing values for each sensor independently with 1 second interval.
The way to achieve it is to use `sensor_id` column as sorting prefix for filling column `timestamp`
```
SELECT *
FROM timeseries
ORDER BY
sensor_id,
timestamp WITH FILL
INTERPOLATE ( value AS 9999 )
┌─sensor_id─┬───────────────timestamp─┬─value─┐
│ 234 │ 2021-12-01 00:00:03.000 │ 3 │
│ 234 │ 2021-12-01 00:00:04.000 │ 9999 │
│ 234 │ 2021-12-01 00:00:05.000 │ 9999 │
│ 234 │ 2021-12-01 00:00:06.000 │ 9999 │
│ 234 │ 2021-12-01 00:00:07.000 │ 7 │
│ 432 │ 2021-12-01 00:00:01.000 │ 1 │
│ 432 │ 2021-12-01 00:00:02.000 │ 9999 │
│ 432 │ 2021-12-01 00:00:03.000 │ 9999 │
│ 432 │ 2021-12-01 00:00:04.000 │ 9999 │
│ 432 │ 2021-12-01 00:00:05.000 │ 5 │
└───────────┴─────────────────────────┴───────┘
```
Here, the `value` column was interpolated with `9999` just to make filled rows more noticeable
This behavior is controlled by setting `use_with_fill_by_sorting_prefix` (enabled by default)
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -13,7 +13,7 @@ sidebar_label: url
**Syntax**
``` sql
url(URL [,format] [,structure])
url(URL [,format] [,structure] [,headers])
```
**Parameters**
@ -21,6 +21,7 @@ url(URL [,format] [,structure])
- `URL` — HTTP or HTTPS server address, which can accept `GET` or `POST` requests (for `SELECT` or `INSERT` queries correspondingly). Type: [String](../../sql-reference/data-types/string.md).
- `format` — [Format](../../interfaces/formats.md#formats) of the data. Type: [String](../../sql-reference/data-types/string.md).
- `structure` — Table structure in `'UserID UInt64, Name String'` format. Determines column names and types. Type: [String](../../sql-reference/data-types/string.md).
- `headers` - Headers in `'headers('key1'='value1', 'key2'='value2')'` format. You can set headers for HTTP call.
**Returned value**
@ -31,7 +32,7 @@ A table with the specified format and structure and with data from the defined `
Getting the first 3 lines of a table that contains columns of `String` and [UInt32](../../sql-reference/data-types/int-uint.md) type from HTTP-server which answers in [CSV](../../interfaces/formats.md#csv) format.
``` sql
SELECT * FROM url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32') LIMIT 3;
SELECT * FROM url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32', headers('Accept'='text/csv; charset=utf-8')) LIMIT 3;
```
Inserting data from a `URL` into a table:

View File

@ -0,0 +1,62 @@
---
slug: /en/sql-reference/table-functions/urlCluster
sidebar_position: 55
sidebar_label: urlCluster
---
# urlCluster Table Function
Allows processing files from URL in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster, discloses asterics in URL file path, and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
**Syntax**
``` sql
urlCluster(cluster_name, URL, format, structure)
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- `URL` — HTTP or HTTPS server address, which can accept `GET` requests. Type: [String](../../sql-reference/data-types/string.md).
- `format` — [Format](../../interfaces/formats.md#formats) of the data. Type: [String](../../sql-reference/data-types/string.md).
- `structure` — Table structure in `'UserID UInt64, Name String'` format. Determines column names and types. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
A table with the specified format and structure and with data from the defined `URL`.
**Examples**
Getting the first 3 lines of a table that contains columns of `String` and [UInt32](../../sql-reference/data-types/int-uint.md) type from HTTP-server which answers in [CSV](../../interfaces/formats.md#csv) format.
1. Create a basic HTTP server using the standard Python 3 tools and start it:
```python
from http.server import BaseHTTPRequestHandler, HTTPServer
class CSVHTTPServer(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/csv')
self.end_headers()
self.wfile.write(bytes('Hello,1\nWorld,2\n', "utf-8"))
if __name__ == "__main__":
server_address = ('127.0.0.1', 12345)
HTTPServer(server_address, CSVHTTPServer).serve_forever()
```
``` sql
SELECT * FROM urlCluster('cluster_simple','http://127.0.0.1:12345', CSV, 'column1 String, column2 UInt32')
```
## Globs in URL
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. Supported pattern types and examples see in the description of the [remote](remote.md#globs-in-addresses) function.
Character `|` inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.
**See Also**
- [HDFS engine](../../engines/table-engines/special/url.md)
- [URL table function](../../sql-reference/table-functions/url.md)

View File

@ -21,6 +21,7 @@ url(URL [,format] [,structure])
- `URL` — HTTP или HTTPS-адрес сервера, который может принимать запросы `GET` или `POST` (для запросов `SELECT` или `INSERT` соответственно). Тип: [String](../../sql-reference/data-types/string.md).
- `format` — [формат](../../interfaces/formats.md#formats) данных. Тип: [String](../../sql-reference/data-types/string.md).
- `structure` — структура таблицы в формате `'UserID UInt64, Name String'`. Определяет имена и типы столбцов. Тип: [String](../../sql-reference/data-types/string.md).
- `headers` - HTTP-заголовки в формате `'headers('key1'='value1', 'key2'='value2')'`. Определяет заголовки для HTTP вызова.
**Возвращаемое значение**
@ -31,7 +32,7 @@ url(URL [,format] [,structure])
Получение с HTTP-сервера первых 3 строк таблицы с данными в формате [CSV](../../interfaces/formats.md#csv), содержащей столбцы типа [String](../../sql-reference/data-types/string.md) и [UInt32](../../sql-reference/data-types/int-uint.md).
``` sql
SELECT * FROM url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32') LIMIT 3;
SELECT * FROM url('http://127.0.0.1:12345/', CSV, 'column1 String, column2 UInt32', headers('Accept'='text/csv; charset=utf-8')) LIMIT 3;
```
Вставка данных в таблицу:

View File

@ -1,6 +1,7 @@
#include <Backups/BackupEntryFromAppendOnlyFile.h>
#include <Disks/IDisk.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
namespace DB

View File

@ -1,5 +1,7 @@
#include <Backups/BackupEntryFromImmutableFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Disks/IDisk.h>
#include <city.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Disks/IDisk.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>

View File

@ -15,7 +15,7 @@
#include <IO/Archives/createArchiveWriter.h>
#include <IO/ConcatSeekableReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

View File

@ -174,7 +174,7 @@ void HedgedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
if (offset_states.size() > 1 && enable_sample_offset_parallel_processing)
{

View File

@ -142,7 +142,7 @@ void MultiplexedConnections::sendQuery(
}
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
size_t num_replicas = replica_states.size();
if (num_replicas > 1)

View File

@ -113,8 +113,8 @@ private:
void createFiber();
void destroyFiber();
Fiber fiber;
FiberStack fiber_stack;
Fiber fiber;
std::mutex fiber_lock;
std::exception_ptr exception;

View File

@ -3,6 +3,7 @@
#include <base/types.h>
#include <Common/Exception.h>
#include <Coordination/KeeperConstants.h>
#include <Poco/Net/SocketAddress.h>
#include <vector>
#include <memory>
@ -466,7 +467,7 @@ public:
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
virtual String getConnectedAddress() const = 0;
virtual Poco::Net::SocketAddress getConnectedAddress() const = 0;
/// If the method will throw an exception, callbacks won't be called.
///

View File

@ -39,7 +39,7 @@ public:
bool isExpired() const override { return expired; }
int64_t getSessionID() const override { return 0; }
String getConnectedAddress() const override { return connected_zk_address; }
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
void create(
@ -127,7 +127,7 @@ private:
zkutil::ZooKeeperArgs args;
String connected_zk_address;
Poco::Net::SocketAddress connected_zk_address;
std::mutex push_request_mutex;
std::atomic<bool> expired{false};

View File

@ -112,11 +112,10 @@ void ZooKeeper::init(ZooKeeperArgs args_)
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
String address = impl->getConnectedAddress();
Poco::Net::SocketAddress address = impl->getConnectedAddress();
size_t colon_pos = address.find(':');
connected_zk_host = address.substr(0, colon_pos);
connected_zk_port = address.substr(colon_pos + 1);
connected_zk_host = address.host().toString();
connected_zk_port = address.port();
connected_zk_index = 0;
@ -124,7 +123,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
{
for (size_t i = 0; i < args.hosts.size(); i++)
{
if (args.hosts[i] == address)
if (args.hosts[i] == address.toString())
{
connected_zk_index = i;
break;

View File

@ -524,7 +524,7 @@ public:
void setServerCompletelyStarted();
String getConnectedZooKeeperHost() const { return connected_zk_host; }
String getConnectedZooKeeperPort() const { return connected_zk_port; }
UInt16 getConnectedZooKeeperPort() const { return connected_zk_port; }
size_t getConnectedZooKeeperIndex() const { return connected_zk_index; }
private:
@ -591,7 +591,7 @@ private:
ZooKeeperArgs args;
String connected_zk_host;
String connected_zk_port;
UInt16 connected_zk_port;
size_t connected_zk_index;
std::mutex mutex;

View File

@ -433,7 +433,7 @@ void ZooKeeper::connect(
}
connected = true;
connected_zk_address = node.address.toString();
connected_zk_address = node.address;
break;
}
@ -450,7 +450,7 @@ void ZooKeeper::connect(
if (!connected)
{
WriteBufferFromOwnString message;
connected_zk_address = "";
connected_zk_address = Poco::Net::SocketAddress();
message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
bool first = true;

View File

@ -125,7 +125,7 @@ public:
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
String getConnectedAddress() const override { return connected_zk_address; }
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
void executeGenericRequest(
const ZooKeeperRequestPtr & request,
@ -203,7 +203,7 @@ public:
private:
ACLs default_acls;
String connected_zk_address;
Poco::Net::SocketAddress connected_zk_address;
zkutil::ZooKeeperArgs args;

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <Common/isLocalAddress.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/logger_useful.h>

View File

@ -63,7 +63,7 @@ namespace DB
\
M(Bool, disable_internal_dns_cache, false, "Disable internal DNS caching at all.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \
M(UInt32, dns_max_consecutive_failures, 1024, "Max server connections.", 0) \
M(UInt32, dns_max_consecutive_failures, 1024, "Max connection failures before dropping host from ClickHouse DNS cache.", 0) \
\
M(UInt64, max_table_size_to_drop, 50000000000lu, "If size of a table is greater than this value (in bytes) than table could not be dropped with any DROP query.", 0) \
M(UInt64, max_partition_size_to_drop, 50000000000lu, "Same as max_table_size_to_drop, but for the partitions.", 0) \

View File

@ -154,7 +154,7 @@ class IColumn;
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
\
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
\
@ -729,6 +729,7 @@ class IColumn;
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function JSON_VALUE to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function JSON_VALUE to return complex type, such as: struct, array, map.", 0) \
M(Bool, use_with_fill_by_sorting_prefix, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@ -82,6 +82,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},
{"output_format_parquet_compliant_nested_types", false, true, "Change an internal field name in output Parquet file schema."}}},
{"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"},
{"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"},

View File

@ -1455,7 +1455,16 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
}
if (query_ptr->as<const ASTDeleteQuery>() != nullptr)
return !is_keeper_map_table(query_ptr);
{
if (is_keeper_map_table(query_ptr))
return false;
/// If there is only 1 shard then there is no need to replicate DELETE query.
auto current_cluster = tryGetCluster();
return
!current_cluster || /// Couldn't get the cluster, so we don't know how many shards there are.
current_cluster->getShardsInfo().size() > 1;
}
return true;
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context_fwd.h>
#include <Core/Defines.h>
#include <Core/Names.h>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
@ -20,6 +21,7 @@
#include <boost/noncopyable.hpp>
#include <Poco/Timestamp.h>
#include <filesystem>
#include <sys/stat.h>
namespace fs = std::filesystem;

View File

@ -4,6 +4,7 @@
#include <vector>
#include <boost/noncopyable.hpp>
#include <Disks/IDisk.h>
#include <sys/types.h>
namespace DB
{

View File

@ -1,4 +1,4 @@
#include "AsynchronousReadIndirectBufferFromRemoteFS.h"
#include "AsynchronousBoundedReadBuffer.h"
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
@ -43,105 +43,77 @@ namespace ErrorCodes
}
AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS(
AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
#ifndef NDEBUG
, log(&Poco::Logger::get("AsynchronousBufferFromRemoteFS"))
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
{
return impl->getFileName();
}
String AsynchronousReadIndirectBufferFromRemoteFS::getInfoForLog()
{
return impl->getInfoForLog();
}
size_t AsynchronousReadIndirectBufferFromRemoteFS::getFileSize()
{
return impl->getFileSize();
}
bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
{
/**
* Note: read_until_position here can be std::nullopt only for non-MergeTree tables.
* For mergeTree tables it must be guaranteed that setReadUntilPosition() or
* setReadUntilEnd() is called before any read or prefetch.
* setReadUntilEnd() always sets read_until_position to file size.
* setReadUntilPosition(pos) always has pos > 0, because if
* right_offset_in_compressed_file is 0, then setReadUntilEnd() is used.
*/
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
if (file_offset_of_buffer_end == *read_until_position) /// Everything is already read.
return false;
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
}
return true;
}
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
std::future<IAsynchronousReader::Result>
AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;
request.priority = read_settings.priority + priority;
request.ignore = bytes_to_ignore;
return reader.submit(request);
}
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
void AsynchronousBoundedReadBuffer::prefetch(int64_t priority)
{
if (prefetch_future.valid())
return;
/// Check boundary, which was set in readUntilPosition().
if (!hasPendingDataToRead())
return;
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size || prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size
|| prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
{
if (!read_until_position || position != *read_until_position)
{
@ -157,21 +129,16 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
void AsynchronousBoundedReadBuffer::appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch)
{
setReadUntilPosition(impl->getFileSize());
}
void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch)
{
const auto & object = impl->getCurrentObject();
FilesystemReadPrefetchesLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.path = object.local_path,
.path = impl->getFileName(),
.offset = file_offset_of_buffer_end,
.size = size,
.prefetch_submit_time = last_prefetch_info.submit_time,
@ -187,7 +154,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
}
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
bool AsynchronousBoundedReadBuffer::nextImpl()
{
if (!hasPendingDataToRead())
return false;
@ -245,14 +212,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
chassert(file_offset_of_buffer_end <= impl->getFileSize());
return bytes_read;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeks);
@ -268,7 +235,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence");
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Expected SEEK_SET or SEEK_CUR as whence");
}
/// Position is unchanged.
@ -322,9 +289,8 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (read_until_position && new_pos > *read_until_position)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
impl->seek(file_offset_of_buffer_end, SEEK_SET);
return new_pos;
}
@ -332,8 +298,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).
*/
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
if (read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
@ -342,31 +307,21 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
if (impl->initialized())
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
}
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
file_offset_of_buffer_end = new_pos;
impl->seek(file_offset_of_buffer_end, SEEK_SET);
}
return new_pos;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::getPosition()
{
return file_offset_of_buffer_end - available() + bytes_to_ignore;
}
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
void AsynchronousBoundedReadBuffer::finalize()
{
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
}
AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromRemoteFS()
AsynchronousBoundedReadBuffer::~AsynchronousBoundedReadBuffer()
{
try
{
@ -378,7 +333,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromR
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetchState state)
void AsynchronousBoundedReadBuffer::resetPrefetch(FilesystemPrefetchState state)
{
if (!prefetch_future.valid())
return;

View File

@ -0,0 +1,96 @@
#pragma once
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
class ReadBufferFromRemoteFSGather;
class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
{
public:
using Impl = ReadBufferFromFileBase;
using ImplPtr = std::unique_ptr<Impl>;
explicit AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
~AsynchronousBoundedReadBuffer() override;
String getFileName() const override { return impl->getFileName(); }
size_t getFileSize() override { return impl->getFileSize(); }
String getInfoForLog() override { return impl->getInfoForLog(); }
off_t seek(off_t offset_, int whence) override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
const ImplPtr impl;
const ReadSettings read_settings;
IAsynchronousReader & reader;
size_t file_offset_of_buffer_end = 0;
std::optional<size_t> read_until_position;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
const std::string query_id;
const std::string current_reader_id;
Poco::Logger * log;
AsyncReadCountersPtr async_read_counters;
FilesystemReadPrefetchesLogPtr prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
void resetPrefetch(FilesystemPrefetchState state);
};
}

View File

@ -1,111 +0,0 @@
#pragma once
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
* Reads data from S3/HDFS/Web using stored paths in metadata.
* This class is an asynchronous version of ReadIndirectBufferFromRemoteFS.
*
* Buffers chain for diskS3:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadBufferFromS3 -> ReadBufferFromIStream.
*
* Buffers chain for diskWeb:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadIndirectBufferFromWebServer -> ReadBufferFromHTTP -> ReadBufferFromIStream.
*
* We pass either `memory` or `prefetch_buffer` through all this chain and return it back.
*/
class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override;
String getFileName() const override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override;
String getInfoForLog() override;
size_t getFileSize() override;
bool isIntegratedWithFilesystemCache() const override { return true; }
private:
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
void resetPrefetch(FilesystemPrefetchState state);
ReadSettings read_settings;
IAsynchronousReader & reader;
int64_t base_priority;
std::shared_ptr<ReadBufferFromRemoteFSGather> impl;
std::future<IAsynchronousReader::Result> prefetch_future;
size_t file_offset_of_buffer_end = 0;
Memory<> prefetch_buffer;
std::string query_id;
std::string current_reader_id;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position;
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
};
}

View File

@ -5,6 +5,7 @@
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Common/logger_useful.h>
#include <IO/SwapHelper.h>
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
@ -12,22 +13,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
: ReadBufferFromFileBase(0, nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
@ -38,13 +41,12 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache)
if (current_buf && !with_cache)
{
appendFilesystemCacheLog();
appendUncachedReadInfo();
}
current_object = object;
total_bytes_read_from_current_file = 0;
const auto & object_path = object.remote_path;
size_t current_read_until_position = read_until_position ? read_until_position : object.bytes_size;
@ -70,7 +72,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
return current_read_buffer_creator();
}
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
{
if (!cache_log || current_object.remote_path.empty())
return;
@ -82,7 +84,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};
cache_log->add(elem);
@ -174,7 +176,7 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
bool ReadBufferFromRemoteFSGather::readImpl()
{
swap(*current_buf);
SwapHelper swap(*this, *current_buf);
bool result = false;
@ -185,7 +187,6 @@ bool ReadBufferFromRemoteFSGather::readImpl()
*/
if (bytes_to_ignore)
{
total_bytes_read_from_current_file += bytes_to_ignore;
current_buf->ignore(bytes_to_ignore);
result = current_buf->hasPendingData();
file_offset_of_buffer_end += bytes_to_ignore;
@ -205,57 +206,41 @@ bool ReadBufferFromRemoteFSGather::readImpl()
file_offset_of_buffer_end += current_buf->available();
}
swap(*current_buf);
/// Required for non-async reads.
if (result)
{
assert(available());
nextimpl_working_buffer_offset = offset();
total_bytes_read_from_current_file += available();
assert(current_buf->available());
nextimpl_working_buffer_offset = current_buf->offset();
}
return result;
}
size_t ReadBufferFromRemoteFSGather::getFileOffsetOfBufferEnd() const
{
return file_offset_of_buffer_end;
}
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
{
if (position != read_until_position)
{
read_until_position = position;
reset();
}
if (position == read_until_position)
return;
reset();
read_until_position = position;
}
void ReadBufferFromRemoteFSGather::reset()
{
current_object = {};
current_buf_idx = {};
current_buf.reset();
bytes_to_ignore = 0;
}
String ReadBufferFromRemoteFSGather::getFileName() const
off_t ReadBufferFromRemoteFSGather::seek(off_t offset, int whence)
{
return current_object.remote_path;
}
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only seeking with SEEK_SET is allowed");
size_t ReadBufferFromRemoteFSGather::getFileSize() const
{
size_t size = 0;
for (const auto & object : blobs_to_read)
size += object.bytes_size;
return size;
}
String ReadBufferFromRemoteFSGather::getInfoForLog()
{
if (!current_buf)
return "";
return current_buf->getInfoForLog();
reset();
file_offset_of_buffer_end = offset;
return file_offset_of_buffer_end;
}
size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
@ -269,7 +254,7 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache)
appendFilesystemCacheLog();
appendUncachedReadInfo();
}
}

View File

@ -10,12 +10,13 @@ namespace Poco { class Logger; }
namespace DB
{
class FilesystemCacheLog;
/**
* Remote disk might need to split one clickhouse file into multiple files in remote fs.
* This class works like a proxy to allow transition from one file into multiple.
*/
class ReadBufferFromRemoteFSGather final : public ReadBuffer
class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
{
friend class ReadIndirectBufferFromRemoteFS;
@ -30,25 +31,25 @@ public:
~ReadBufferFromRemoteFSGather() override;
String getFileName() const;
String getFileName() const override { return current_object.remote_path; }
void reset();
String getInfoForLog() override { return current_buf ? current_buf->getInfoForLog() : ""; }
void setReadUntilPosition(size_t position) override;
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore) override;
size_t getFileSize() const;
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
size_t getFileOffsetOfBufferEnd() const;
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
bool initialized() const { return current_buf != nullptr; }
String getInfoForLog();
size_t getImplementationBufferOffset() const;
const StoredObject & getCurrentObject() const { return current_object; }
off_t seek(off_t offset, int whence) override;
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
@ -61,40 +62,26 @@ private:
bool moveToNextBuffer();
void appendFilesystemCacheLog();
void appendUncachedReadInfo();
ReadBufferCreator read_buffer_creator;
StoredObjects blobs_to_read;
ReadSettings settings;
size_t read_until_position = 0;
StoredObject current_object;
void reset();
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
bool with_cache;
String query_id;
Poco::Logger * log;
SeekableReadBufferPtr current_buf;
size_t current_buf_idx = 0;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;
/**
* File: |___________________|
* Buffer: |~~~~~~~|
* file_offset_of_buffer_end: ^
*/
size_t bytes_to_ignore = 0;
size_t total_bytes_read_from_current_file = 0;
StoredObject current_object;
size_t current_buf_idx = 0;
SeekableReadBufferPtr current_buf;
std::shared_ptr<FilesystemCacheLog> cache_log;
Poco::Logger * log;
};
}

View File

@ -82,7 +82,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
else
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET or SEEK_CUR modes are allowed.");
impl->reset();
impl->seek(impl->file_offset_of_buffer_end, SEEK_SET);
resetWorkingBuffer();
file_offset_of_buffer_end = impl->file_offset_of_buffer_end;

View File

@ -31,8 +31,6 @@ public:
void setReadUntilEnd() override;
bool isIntegratedWithFilesystemCache() const override { return true; }
size_t getFileSize() override;
private:

View File

@ -8,6 +8,7 @@
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
@ -112,8 +113,8 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(reader_impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -5,7 +5,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/MultiVersion.h>

View File

@ -2,6 +2,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <base/getFQDNOrHostName.h>
#include <future>
namespace DB
{

View File

@ -7,7 +7,6 @@
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Common/getRandomASCIIString.h>

View File

@ -3,6 +3,7 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>

View File

@ -12,12 +12,14 @@
#include <Common/Exception.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <IO/copyData.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool_fwd.h>
#include <Disks/WriteMode.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
namespace DB

View File

@ -7,6 +7,7 @@
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
@ -63,12 +64,12 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
global_context->getFilesystemCacheLog());
/// We use `remove_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS which works by the remote_fs_* settings.
/// AsynchronousBoundedReadBuffer which works by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, modified_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -6,7 +6,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/WriteBufferFromS3.h>
@ -127,8 +127,8 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(s3_impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -0,0 +1,14 @@
#include <Disks/ObjectStorages/StoredObject.h>
namespace DB
{
size_t getTotalSize(const StoredObjects & objects)
{
size_t size = 0;
for (const auto & object : objects)
size += object.bytes_size;
return size;
}
}

View File

@ -29,4 +29,6 @@ struct StoredObject
using StoredObjects = std::vector<StoredObject>;
size_t getTotalSize(const StoredObjects & objects);
}

View File

@ -9,6 +9,7 @@
#include <IO/WriteHelpers.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
@ -189,8 +190,8 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(web_impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -27,5 +27,6 @@ struct AsyncReadCounters
void dumpToMapColumn(IColumn * column) const;
};
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
}

View File

@ -27,8 +27,6 @@ public:
ReadBuffer & getWrappedReadBuffer() { return *impl; }
bool isIntegratedWithFilesystemCache() const override { return impl->isIntegratedWithFilesystemCache(); }
size_t getFileSize() override;
protected:

View File

@ -49,8 +49,6 @@ public:
/// If true, setReadUntilPosition() guarantees that eof will be reported at the given position.
virtual bool supportsRightBoundedReads() const { return false; }
virtual bool isIntegratedWithFilesystemCache() const { return false; }
/// Returns true if seek() actually works, false if seek() will always throw (or make subsequent
/// nextImpl() calls throw).
///

View File

@ -1386,6 +1386,20 @@ void Context::addQueryAccessInfo(
query_access_info.views.emplace(view_name);
}
void Context::addQueryAccessInfo(const Names & partition_names)
{
if (isGlobalContext())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have query access info");
}
std::lock_guard<std::mutex> lock(query_access_info.mutex);
for (const auto & partition_name : partition_names)
{
query_access_info.partitions.emplace(partition_name);
}
}
void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String & created_object) const
{
if (isGlobalContext())
@ -2796,11 +2810,7 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);
if (!shared->auxiliary_zookeepers.empty())
return shared->auxiliary_zookeepers;
else
return std::map<String, zkutil::ZooKeeperPtr>();
return shared->auxiliary_zookeepers;
}
#if USE_ROCKSDB
@ -4314,7 +4324,7 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
if (!settings_.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings_.allow_experimental_parallel_reading_from_replicas
if (settings_.allow_experimental_parallel_reading_from_replicas > 0
&& !settings_.use_hedged_requests)
return READ_TASKS;

View File

@ -295,6 +295,7 @@ private:
databases = rhs.databases;
tables = rhs.tables;
columns = rhs.columns;
partitions = rhs.partitions;
projections = rhs.projections;
views = rhs.views;
}
@ -312,6 +313,7 @@ private:
std::swap(databases, rhs.databases);
std::swap(tables, rhs.tables);
std::swap(columns, rhs.columns);
std::swap(partitions, rhs.partitions);
std::swap(projections, rhs.projections);
std::swap(views, rhs.views);
}
@ -321,6 +323,7 @@ private:
std::set<std::string> databases{};
std::set<std::string> tables{};
std::set<std::string> columns{};
std::set<std::string> partitions{};
std::set<std::string> projections{};
std::set<std::string> views{};
};
@ -629,6 +632,7 @@ public:
const Names & column_names,
const String & projection_name = {},
const String & view_name = {});
void addQueryAccessInfo(const Names & partition_names);
/// Supported factories for records in query_log

View File

@ -45,4 +45,6 @@ public:
using SystemLog<FilesystemReadPrefetchesLogElement>::SystemLog;
};
using FilesystemReadPrefetchesLogPtr = std::shared_ptr<FilesystemReadPrefetchesLog>;
}

View File

@ -50,7 +50,16 @@ bool FillingRow::operator>=(const FillingRow & other) const
return !(*this < other);
}
bool FillingRow::next(const FillingRow & to_row)
bool FillingRow::isNull() const
{
for (const auto & field : row)
if (!field.isNull())
return false;
return true;
}
std::pair<bool, bool> FillingRow::next(const FillingRow & to_row)
{
const size_t row_size = size();
size_t pos = 0;
@ -61,22 +70,24 @@ bool FillingRow::next(const FillingRow & to_row)
break;
if (pos == row_size || less(to_row.row[pos], row[pos], getDirection(pos)))
return false;
return {false, false};
/// If we have any 'fill_to' value at position greater than 'pos',
/// we need to generate rows up to 'fill_to' value.
for (size_t i = row_size - 1; i > pos; --i)
{
if (getFillDescription(i).fill_to.isNull() || row[i].isNull())
auto & fill_column_desc = getFillDescription(i);
if (fill_column_desc.fill_to.isNull() || row[i].isNull())
continue;
auto next_value = row[i];
getFillDescription(i).step_func(next_value);
if (less(next_value, getFillDescription(i).fill_to, getDirection(i)))
Field next_value = row[i];
fill_column_desc.step_func(next_value);
if (less(next_value, fill_column_desc.fill_to, getDirection(i)))
{
row[i] = next_value;
initFromDefaults(i + 1);
return true;
return {true, true};
}
}
@ -84,14 +95,13 @@ bool FillingRow::next(const FillingRow & to_row)
getFillDescription(pos).step_func(next_value);
if (less(to_row.row[pos], next_value, getDirection(pos)) || equals(next_value, getFillDescription(pos).fill_to))
return false;
return {false, false};
row[pos] = next_value;
if (equals(row[pos], to_row.row[pos]))
{
bool is_less = false;
size_t i = pos + 1;
for (; i < row_size; ++i)
for (size_t i = pos + 1; i < row_size; ++i)
{
const auto & fill_from = getFillDescription(i).fill_from;
if (!fill_from.isNull())
@ -101,11 +111,11 @@ bool FillingRow::next(const FillingRow & to_row)
is_less |= less(row[i], to_row.row[i], getDirection(i));
}
return is_less;
return {is_less, true};
}
initFromDefaults(pos + 1);
return true;
return {true, true};
}
void FillingRow::initFromDefaults(size_t from_pos)

View File

@ -19,7 +19,10 @@ public:
explicit FillingRow(const SortDescription & sort_description);
/// Generates next row according to fill 'from', 'to' and 'step' values.
bool next(const FillingRow & to_row);
/// Return pair of boolean
/// apply - true if filling values should be inserted into result set
/// value_changed - true if filling row value was changed
std::pair<bool, bool> next(const FillingRow & to_row);
void initFromDefaults(size_t from_pos = 0);
@ -29,9 +32,11 @@ public:
bool operator<(const FillingRow & other) const;
bool operator==(const FillingRow & other) const;
bool operator>=(const FillingRow & other) const;
bool isNull() const;
int getDirection(size_t index) const { return sort_description[index].direction; }
FillColumnDescription & getFillDescription(size_t index) { return sort_description[index].fill_description; }
const FillColumnDescription & getFillDescription(size_t index) const { return sort_description[index].fill_description; }
String dump() const;

View File

@ -571,7 +571,13 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
size_t bucket_idx = current_bucket->idx;
hash_join = makeInMemoryJoin();
size_t prev_keys_num = 0;
// If there is only one bucket, don't take this check.
if (hash_join && buckets.size() > 1)
{
// Use previous hash_join's keys number to estimate next hash_join's size is reasonable.
prev_keys_num = hash_join->getTotalRowCount();
}
for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
{
@ -585,6 +591,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
continue;
}
hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
@ -604,9 +611,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return nullptr;
}
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num)
{
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row, reserve_num);
}
Block GraceHashJoin::prepareRightBlock(const Block & block)
@ -646,6 +653,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!current_block.rows())
return;
}
auto prev_keys_num = hash_join->getTotalRowCount();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
if (!hasMemoryOverflow(hash_join))
@ -654,7 +662,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;
buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
@ -674,7 +681,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = concatenateBlocks(current_blocks);
}
hash_join = makeInMemoryJoin();
hash_join = makeInMemoryJoin(prev_keys_num);
if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

View File

@ -90,7 +90,8 @@ public:
private:
void initBuckets();
/// Create empty join for in-memory processing.
InMemoryJoinPtr makeInMemoryJoin();
/// reserve_num for reserving space in hash table.
InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0);
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);

View File

@ -217,7 +217,7 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
JoinCommon::removeColumnNullability(column);
}
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_)
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
@ -302,7 +302,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
}
for (auto & maps : data->maps)
dataMapInit(maps);
dataMapInit(maps, reserve_num);
}
HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes)
@ -454,13 +454,15 @@ struct KeyGetterForType
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};
void HashJoin::dataMapInit(MapsVariant & map)
void HashJoin::dataMapInit(MapsVariant & map, size_t reserve_num)
{
if (kind == JoinKind::Cross)
return;
joinDispatchInit(kind, strictness, map);
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.create(data->type); });
if (reserve_num)
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.reserve(data->type, reserve_num); });
}
bool HashJoin::empty() const

View File

@ -146,7 +146,7 @@ public:
class HashJoin : public IJoin
{
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num = 0);
~HashJoin() override;
@ -217,6 +217,16 @@ public:
M(keys256) \
M(hashed)
/// Only for maps using hash table.
#define APPLY_FOR_HASH_JOIN_VARIANTS(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)
/// Used for reading from StorageJoin and applying joinGet function
#define APPLY_FOR_JOIN_VARIANTS_LIMITED(M) \
@ -266,6 +276,22 @@ public:
}
}
void reserve(Type which, size_t num)
{
switch (which)
{
case Type::EMPTY: break;
case Type::CROSS: break;
case Type::key8: break;
case Type::key16: break;
#define M(NAME) \
case Type::NAME: NAME->reserve(num); break;
APPLY_FOR_HASH_JOIN_VARIANTS(M)
#undef M
}
}
size_t getTotalRowCount(Type which) const
{
switch (which)
@ -409,7 +435,7 @@ private:
/// If set HashJoin instance is not available for modification (addJoinedBlock)
TableLockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);
void dataMapInit(MapsVariant &, size_t);
void initRightBlockStructure(Block & saved_block_sample);

View File

@ -116,6 +116,7 @@ namespace ErrorCodes
extern const int ACCESS_DENIED;
extern const int UNKNOWN_IDENTIFIER;
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
}
/// Assumes `storage` is set and the table filter (row-level security) is not empty.
@ -385,6 +386,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.ignore_projections = options.ignore_projections;
query_info.is_projection_query = options.is_projection_query;
query_info.is_internal = options.is_internal;
initSettings();
const Settings & settings = context->getSettingsRef();
@ -408,6 +410,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
ApplyWithSubqueryVisitor().visit(query_ptr);
}
query_info.query = query_ptr->clone();
query_info.original_query = query_ptr->clone();
if (settings.count_distinct_optimization)
@ -455,25 +458,35 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
/// Check support for JOINs for parallel replicas
if (joined_tables.tablesCount() > 1 && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(log, "Joins are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "JOINs are not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
}
}
/// Try to execute query without parallel replicas if we find that there is a FINAL modifier there.
bool is_query_with_final = false;
if (query_info.table_expression_modifiers)
is_query_with_final = query_info.table_expression_modifiers->hasFinal();
else if (query_info.query)
is_query_with_final = query_info.query->as<ASTSelectQuery &>().final();
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
/// Check support for FINAL for parallel replicas
bool is_query_with_final = isQueryWithFinal(query_info);
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(log, "FINAL modifier is supported with parallel replicas. Will try to execute the query without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(log, "FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");
}
}
/// Rewrite JOINs
@ -2994,20 +3007,27 @@ void InterpreterSelectQuery::executeWithFill(QueryPlan & query_plan)
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query, context);
SortDescription fill_descr;
for (auto & desc : order_descr)
SortDescription sort_description = getSortDescription(query, context);
SortDescription fill_description;
for (auto & desc : sort_description)
{
if (desc.with_fill)
fill_descr.push_back(desc);
fill_description.push_back(desc);
}
if (fill_descr.empty())
if (fill_description.empty())
return;
InterpolateDescriptionPtr interpolate_descr =
getInterpolateDescription(query, source_header, result_header, syntax_analyzer_result->aliases, context);
auto filling_step = std::make_unique<FillingStep>(query_plan.getCurrentDataStream(), std::move(fill_descr), interpolate_descr);
const Settings & settings = context->getSettingsRef();
auto filling_step = std::make_unique<FillingStep>(
query_plan.getCurrentDataStream(),
std::move(sort_description),
std::move(fill_description),
interpolate_descr,
settings.use_with_fill_by_sorting_prefix);
query_plan.addStep(std::move(filling_step));
}
}
@ -3126,4 +3146,14 @@ void InterpreterSelectQuery::initSettings()
}
}
bool InterpreterSelectQuery::isQueryWithFinal(const SelectQueryInfo & info)
{
bool result = info.query->as<ASTSelectQuery &>().final();
if (info.table_expression_modifiers)
result |= info.table_expression_modifiers->hasFinal();
return result;
}
}

View File

@ -131,6 +131,8 @@ public:
static SortDescription getSortDescription(const ASTSelectQuery & query, const ContextPtr & context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const ContextPtr & context);
static bool isQueryWithFinal(const SelectQueryInfo & info);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,

View File

@ -0,0 +1,121 @@
#include <Interpreters/OptimizeDateFilterVisitor.h>
#include <Common/DateLUT.h>
#include <Common/DateLUTImpl.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
ASTPtr generateOptimizedDateFilterAST(const String & comparator, const String & converter, const String & column, UInt64 year)
{
const DateLUTImpl & date_lut = DateLUT::instance();
if (converter != "toYear") return {};
String start_date = date_lut.dateToString(date_lut.makeDayNum(year, 1, 1));
String end_date = date_lut.dateToString(date_lut.makeDayNum(year, 12, 31));
if (comparator == "equals")
{
return makeASTFunction("and",
makeASTFunction("greaterOrEquals",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(start_date)
),
makeASTFunction("lessOrEquals",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(end_date)
)
);
}
else if (comparator == "notEquals")
{
return makeASTFunction("or",
makeASTFunction("less",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(start_date)
),
makeASTFunction("greater",
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(end_date)
)
);
}
else if (comparator == "less" || comparator == "greaterOrEquals")
{
return makeASTFunction(comparator,
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(start_date)
);
}
else
{
return makeASTFunction(comparator,
std::make_shared<ASTIdentifier>(column),
std::make_shared<ASTLiteral>(end_date)
);
}
}
bool rewritePredicateInPlace(ASTFunction & function, ASTPtr & ast)
{
const static std::unordered_map<String, String> swap_relations = {
{"equals", "equals"},
{"notEquals", "notEquals"},
{"less", "greater"},
{"greater", "less"},
{"lessOrEquals", "greaterOrEquals"},
{"greaterOrEquals", "lessOrEquals"},
};
if (!swap_relations.contains(function.name)) return false;
if (!function.arguments || function.arguments->children.size() != 2) return false;
size_t func_id = function.arguments->children.size();
for (size_t i = 0; i < function.arguments->children.size(); i++)
{
if (const auto * func = function.arguments->children[i]->as<ASTFunction>(); func)
{
if (func->name == "toYear")
{
func_id = i;
}
}
}
if (func_id == function.arguments->children.size()) return false;
size_t literal_id = 1 - func_id;
const auto * literal = function.arguments->children[literal_id]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::UInt64) return false;
UInt64 compare_to = literal->value.get<UInt64>();
String comparator = literal_id > func_id ? function.name : swap_relations.at(function.name);
const auto * func = function.arguments->children[func_id]->as<ASTFunction>();
const auto * column_id = func->arguments->children.at(0)->as<ASTIdentifier>();
if (!column_id) return false;
String column = column_id->name();
const auto new_ast = generateOptimizedDateFilterAST(comparator, func->name, column, compare_to);
if (!new_ast) return false;
ast = new_ast;
return true;
}
void OptimizeDateFilterInPlaceData::visit(ASTFunction & function, ASTPtr & ast) const
{
rewritePredicateInPlace(function, ast);
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTFunction;
/// Rewrite the predicates in place
class OptimizeDateFilterInPlaceData
{
public:
using TypeToVisit = ASTFunction;
void visit(ASTFunction & function, ASTPtr & ast) const;
};
using OptimizeDateFilterInPlaceMatcher = OneTypeMatcher<OptimizeDateFilterInPlaceData>;
using OptimizeDateFilterInPlaceVisitor = InDepthNodeVisitor<OptimizeDateFilterInPlaceMatcher, true>;
}

View File

@ -29,6 +29,7 @@ NamesAndTypesList ProcessorProfileLogElement::getNamesAndTypes()
{"plan_step", std::make_shared<DataTypeUInt64>()},
{"plan_group", std::make_shared<DataTypeUInt64>()},
{"initial_query_id", std::make_shared<DataTypeString>()},
{"query_id", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"elapsed_us", std::make_shared<DataTypeUInt64>()},
@ -60,6 +61,7 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(plan_step);
columns[i++]->insert(plan_group);
columns[i++]->insertData(initial_query_id.data(), initial_query_id.size());
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insertData(processor_name.data(), processor_name.size());
columns[i++]->insert(elapsed_us);

View File

@ -19,6 +19,7 @@ struct ProcessorProfileLogElement
UInt64 plan_step{};
UInt64 plan_group{};
String initial_query_id;
String query_id;
String processor_name;

View File

@ -70,6 +70,7 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"databases", array_low_cardinality_string},
{"tables", array_low_cardinality_string},
{"columns", array_low_cardinality_string},
{"partitions", array_low_cardinality_string},
{"projections", array_low_cardinality_string},
{"views", array_low_cardinality_string},
{"exception_code", std::make_shared<DataTypeInt32>()},
@ -176,6 +177,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
auto & column_databases = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_tables = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_columns = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_partitions = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_projections = typeid_cast<ColumnArray &>(*columns[i++]);
auto & column_views = typeid_cast<ColumnArray &>(*columns[i++]);
@ -194,6 +196,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
fill_column(query_databases, column_databases);
fill_column(query_tables, column_tables);
fill_column(query_columns, column_columns);
fill_column(query_partitions, column_partitions);
fill_column(query_projections, column_projections);
fill_column(query_views, column_views);
}

View File

@ -65,6 +65,7 @@ struct QueryLogElement
std::set<String> query_databases;
std::set<String> query_tables;
std::set<String> query_columns;
std::set<String> query_partitions;
std::set<String> query_projections;
std::set<String> query_views;

View File

@ -25,6 +25,7 @@
#include <Interpreters/GatherFunctionQuantileVisitor.h>
#include <Interpreters/RewriteSumIfFunctionVisitor.h>
#include <Interpreters/RewriteArrayExistsFunctionVisitor.h>
#include <Interpreters/OptimizeDateFilterVisitor.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -677,6 +678,21 @@ void optimizeInjectiveFunctionsInsideUniq(ASTPtr & query, ContextPtr context)
RemoveInjectiveFunctionsVisitor(data).visit(query);
}
void optimizeDateFilters(ASTSelectQuery * select_query)
{
/// Predicates in HAVING clause has been moved to WHERE clause.
if (select_query->where())
{
OptimizeDateFilterInPlaceVisitor::Data data;
OptimizeDateFilterInPlaceVisitor(data).visit(select_query->refWhere());
}
if (select_query->prewhere())
{
OptimizeDateFilterInPlaceVisitor::Data data;
OptimizeDateFilterInPlaceVisitor(data).visit(select_query->refPrewhere());
}
}
void transformIfStringsIntoEnum(ASTPtr & query)
{
std::unordered_set<String> function_names = {"if", "transform"};
@ -780,6 +796,9 @@ void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
tables_with_columns, result.storage_snapshot->metadata, result.storage);
}
/// Rewrite date filters to avoid the calls of converters such as toYear, toYYYYMM, toISOWeek, etc.
optimizeDateFilters(select_query);
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, context);

View File

@ -837,6 +837,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_databases = info.databases;
elem.query_tables = info.tables;
elem.query_columns = info.columns;
elem.query_partitions = info.partitions;
elem.query_projections = info.projections;
elem.query_views = info.views;
}
@ -901,6 +902,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.query_databases.insert(access_info.databases.begin(), access_info.databases.end());
element.query_tables.insert(access_info.tables.begin(), access_info.tables.end());
element.query_columns.insert(access_info.columns.begin(), access_info.columns.end());
element.query_partitions.insert(access_info.partitions.begin(), access_info.partitions.end());
element.query_projections.insert(access_info.projections.begin(), access_info.projections.end());
element.query_views.insert(access_info.views.begin(), access_info.views.end());
@ -1003,6 +1005,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64

View File

@ -113,7 +113,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
}
/// We don't want to execute reading for subqueries in parallel
subquery_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
subquery_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return std::make_shared<InterpreterSelectWithUnionQuery>(query, subquery_context, subquery_options, required_source_columns);
}

View File

@ -83,6 +83,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int TOO_DEEP_SUBQUERIES;
extern const int NOT_IMPLEMENTED;
extern const int SUPPORT_IS_DISABLED;
}
/** ClickHouse query planner.
@ -622,7 +623,14 @@ void addWithFillStepIfNeeded(QueryPlan & query_plan,
interpolate_description = std::make_shared<InterpolateDescription>(std::move(interpolate_actions_dag), empty_aliases);
}
auto filling_step = std::make_unique<FillingStep>(query_plan.getCurrentDataStream(), std::move(fill_description), interpolate_description);
const auto & query_context = planner_context->getQueryContext();
const Settings & settings = query_context->getSettingsRef();
auto filling_step = std::make_unique<FillingStep>(
query_plan.getCurrentDataStream(),
sort_description,
std::move(fill_description),
interpolate_description,
settings.use_with_fill_by_sorting_prefix);
query_plan.addStep(std::move(filling_step));
}
@ -1185,16 +1193,25 @@ void Planner::buildPlanForQueryNode()
const auto & settings = query_context->getSettingsRef();
if (planner_context->getTableExpressionNodeToData().size() > 1
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "Joins are not supported with parallel replicas. Query will be executed without using them.");
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "JOINs are not supported with parallel replicas. Query will be executed without using them.");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
mutable_context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOINs are not supported with parallel replicas");
}
}
/// TODO: Also disable parallel replicas in case of FINAL
auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
auto join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
select_query_info,
@ -1432,7 +1449,8 @@ void Planner::buildPlanForQueryNode()
addLimitByStep(query_plan, limit_by_analysis_result, query_node);
}
addWithFillStepIfNeeded(query_plan, query_analysis_result, planner_context, query_node);
if (query_node.hasOrderBy())
addWithFillStepIfNeeded(query_plan, query_analysis_result, planner_context, query_node);
bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;

View File

@ -27,9 +27,17 @@ static ITransformingStep::Traits getTraits()
};
}
FillingStep::FillingStep(const DataStream & input_stream_, SortDescription sort_description_, InterpolateDescriptionPtr interpolate_description_)
FillingStep::FillingStep(
const DataStream & input_stream_,
SortDescription sort_description_,
SortDescription fill_description_,
InterpolateDescriptionPtr interpolate_description_,
bool use_with_fill_by_sorting_prefix_)
: ITransformingStep(input_stream_, FillingTransform::transformHeader(input_stream_.header, sort_description_), getTraits())
, sort_description(std::move(sort_description_)), interpolate_description(interpolate_description_)
, sort_description(std::move(sort_description_))
, fill_description(std::move(fill_description_))
, interpolate_description(interpolate_description_)
, use_with_fill_by_sorting_prefix(use_with_fill_by_sorting_prefix_)
{
if (!input_stream_.has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
@ -40,9 +48,10 @@ void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return std::make_shared<FillingNoopTransform>(header, sort_description);
return std::make_shared<FillingNoopTransform>(header, fill_description);
return std::make_shared<FillingTransform>(header, sort_description, std::move(interpolate_description));
return std::make_shared<FillingTransform>(
header, sort_description, fill_description, std::move(interpolate_description), use_with_fill_by_sorting_prefix);
});
}

View File

@ -10,7 +10,12 @@ namespace DB
class FillingStep : public ITransformingStep
{
public:
FillingStep(const DataStream & input_stream_, SortDescription sort_description_, InterpolateDescriptionPtr interpolate_description_);
FillingStep(
const DataStream & input_stream_,
SortDescription sort_description_,
SortDescription fill_description_,
InterpolateDescriptionPtr interpolate_description_,
bool use_with_fill_by_sorting_prefix);
String getName() const override { return "Filling"; }
@ -25,7 +30,9 @@ private:
void updateOutputStream() override;
SortDescription sort_description;
SortDescription fill_description;
InterpolateDescriptionPtr interpolate_description;
const bool use_with_fill_by_sorting_prefix;
};
}

Some files were not shown because too many files have changed in this diff Show More