Merge remote-tracking branch 'origin/master' into parallel-replicas-remote

Igor Nikonov 2023-12-21 18:43:18 +00:00
commit 751e582403
140 changed files with 1187 additions and 648 deletions

.gitmessage Normal file

@ -0,0 +1,10 @@
## To avoid merge commit in CI run (add a leading space to apply):
#no-merge-commit
## Running specified job (add a leading space to apply):
#job_<JOB NAME>
#job_stateless_tests_release
#job_package_debug
#job_integration_tests_asan

contrib/azure vendored

@ -1 +1 @@
Subproject commit 352ff0a61cb319ac1cc38c4058443ddf70147530
Subproject commit a852d81f92f153e109de165ee08546741e3f2a68


@ -12,26 +12,20 @@ file(GLOB AZURE_SDK_CORE_SRC
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/cryptography/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/curl/*.hpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/http/curl/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/winhttp/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/io/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/core/azure-core/src/private/*.hpp"
)
file(GLOB AZURE_SDK_IDENTITY_SRC
"${AZURE_SDK_LIBRARY_DIR}/identity/azure-identity/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/identity/azure-identity/src/private/*.hpp"
)
file(GLOB AZURE_SDK_STORAGE_COMMON_SRC
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-common/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-common/src/private/*.cpp"
)
file(GLOB AZURE_SDK_STORAGE_BLOBS_SRC
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/*.cpp"
"${AZURE_SDK_LIBRARY_DIR}/storage/azure-storage-blobs/src/private/*.hpp"
)
file(GLOB AZURE_SDK_UNIFIED_SRC

contrib/boringssl vendored

@ -1 +1 @@
Subproject commit 8061ac62d67953e61b793042e33baf1352e67510
Subproject commit aa6d2f865a2eab01cf94f197e11e36b6de47b5b4


@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.11.2.11"
ARG VERSION="23.11.3.23"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""


@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.11.2.11"
ARG VERSION="23.11.3.23"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""


@ -30,7 +30,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.11.2.11"
ARG VERSION="23.11.3.23"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image


@ -216,11 +216,11 @@ export -f run_tests
if [ "$NUM_TRIES" -gt "1" ]; then
# We don't run tests with Ordinary database in PRs, only in master.
# So run new/changed tests with Ordinary at least once in flaky check.
timeout "$MAX_RUN_TIME" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
timeout_with_logging "$MAX_RUN_TIME" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
| sed 's/All tests have finished//' | sed 's/No tests were run//' ||:
fi
timeout "$MAX_RUN_TIME" bash -c run_tests ||:
timeout_with_logging "$MAX_RUN_TIME" bash -c run_tests ||:
echo "Files in current directory"
ls -la ./


@ -35,4 +35,17 @@ function fn_exists() {
declare -F "$1" > /dev/null;
}
function timeout_with_logging() {
local exit_code=0
timeout "${@}" || exit_code="${?}"
if [[ "${exit_code}" -eq "124" ]]
then
echo "The command 'timeout ${*}' has been killed by timeout"
fi
return $exit_code
}
# vi: ft=bash


@ -0,0 +1,26 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.11.3.23-stable (a14ab450b0e) FIXME as compared to v23.11.2.11-stable (6e5411358c8)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix invalid memory access in BLAKE3 (Rust) [#57876](https://github.com/ClickHouse/ClickHouse/pull/57876) ([Raúl Marín](https://github.com/Algunenano)).
* Normalize function names in CREATE INDEX [#57906](https://github.com/ClickHouse/ClickHouse/pull/57906) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix handling of unavailable replicas before first request happened [#57933](https://github.com/ClickHouse/ClickHouse/pull/57933) ([Nikita Taranov](https://github.com/nickitat)).
* Revert "Fix bug window functions: revert [#39631](https://github.com/ClickHouse/ClickHouse/issues/39631)" [#58031](https://github.com/ClickHouse/ClickHouse/pull/58031) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
#### NO CL CATEGORY
* Backported in [#57918](https://github.com/ClickHouse/ClickHouse/issues/57918):. [#57909](https://github.com/ClickHouse/ClickHouse/pull/57909) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Remove heavy rust stable toolchain [#57905](https://github.com/ClickHouse/ClickHouse/pull/57905) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Fix docker image for integration tests (fixes CI) [#57952](https://github.com/ClickHouse/ClickHouse/pull/57952) ([Azat Khuzhin](https://github.com/azat)).
* Always use `pread` for reading cache segments [#57970](https://github.com/ClickHouse/ClickHouse/pull/57970) ([Nikita Taranov](https://github.com/nickitat)).


@ -212,5 +212,5 @@ ORDER BY key ASC
```
### More information on Joins
- [`join_algorithm` setting](/docs/en/operations/settings/settings.md#settings-join_algorithm)
- [`join_algorithm` setting](/docs/en/operations/settings/settings.md#join_algorithm)
- [JOIN clause](/docs/en/sql-reference/statements/select/join.md)


@ -236,7 +236,7 @@ libhdfs3 support HDFS namenode HA.
## Storage Settings {#storage-settings}
- [hdfs_truncate_on_insert](/docs/en/operations/settings/settings.md#hdfs-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [hdfs_truncate_on_insert](/docs/en/operations/settings/settings.md#hdfs_truncate_on_insert) - allows to truncate file before insert into it. Disabled by default.
- [hdfs_create_multiple_files](/docs/en/operations/settings/settings.md#hdfs_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [hdfs_skip_empty_files](/docs/en/operations/settings/settings.md#hdfs_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
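As a hedged illustration of how the storage settings above are applied (the HDFS URI and table schema are hypothetical), a write through the `hdfs` table function could look like:

``` sql
-- Hypothetical endpoint and schema; hdfs_truncate_on_insert = 1 overwrites an
-- existing target file instead of raising an error.
INSERT INTO TABLE FUNCTION hdfs('hdfs://hdfs1:9000/tmp/example.tsv', 'TSV', 'name String, value UInt32')
SETTINGS hdfs_truncate_on_insert = 1
VALUES ('one', 1), ('two', 2);
```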


@ -54,7 +54,7 @@ Optional parameters:
- `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed. Default: `1`.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block. Default: `0`.
- `kafka_client_id` — Client identifier. Empty by default.
@ -151,7 +151,7 @@ Example:
SELECT level, sum(total) FROM daily GROUP BY level;
```
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#settings-max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](../../../operations/settings/settings.md/#stream-flush-interval-ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](../../../operations/settings/settings.md/#stream-flush-interval-ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
To stop receiving topic data or to change the conversion logic, detach the materialized view:


@ -58,7 +58,7 @@ Optional parameters:
- `nats_reconnect_wait` - Amount of time in milliseconds to sleep between each reconnect attempt. Default: `5000`.
- `nats_server_list` - Server list for connection. Can be specified to connect to NATS cluster.
- `nats_skip_broken_messages` - NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N` then the engine skips *N* NATS messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` - Number of rows collected by poll(s) for flushing data from NATS. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `nats_max_block_size` - Number of rows collected by poll(s) for flushing data from NATS. Default: [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size).
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `nats_username` - NATS username.
- `nats_password` - NATS password.


@ -65,7 +65,7 @@ Optional parameters:
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` - RabbitMQ message parser tolerance to schema-incompatible messages per block. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `rabbitmq_max_block_size` - Number of rows collected before flushing data from RabbitMQ. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `rabbitmq_max_block_size` - Number of rows collected before flushing data from RabbitMQ. Default: [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size).
- `rabbitmq_flush_interval_ms` - Timeout for flushing data from RabbitMQ. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `rabbitmq_queue_settings_list` - allows to set RabbitMQ settings when creating a queue. Available settings: `x-max-length`, `x-max-length-bytes`, `x-message-ttl`, `x-expires`, `x-priority`, `x-max-priority`, `x-overflow`, `x-dead-letter-exchange`, `x-queue-type`. The `durable` setting is enabled automatically for the queue.
- `rabbitmq_address` - Address for connection. Use either this setting or `rabbitmq_host_port`.


@ -222,7 +222,7 @@ CREATE TABLE table_with_asterisk (name String, value UInt32)
## Storage Settings {#storage-settings}
- [s3_truncate_on_insert](/docs/en/operations/settings/settings.md#s3-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [s3_truncate_on_insert](/docs/en/operations/settings/settings.md#s3_truncate_on_insert) - allows to truncate file before insert into it. Disabled by default.
- [s3_create_multiple_files](/docs/en/operations/settings/settings.md#s3_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [s3_skip_empty_files](/docs/en/operations/settings/settings.md#s3_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
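For illustration only (the bucket path and schema are placeholders), a read that tolerates zero-byte objects by using the settings listed above might be:

``` sql
-- Hypothetical bucket; s3_skip_empty_files = 1 skips empty objects while reading.
SELECT count()
FROM s3('https://my-bucket.s3.amazonaws.com/data/*.csv', 'CSV', 'name String, value UInt32')
SETTINGS s3_skip_empty_files = 1;
```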


@ -112,7 +112,7 @@ Specifying the `sharding_key` is necessary for the following:
For **Insert limit settings** (`..._insert`) see also:
- [distributed_foreground_insert](../../../operations/settings/settings.md#distributed_foreground_insert) setting
- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
- [prefer_localhost_replica](../../../operations/settings/settings.md#prefer-localhost-replica) setting
- `bytes_to_throw_insert` is handled before `bytes_to_delay_insert`, so you should not set it to a value less than `bytes_to_delay_insert`
:::
@ -198,7 +198,7 @@ The parameters `host`, `port`, and optionally `user`, `password`, `secure`, `com
- `secure` - Whether to use a secure SSL/TLS connection. Usually also requires specifying the port (the default secure port is `9440`). The server should listen on `<tcp_port_secure>9440</tcp_port_secure>` and be configured with correct certificates.
- `compression` - Use data compression. Default value: `true`.
When specifying replicas, one of the available replicas will be selected for each of the shards when reading. You can configure the algorithm for load balancing (the preference for which replica to access); see the [load_balancing](../../../operations/settings/settings.md#settings-load_balancing) setting. If the connection with the server is not established, there will be an attempt to connect with a short timeout. If the connection failed, the next replica will be selected, and so on for all the replicas. If the connection attempt failed for all the replicas, the attempt will be repeated the same way, several times. This works in favour of resiliency, but does not provide complete fault tolerance: a remote server might accept the connection, but might not work, or work poorly.
When specifying replicas, one of the available replicas will be selected for each of the shards when reading. You can configure the algorithm for load balancing (the preference for which replica to access); see the [load_balancing](../../../operations/settings/settings.md#load_balancing) setting. If the connection with the server is not established, there will be an attempt to connect with a short timeout. If the connection failed, the next replica will be selected, and so on for all the replicas. If the connection attempt failed for all the replicas, the attempt will be repeated the same way, several times. This works in favour of resiliency, but does not provide complete fault tolerance: a remote server might accept the connection, but might not work, or work poorly.
You can specify just one of the shards (in this case, query processing should be called remote, rather than distributed) or up to any number of shards. In each shard, you can specify from one to any number of replicas. You can specify a different number of replicas for each shard.
@ -243,7 +243,7 @@ If the server ceased to exist or had a rough restart (for example, due to a hard
When querying a `Distributed` table, `SELECT` queries are sent to all shards and work regardless of how data is distributed across the shards (they can be distributed completely randomly). When you add a new shard, you do not have to transfer old data into it. Instead, you can write new data to it by using a heavier weight; the data will be distributed slightly unevenly, but queries will work correctly and efficiently.
When the `max_parallel_replicas` option is enabled, query processing is parallelized across all replicas within a single shard. For more information, see the section [max_parallel_replicas](../../../operations/settings/settings.md#settings-max_parallel_replicas).
When the `max_parallel_replicas` option is enabled, query processing is parallelized across all replicas within a single shard. For more information, see the section [max_parallel_replicas](../../../operations/settings/settings.md#max_parallel_replicas).
To learn more about how distributed `in` and `global in` queries are processed, refer to [this](../../../sql-reference/operators/in.md#select-distributed-subqueries) documentation.
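A minimal sketch of the `max_parallel_replicas` behaviour mentioned above (the table name is hypothetical, and depending on the server version additional parallel-replicas settings or a sampling key may be required):

``` sql
-- Let every shard use up to three of its replicas for this query.
SET max_parallel_replicas = 3;
SELECT count() FROM distributed_hits;
```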


@ -101,8 +101,8 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
## Settings {#settings}
- [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default.
- [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-empty_if-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default.
- [engine_file_truncate_on_insert](/docs/en/operations/settings/settings.md#engine-file-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [engine_file_allow_create_multiple_files](/docs/en/operations/settings/settings.md#engine_file_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [engine_file_skip_empty_files](/docs/en/operations/settings/settings.md#engine_file_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [storage_file_read_method](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - method of reading data from storage file, one of: `read`, `pread`, `mmap`. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local). Default value: `pread` for clickhouse-server, `mmap` for clickhouse-local.
- [storage_file_read_method](/docs/en/operations/settings/settings.md#engine-file-empty_if-not-exists) - method of reading data from storage file, one of: `read`, `pread`, `mmap`. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local). Default value: `pread` for clickhouse-server, `mmap` for clickhouse-local.
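As a sketch under the assumption of a hypothetical `File`-engine table, `engine_file_empty_if_not_exists` turns a read from a missing backing file into an empty result instead of an error:

``` sql
-- Hypothetical table; without the setting, selecting from a missing file fails.
CREATE TABLE file_demo (name String, value UInt32) ENGINE = File(TSV);
SELECT * FROM file_demo SETTINGS engine_file_empty_if_not_exists = 1;
```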


@ -41,7 +41,7 @@ Optional parameters:
- `poll_timeout_ms` - Timeout for single poll from log file. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
- `poll_max_batch_size` — Maximum amount of records to be polled in a single poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `max_block_size` — The maximum batch size (in records) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `max_block_size` — The maximum batch size (in records) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#max_insert_block_size).
- `max_threads` - Number of max threads to parse files, default is 0, which means the number will be max(1, physical_cpu_cores / 4).
- `poll_directory_watch_events_backoff_init` - The initial sleep value for watch directory thread. Default: `500`.
- `poll_directory_watch_events_backoff_max` - The max sleep value for watch directory thread. Default: `32000`.


@ -167,7 +167,7 @@ For successful requests that do not return a data table, an empty response body
You can use compression to reduce network traffic when transmitting a large amount of data or for creating dumps that are immediately compressed.
You can use the internal ClickHouse compression format when transmitting data. The compressed data has a non-standard format, and you need `clickhouse-compressor` program to work with it. It is installed with the `clickhouse-client` package. To increase the efficiency of data insertion, you can disable server-side checksum verification by using the [http_native_compression_disable_checksumming_on_decompress](../operations/settings/settings.md#settings-http_native_compression_disable_checksumming_on_decompress) setting.
You can use the internal ClickHouse compression format when transmitting data. The compressed data has a non-standard format, and you need `clickhouse-compressor` program to work with it. It is installed with the `clickhouse-client` package. To increase the efficiency of data insertion, you can disable server-side checksum verification by using the [http_native_compression_disable_checksumming_on_decompress](../operations/settings/settings.md#http_native_compression_disable_checksumming_on_decompress) setting.
If you specify `compress=1` in the URL, the server will compress the data it sends to you. If you specify `decompress=1` in the URL, the server will decompress the data which you pass in the `POST` method.
@ -183,7 +183,7 @@ You can also choose to use [HTTP compression](https://en.wikipedia.org/wiki/HTTP
- `snappy`
To send a compressed `POST` request, append the request header `Content-Encoding: compression_method`.
In order for ClickHouse to compress the response, enable compression with [enable_http_compression](../operations/settings/settings.md#settings-enable_http_compression) setting and append `Accept-Encoding: compression_method` header to the request. You can configure the data compression level in the [http_zlib_compression_level](../operations/settings/settings.md#settings-http_zlib_compression_level) setting for all compression methods.
In order for ClickHouse to compress the response, enable compression with [enable_http_compression](../operations/settings/settings.md#enable_http_compression) setting and append `Accept-Encoding: compression_method` header to the request. You can configure the data compression level in the [http_zlib_compression_level](../operations/settings/settings.md#http_zlib_compression_level) setting for all compression methods.
:::info
Some HTTP clients might decompress data from the server by default (with `gzip` and `deflate`) and you might get decompressed data even if you use the compression settings correctly.
@ -285,7 +285,7 @@ For information about other parameters, see the section “SET”.
Similarly, you can use ClickHouse sessions in the HTTP protocol. To do this, you need to add the `session_id` GET parameter to the request. You can use any string as the session ID. By default, the session is terminated after 60 seconds of inactivity. To change this timeout, modify the `default_session_timeout` setting in the server configuration, or add the `session_timeout` GET parameter to the request. To check the session status, use the `session_check=1` parameter. Only one query at a time can be executed within a single session.
You can receive information about the progress of a query in `X-ClickHouse-Progress` response headers. To do this, enable [send_progress_in_http_headers](../operations/settings/settings.md#settings-send_progress_in_http_headers). Example of the header sequence:
You can receive information about the progress of a query in `X-ClickHouse-Progress` response headers. To do this, enable [send_progress_in_http_headers](../operations/settings/settings.md#send_progress_in_http_headers). Example of the header sequence:
``` text
X-ClickHouse-Progress: {"read_rows":"2752512","read_bytes":"240570816","total_rows_to_read":"8880128","elapsed_ns":"662334"}
@ -496,7 +496,7 @@ Next are the configuration methods for different `type`.
`query` value is a predefined query of `predefined_query_handler`, which is executed by ClickHouse when an HTTP request is matched and the result of the query is returned. It is a must configuration.
The following example defines the values of [max_threads](../operations/settings/settings.md#settings-max_threads) and `max_final_threads` settings, then queries the system table to check whether these settings were set successfully.
The following example defines the values of [max_threads](../operations/settings/settings.md#max_threads) and `max_final_threads` settings, then queries the system table to check whether these settings were set successfully.
:::note
To keep the default `handlers` such as `query`, `play`, `ping`, add the `<defaults/>` rule.
@ -539,7 +539,7 @@ In `dynamic_query_handler`, the query is written in the form of parameter of the
ClickHouse extracts and executes the value corresponding to the `query_param_name` value in the URL of the HTTP request. The default value of `query_param_name` is `/query` . It is an optional configuration. If there is no definition in the configuration file, the parameter is not passed in.
To experiment with this functionality, the example defines the values of [max_threads](../operations/settings/settings.md#settings-max_threads) and `max_final_threads` and `queries` whether the settings were set successfully.
To experiment with this functionality, the example defines the values of [max_threads](../operations/settings/settings.md#max_threads) and `max_final_threads` and `queries` whether the settings were set successfully.
Example:


@ -64,4 +64,4 @@ You can configure ClickHouse to export metrics to [Prometheus](https://prometheu
Additionally, you can monitor server availability through the HTTP API. Send the `HTTP GET` request to `/ping`. If the server is available, it responds with `200 OK`.
To monitor servers in a cluster configuration, you should set the [max_replica_delay_for_distributed_queries](../operations/settings/settings.md#settings-max_replica_delay_for_distributed_queries) parameter and use the HTTP resource `/replicas_status`. A request to `/replicas_status` returns `200 OK` if the replica is available and is not delayed behind the other replicas. If a replica is delayed, it returns `503 HTTP_SERVICE_UNAVAILABLE` with information about the gap.
To monitor servers in a cluster configuration, you should set the [max_replica_delay_for_distributed_queries](../operations/settings/settings.md#max_replica_delay_for_distributed_queries) parameter and use the HTTP resource `/replicas_status`. A request to `/replicas_status` returns `200 OK` if the replica is available and is not delayed behind the other replicas. If a replica is delayed, it returns `503 HTTP_SERVICE_UNAVAILABLE` with information about the gap.


@ -42,7 +42,7 @@ To analyze the `trace_log` system table:
- Install the `clickhouse-common-static-dbg` package. See [Install from DEB Packages](../../getting-started/install.md#install-from-deb-packages).
- Allow introspection functions by the [allow_introspection_functions](../../operations/settings/settings.md#settings-allow_introspection_functions) setting.
- Allow introspection functions by the [allow_introspection_functions](../../operations/settings/settings.md#allow_introspection_functions) setting.
For security reasons, introspection functions are disabled by default.


@ -103,7 +103,7 @@ It is also possible to limit the cache usage of individual users using [settings
constraints](settings/constraints-on-settings.md). More specifically, you can restrict the maximum amount of memory (in bytes) a user may
allocate in the query cache and the maximum number of stored query results. For that, first provide configurations
[query_cache_max_size_in_bytes](settings/settings.md#query-cache-max-size-in-bytes) and
[query_cache_max_entries](settings/settings.md#query-cache-size-max-entries) in a user profile in `users.xml`, then make both settings
[query_cache_max_entries](settings/settings.md#query-cache-max-entries) in a user profile in `users.xml`, then make both settings
readonly:
``` xml
@ -144,7 +144,7 @@ value can be specified at session, profile or query level using setting [query_c
Entries in the query cache are compressed by default. This reduces the overall memory consumption at the cost of slower writes into / reads
from the query cache. To disable compression, use setting [query_cache_compress_entries](settings/settings.md#query-cache-compress-entries).
ClickHouse reads table data in blocks of [max_block_size](settings/settings.md#settings-max_block_size) rows. Due to filtering, aggregation,
ClickHouse reads table data in blocks of [max_block_size](settings/settings.md#setting-max_block_size) rows. Due to filtering, aggregation,
etc., result blocks are typically much smaller than 'max_block_size' but there are also cases where they are much bigger. Setting
[query_cache_squash_partial_results](settings/settings.md#query-cache-squash-partial-results) (enabled by default) controls if result blocks
are squashed (if they are tiny) or split (if they are large) into blocks of 'max_block_size' size before insertion into the query result


@ -2009,7 +2009,7 @@ Data for the query cache is allocated in DRAM. If memory is scarce, make sure to
## query_thread_log {#query_thread_log}
Setting for logging threads of queries received with the [log_query_threads=1](../../operations/settings/settings.md#settings-log-query-threads) setting.
Setting for logging threads of queries received with the [log_query_threads=1](../../operations/settings/settings.md#log-query-threads) setting.
Queries are logged in the [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) table, not in a separate file. You can change the name of the table in the `table` parameter (see below).
@ -2051,7 +2051,7 @@ If the table does not exist, ClickHouse will create it. If the structure of the
## query_views_log {#query_views_log}
Setting for logging views (live, materialized, etc.) dependent on queries received with the [log_query_views=1](../../operations/settings/settings.md#settings-log-query-views) setting.
Setting for logging views (live, materialized, etc.) dependent on queries received with the [log_query_views=1](../../operations/settings/settings.md#log-query-views) setting.
Queries are logged in the [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) table, not in a separate file. You can change the name of the table in the `table` parameter (see below).
@ -2331,7 +2331,7 @@ For the value of the `incl` attribute, see the section “[Configuration files](
**See Also**
- [skip_unavailable_shards](../../operations/settings/settings.md#settings-skip_unavailable_shards)
- [skip_unavailable_shards](../../operations/settings/settings.md#skip_unavailable_shards)
- [Cluster Discovery](../../operations/cluster-discovery.md)
- [Replicated database engine](../../engines/database-engines/replicated.md)


@ -139,7 +139,7 @@ Limit on the number of bytes in the result. The same as the previous setting.
What to do if the volume of the result exceeds one of the limits: throw or break. By default, throw.
Using break is similar to using LIMIT. `Break` interrupts execution only at the block level. This means that amount of returned rows is greater than [max_result_rows](#setting-max_result_rows), multiple of [max_block_size](../../operations/settings/settings.md#setting-max_block_size) and depends on [max_threads](../../operations/settings/settings.md#settings-max_threads).
Using break is similar to using LIMIT. `Break` interrupts execution only at the block level. This means that amount of returned rows is greater than [max_result_rows](#setting-max_result_rows), multiple of [max_block_size](../../operations/settings/settings.md#setting-max_block_size) and depends on [max_threads](../../operations/settings/settings.md#max_threads).
Example:


@ -1716,7 +1716,7 @@ Default value: `1`
## query_cache_squash_partial_results {#query-cache-squash-partial-results}
Squash partial result blocks to blocks of size [max_block_size](#setting-max_block_size). Reduces performance of inserts into the [query cache](../query-cache.md) but improves the compressibility of cache entries (see [query_cache_compress-entries](#query_cache_compress_entries)).
Squash partial result blocks to blocks of size [max_block_size](#setting-max_block_size). Reduces performance of inserts into the [query cache](../query-cache.md) but improves the compressibility of cache entries (see [query_cache_compress-entries](#query-cache-compress-entries)).
Possible values:
@ -2486,7 +2486,7 @@ See also:
- [load_balancing](#load_balancing-round_robin)
- [Table engine Distributed](../../engines/table-engines/special/distributed.md)
- [distributed_replica_error_cap](#distributed_replica_error_cap)
- [distributed_replica_error_half_life](#settings-distributed_replica_error_half_life)
- [distributed_replica_error_half_life](#distributed_replica_error_half_life)
## distributed_background_insert_sleep_time_ms {#distributed_background_insert_sleep_time_ms}
@ -4715,7 +4715,7 @@ Possible values:
Default value: `false`.
## rename_files_after_processing
## rename_files_after_processing {#rename_files_after_processing}
- **Type:** String


@ -78,5 +78,5 @@ is_active: NULL
**See Also**
- [Table engine Distributed](../../engines/table-engines/special/distributed.md)
- [distributed_replica_error_cap setting](../../operations/settings/settings.md#settings-distributed_replica_error_cap)
- [distributed_replica_error_half_life setting](../../operations/settings/settings.md#settings-distributed_replica_error_half_life)
- [distributed_replica_error_cap setting](../../operations/settings/settings.md#distributed_replica_error_cap)
- [distributed_replica_error_half_life setting](../../operations/settings/settings.md#distributed_replica_error_half_life)


@ -11,7 +11,7 @@ This table does not contain the ingested data for `INSERT` queries.
You can change settings of queries logging in the [query_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query-log) section of the server configuration.
You can disable query logging by setting [log_queries = 0](../../operations/settings/settings.md#settings-log-queries). We do not recommend turning off logging because information in this table is important for solving issues.
You can disable query logging by setting [log_queries = 0](../../operations/settings/settings.md#log-queries). We do not recommend turning off logging because information in this table is important for solving issues.
The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query-log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.
@ -30,7 +30,7 @@ Each query creates one or two rows in the `query_log` table, depending on the st
You can use the [log_queries_probability](../../operations/settings/settings.md#log-queries-probability) setting to reduce the number of queries, registered in the `query_log` table.
You can use the [log_formatted_queries](../../operations/settings/settings.md#settings-log-formatted-queries) setting to log formatted queries to the `formatted_query` column.
You can use the [log_formatted_queries](../../operations/settings/settings.md#log-formatted-queries) setting to log formatted queries to the `formatted_query` column.
Columns:
@ -101,7 +101,7 @@ Columns:
- `revision` ([UInt32](../../sql-reference/data-types/int-uint.md)) — ClickHouse revision.
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events)
- `Settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if it is not defined.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#max_query_size). An empty string if it is not defined.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution. These threads may not have run simultaneously.
- `peak_threads_usage` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum count of simultaneous threads executing the query.
- `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions`, which were used during query execution.


@ -8,7 +8,7 @@ Contains information about threads that execute queries, for example, thread nam
To start logging:
1. Configure parameters in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) section.
2. Set [log_query_threads](../../operations/settings/settings.md#settings-log-query-threads) to 1.
2. Set [log_query_threads](../../operations/settings/settings.md#log-query-threads) to 1.
The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.


@ -8,7 +8,7 @@ Contains information about the dependent views executed when running a query, fo
To start logging:
1. Configure parameters in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) section.
2. Set [log_query_views](../../operations/settings/settings.md#settings-log-query-views) to 1.
2. Set [log_query_views](../../operations/settings/settings.md#log-query-views) to 1.
The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.


@ -14,7 +14,7 @@ This table contains the following columns (the column type is shown in brackets)
- `supports_sort_order` (UInt8) — Flag that indicates if table engine supports clauses `PARTITION_BY`, `PRIMARY_KEY`, `ORDER_BY` and `SAMPLE_BY`.
- `supports_replication` (UInt8) — Flag that indicates if table engine supports [data replication](../../engines/table-engines/mergetree-family/replication.md).
- `supports_deduplication` (UInt8) — Flag that indicates if table engine supports data deduplication.
- `supports_parallel_insert` (UInt8) — Flag that indicates if table engine supports parallel insert (see [`max_insert_threads`](../../operations/settings/settings.md#settings-max-insert-threads) setting).
- `supports_parallel_insert` (UInt8) — Flag that indicates if table engine supports parallel insert (see [`max_insert_threads`](../../operations/settings/settings.md#max-insert-threads) setting).
Example:


@ -28,7 +28,7 @@ In both cases the type of the returned value is [UInt64](../../../sql-reference/
**Details**
ClickHouse supports the `COUNT(DISTINCT ...)` syntax. The behavior of this construction depends on the [count_distinct_implementation](../../../operations/settings/settings.md#settings-count_distinct_implementation) setting. It defines which of the [uniq\*](../../../sql-reference/aggregate-functions/reference/uniq.md#agg_function-uniq) functions is used to perform the operation. The default is the [uniqExact](../../../sql-reference/aggregate-functions/reference/uniqexact.md#agg_function-uniqexact) function.
ClickHouse supports the `COUNT(DISTINCT ...)` syntax. The behavior of this construction depends on the [count_distinct_implementation](../../../operations/settings/settings.md#count_distinct_implementation) setting. It defines which of the [uniq\*](../../../sql-reference/aggregate-functions/reference/uniq.md#agg_function-uniq) functions is used to perform the operation. The default is the [uniqExact](../../../sql-reference/aggregate-functions/reference/uniqexact.md#agg_function-uniqexact) function.
The `SELECT count() FROM table` query is optimized by default using metadata from MergeTree. If you need to use row-level security, disable optimization using the [optimize_trivial_count_query](../../../operations/settings/settings.md#optimize-trivial-count-query) setting.
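A short illustrative example of the setting described above (the table and column names are hypothetical):

``` sql
-- Use the approximate uniq implementation for COUNT(DISTINCT ...) in this query.
SELECT count(DISTINCT user_id)
FROM visits
SETTINGS count_distinct_implementation = 'uniq';
```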


@ -143,7 +143,7 @@ range([start, ] end [, step])
**Implementation details**
- The arguments `start`, `end`, and `step` must be of one of the following data types: `UInt8`, `UInt16`, `UInt32`, `UInt64`, `Int8`, `Int16`, `Int32`, `Int64`; the same applies to the elements of the returned array, whose type is the supertype of all the arguments.
- An exception is thrown if the query produces arrays with a total length of more than the number of elements specified by the [function_range_max_elements_in_block](../../operations/settings/settings.md#settings-function_range_max_elements_in_block) setting.
- An exception is thrown if the query produces arrays with a total length of more than the number of elements specified by the [function_range_max_elements_in_block](../../operations/settings/settings.md#function_range_max_elements_in_block) setting.
- Returns Null if any argument has Nullable(Nothing) type. An exception is thrown if any argument has Null value (Nullable(T) type).
**Examples**


@ -16,7 +16,7 @@ For proper operation of introspection functions:
- Install the `clickhouse-common-static-dbg` package.
- Set the [allow_introspection_functions](../../operations/settings/settings.md#settings-allow_introspection_functions) setting to 1.
- Set the [allow_introspection_functions](../../operations/settings/settings.md#allow_introspection_functions) setting to 1.
For security reasons introspection functions are disabled by default.
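For illustration, enabling the setting for a session and symbolizing addresses from `system.trace_log` (assuming the sampling profiler has collected traces) could look like:

``` sql
SET allow_introspection_functions = 1;
-- Resolve stored stack-trace addresses to demangled symbol names.
SELECT demangle(addressToSymbol(arrayJoin(trace))) AS symbol
FROM system.trace_log
LIMIT 5;
```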


@ -0,0 +1,22 @@
---
slug: /en/sql-reference/statements/alter/apply-deleted-mask
sidebar_position: 46
sidebar_label: APPLY DELETED MASK
---
# Apply mask of deleted rows
``` sql
ALTER TABLE [db].name [ON CLUSTER cluster] APPLY DELETED MASK [IN PARTITION partition_id]
```
The command applies the mask created by [lightweight delete](/docs/en/sql-reference/statements/delete) and forcefully removes the rows marked as deleted from disk. This command is a heavyweight mutation and is semantically equivalent to the query ```ALTER TABLE [db].name DELETE WHERE _row_exists = 0```.
:::note
It only works for tables in the [`MergeTree`](../../../engines/table-engines/mergetree-family/mergetree.md) family (including [replicated](../../../engines/table-engines/mergetree-family/replication.md) tables).
:::
**See also**
- [Lightweight deletes](/docs/en/sql-reference/statements/delete)
- [Heavyweight deletes](/docs/en/sql-reference/statements/alter/delete.md)
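A hedged usage sketch (the database and table names are hypothetical); the optional `IN PARTITION` clause from the syntax above can narrow the mutation to a single partition:

``` sql
-- Physically remove rows hidden by a previous lightweight DELETE.
ALTER TABLE db.events APPLY DELETED MASK;
```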


@ -17,8 +17,9 @@ Most `ALTER TABLE` queries modify table settings or data:
- [CONSTRAINT](/docs/en/sql-reference/statements/alter/constraint.md)
- [TTL](/docs/en/sql-reference/statements/alter/ttl.md)
- [STATISTIC](/docs/en/sql-reference/statements/alter/statistic.md)
- [APPLY DELETED MASK](/docs/en/sql-reference/statements/alter/apply-deleted-mask.md)
:::note
:::note
Most `ALTER TABLE` queries are supported only for [\*MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables, as well as [Merge](/docs/en/engines/table-engines/special/merge.md) and [Distributed](/docs/en/engines/table-engines/special/distributed.md).
:::
@ -59,7 +60,7 @@ For all `ALTER` queries, you can use the [alter_sync](/docs/en/operations/settin
You can specify how long (in seconds) to wait for inactive replicas to execute all `ALTER` queries with the [replication_wait_for_inactive_replica_timeout](/docs/en/operations/settings/settings.md/#replication-wait-for-inactive-replica-timeout) setting.
:::note
:::note
For all `ALTER` queries, if `alter_sync = 2` and some replicas are not active for more than the time, specified in the `replication_wait_for_inactive_replica_timeout` setting, then an exception `UNFINISHED` is thrown.
:::


@ -45,20 +45,20 @@ Additional join types available in ClickHouse:
- `ASOF JOIN` and `LEFT ASOF JOIN`, joining sequences with a non-exact match. `ASOF JOIN` usage is described below.
:::note
When [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm) is set to `partial_merge`, `RIGHT JOIN` and `FULL JOIN` are supported only with `ALL` strictness (`SEMI`, `ANTI`, `ANY`, and `ASOF` are not supported).
When [join_algorithm](../../../operations/settings/settings.md#join_algorithm) is set to `partial_merge`, `RIGHT JOIN` and `FULL JOIN` are supported only with `ALL` strictness (`SEMI`, `ANTI`, `ANY`, and `ASOF` are not supported).
:::
## Settings
The default join type can be overridden using [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
The default join type can be overridden using [join_default_strictness](../../../operations/settings/settings.md#join_default_strictness) setting.
The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting.
**See also**
- [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm)
- [join_any_take_last_row](../../../operations/settings/settings.md#settings-join_any_take_last_row)
- [join_algorithm](../../../operations/settings/settings.md#join_algorithm)
- [join_any_take_last_row](../../../operations/settings/settings.md#join_any_take_last_row)
- [join_use_nulls](../../../operations/settings/settings.md#join_use_nulls)
- [partial_merge_join_optimizations](../../../operations/settings/settings.md#partial_merge_join_optimizations)
- [partial_merge_join_rows_in_right_blocks](../../../operations/settings/settings.md#partial_merge_join_rows_in_right_blocks)
@ -352,7 +352,7 @@ If you need a `JOIN` for joining with dimension tables (these are relatively sma
### Memory Limitations
By default, ClickHouse uses the [hash join](https://en.wikipedia.org/wiki/Hash_join) algorithm. ClickHouse takes the right_table and creates a hash table for it in RAM. If `join_algorithm = 'auto'` is enabled, then after some threshold of memory consumption, ClickHouse falls back to [merge](https://en.wikipedia.org/wiki/Sort-merge_join) join algorithm. For `JOIN` algorithms description see the [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm) setting.
By default, ClickHouse uses the [hash join](https://en.wikipedia.org/wiki/Hash_join) algorithm. ClickHouse takes the right_table and creates a hash table for it in RAM. If `join_algorithm = 'auto'` is enabled, then after some threshold of memory consumption, ClickHouse falls back to [merge](https://en.wikipedia.org/wiki/Sort-merge_join) join algorithm. For `JOIN` algorithms description see the [join_algorithm](../../../operations/settings/settings.md#join_algorithm) setting.
If you need to restrict `JOIN` operation memory consumption use the following settings:


@ -16,7 +16,7 @@ INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')
The `INSERT INTO t VALUES` fragment is parsed by the full parser, and the data `(1, 'Hello, world'), (2, 'abc'), (3, 'def')` is parsed by the fast stream parser. You can also turn on the full parser for the data by using the [input_format_values_interpret_expressions](../operations/settings/settings-formats.md#input_format_values_interpret_expressions) setting. When `input_format_values_interpret_expressions = 1`, ClickHouse first tries to parse values with the fast stream parser. If it fails, ClickHouse tries to use the full parser for the data, treating it like an SQL [expression](#expressions).
Data can have any format. When a query is received, the server calculates no more than [max_query_size](../operations/settings/settings.md#settings-max_query_size) bytes of the request in RAM (by default, 1 MB), and the rest is stream parsed.
Data can have any format. When a query is received, the server calculates no more than [max_query_size](../operations/settings/settings.md#max_query_size) bytes of the request in RAM (by default, 1 MB), and the rest is stream parsed.
It allows for avoiding issues with large `INSERT` queries.
When using the `Values` format in an `INSERT` query, it may seem that data is parsed the same as expressions in a `SELECT` query, but this is not true. The `Values` format is much more limited.


@ -55,5 +55,5 @@ Connection settings like `host`, `port`, `user`, `password`, `compression`, `sec
**See Also**
- [skip_unavailable_shards](../../operations/settings/settings.md#settings-skip_unavailable_shards)
- [load_balancing](../../operations/settings/settings.md#settings-load_balancing)
- [skip_unavailable_shards](../../operations/settings/settings.md#skip_unavailable_shards)
- [load_balancing](../../operations/settings/settings.md#load_balancing)


@ -199,11 +199,11 @@ SELECT count(*) FROM file('big_dir/**/file002', 'CSV', 'name String, value UInt3
## Settings {#settings}
- [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default.
- [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-empty_if-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default.
- [engine_file_truncate_on_insert](/docs/en/operations/settings/settings.md#engine-file-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [engine_file_allow_create_multiple_files](/docs/en/operations/settings/settings.md#engine_file_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [engine_file_skip_empty_files](/docs/en/operations/settings/settings.md#engine_file_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [storage_file_read_method](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - method of reading data from storage file, one of: read, pread, mmap (only for clickhouse-local). Default value: `pread` for clickhouse-server, `mmap` for clickhouse-local.
- [storage_file_read_method](/docs/en/operations/settings/settings.md#engine-file-empty_if-not-exists) - method of reading data from storage file, one of: read, pread, mmap (only for clickhouse-local). Default value: `pread` for clickhouse-server, `mmap` for clickhouse-local.
**See Also**


@ -100,7 +100,7 @@ FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name Strin
## Storage Settings {#storage-settings}
- [hdfs_truncate_on_insert](/docs/en/operations/settings/settings.md#hdfs-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [hdfs_truncate_on_insert](/docs/en/operations/settings/settings.md#hdfs_truncate_on_insert) - allows to truncate file before insert into it. Disabled by default.
- [hdfs_create_multiple_files](/docs/en/operations/settings/settings.md#hdfs_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [hdfs_skip_empty_files](/docs/en/operations/settings/settings.md#hdfs_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [ignore_access_denied_multidirectory_globs](/docs/en/operations/settings/settings.md#ignore_access_denied_multidirectory_globs) - allows to ignore permission denied errors for multi-directory globs.


@ -165,5 +165,5 @@ The following pattern types are supported.
- `{0n..0m}` - A range of numbers with leading zeroes. This pattern preserves leading zeroes in indices. For instance, `example{01..03}-1` generates `example01-1`, `example02-1` and `example03-1`.
- `{a|b}` - Any number of variants separated by a `|`. The pattern specifies replicas. For instance, `example01-{1|2}` generates replicas `example01-1` and `example01-2`.
The query will be sent to the first healthy replica. However, for `remote` the replicas are iterated in the order currently set in the [load_balancing](../../operations/settings/settings.md#settings-load_balancing) setting.
The query will be sent to the first healthy replica. However, for `remote` the replicas are iterated in the order currently set in the [load_balancing](../../operations/settings/settings.md#load_balancing) setting.
The number of generated addresses is limited by [table_function_remote_max_addresses](../../operations/settings/settings.md#table_function_remote_max_addresses) setting.
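As an illustrative sketch (the target table is hypothetical; the host pattern matches the examples above):

``` sql
-- Expands to replicas example01-1 and example01-2; one is chosen per load_balancing.
SELECT count() FROM remote('example01-{1|2}', default.hits);
```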


@ -16,7 +16,7 @@ When using the `s3 table function` with [`INSERT INTO...SELECT`](../../sql-refer
**Syntax**
``` sql
s3(path [, NOSIGN | aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
s3(path [, NOSIGN | aws_access_key_id, aws_secret_access_key [,session_token]] [,format] [,structure] [,compression])
```
:::tip GCS
@ -38,6 +38,8 @@ For GCS, substitute your HMAC key and HMAC secret where you see `aws_access_key_
:::
- `NOSIGN` - If this keyword is provided in place of credentials, all the requests will not be signed.
- `access_key_id`, `secret_access_key` — Keys that specify credentials to use with given endpoint. Optional.
- `session_token` - Session token to use with the given keys. Optional when passing keys.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression by file extension.
@ -236,7 +238,7 @@ LIMIT 5;
## Storage Settings {#storage-settings}
- [s3_truncate_on_insert](/docs/en/operations/settings/settings.md#s3-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [s3_truncate_on_insert](/docs/en/operations/settings/settings.md#s3_truncate_on_insert) - allows to truncate file before insert into it. Disabled by default.
- [s3_create_multiple_files](/docs/en/operations/settings/settings.md#s3_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [s3_skip_empty_files](/docs/en/operations/settings/settings.md#s3_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
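To make the new `session_token` argument concrete, a hedged example with placeholder temporary credentials might be:

``` sql
-- All credential values are placeholders for temporary (STS-style) credentials.
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/file.csv',
    'EXAMPLE_KEY_ID', 'example_secret', 'example_session_token',
    'CSV', 'name String, value UInt32')
LIMIT 10;
```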


@ -10,14 +10,15 @@ Allows processing files from [Amazon S3](https://aws.amazon.com/s3/) and Google
**Syntax**
``` sql
s3Cluster(cluster_name, source, [,access_key_id, secret_access_key] [,format] [,structure])
s3Cluster(cluster_name, source, [,access_key_id, secret_access_key, [session_token]] [,format] [,structure])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- `source` — URL to a file or a bunch of files. Supports following wildcards in readonly mode: `*`, `**`, `?`, `{'abc','def'}` and `{N..M}` where `N`, `M` — numbers, `abc`, `def` — strings. For more information see [Wildcards In Path](../../engines/table-engines/integrations/s3.md#wildcards-in-path).
- `access_key_id` and `secret_access_key` — Keys that specify credentials to use with given endpoint. Optional.
- `access_key_id`, `secret_access_key` — Keys that specify credentials to use with given endpoint. Optional.
- `session_token` - Session token to use with the given keys. Optional when passing keys.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
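Similarly, a hedged sketch for `s3Cluster` (the cluster name, bucket, and credentials are placeholders):

``` sql
-- 'my_cluster' must exist in the server configuration; credentials are placeholders.
SELECT count()
FROM s3Cluster(
    'my_cluster',
    'https://my-bucket.s3.amazonaws.com/data/*.csv',
    'EXAMPLE_KEY_ID', 'example_secret', 'example_session_token',
    'CSV', 'name String, value UInt32');
```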


@ -11,7 +11,7 @@ sidebar_label: s3
**Syntax**
``` sql
s3(path [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
s3(path [,access_key_id, secret_access_key [,session_token]] [,format] [,structure] [,compression])
```
**Arguments**

View File

@ -11,14 +11,14 @@ sidebar_label: s3Cluster
**Синтаксис**
``` sql
s3Cluster(cluster_name, source, [,access_key_id, secret_access_key] [,format] [,structure])
s3Cluster(cluster_name, source, [,access_key_id, secret_access_key [,session_token]] [,format] [,structure])
```
**Arguments**
- `cluster_name` — Name of the cluster used to build a set of addresses and connection parameters for remote and local servers.
- `source` — URL of a file or of multiple files. Supports the following wildcards: `*`, `?`, `{'abc','def'}` and `{N..M}` where `N` and `M` are numbers and `abc`, `def` are strings. For more information see [Wildcards In Path](../../engines/table-engines/integrations/s3.md#wildcards-in-path).
- `access_key_id` and `secret_access_key` — Keys that specify credentials to use with the given endpoint. Optional.
- `access_key_id`, `secret_access_key` and `session_token` — Keys that specify credentials to use with the given endpoint. Optional.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.

View File

@ -11,7 +11,7 @@ sidebar_label: s3
**Syntax**
``` sql
s3(path, [aws_access_key_id, aws_secret_access_key,] format, structure, [compression])
s3(path [,access_key_id, secret_access_key [,session_token]] ,format, structure, [compression])
```
**Arguments**

View File

@ -157,11 +157,16 @@ BackupImpl::~BackupImpl()
void BackupImpl::open()
{
std::lock_guard lock{mutex};
LOG_INFO(log, "{} backup: {}", ((open_mode == OpenMode::WRITE) ? "Writing" : "Reading"), backup_name_for_logging);
ProfileEvents::increment((open_mode == OpenMode::WRITE) ? ProfileEvents::BackupsOpenedForWrite : ProfileEvents::BackupsOpenedForRead);
if (open_mode == OpenMode::WRITE)
if (open_mode == OpenMode::READ)
{
ProfileEvents::increment(ProfileEvents::BackupsOpenedForRead);
LOG_INFO(log, "Reading backup: {}", backup_name_for_logging);
}
else
{
ProfileEvents::increment(ProfileEvents::BackupsOpenedForWrite);
LOG_INFO(log, "Writing backup: {}", backup_name_for_logging);
timestamp = std::time(nullptr);
if (!uuid)
uuid = UUIDHelpers::generateV4();
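For reference, the two branches above correspond to the two directions of a backup. A sketch of the statements that reach them; the disk and file names are placeholders and assume a disk named `backups` is configured.

``` sql
-- Illustrative only: assumes a configured 'backups' disk.
BACKUP TABLE default.hits TO Disk('backups', 'hits.zip');     -- opened for writing
RESTORE TABLE default.hits FROM Disk('backups', 'hits.zip');  -- opened for reading
```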

View File

@ -78,13 +78,16 @@ BackupInfo BackupInfo::fromAST(const IAST & ast)
}
}
res.args.reserve(list->children.size() - index);
for (; index < list->children.size(); ++index)
size_t args_size = list->children.size();
res.args.reserve(args_size - index);
for (; index < args_size; ++index)
{
const auto & elem = list->children[index];
const auto * lit = elem->as<const ASTLiteral>();
if (!lit)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected literal, got {}", serializeAST(*elem));
}
res.args.push_back(lit->value);
}
}

View File

@ -43,14 +43,6 @@ namespace Stage = BackupCoordinationStage;
namespace
{
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
{
String res = str;
res[0] = std::toupper(res[0]);
return res;
}
/// Outputs "table <name>" or "temporary table <name>"
String tableNameWithTypeToString(const String & database_name, const String & table_name, bool first_upper)
{
@ -145,7 +137,7 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
void RestorerFromBackup::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage)));
LOG_TRACE(log, "Setting stage: {}", new_stage);
current_stage = new_stage;
if (restore_coordination)

View File

@ -18,16 +18,37 @@ template <typename T>
static inline String formatQuoted(T x)
{
WriteBufferFromOwnString wb;
writeQuoted(x, wb);
return wb.str();
}
template <typename T>
static inline void writeQuoted(const DecimalField<T> & x, WriteBuffer & buf)
{
writeChar('\'', buf);
writeText(x.getValue(), x.getScale(), buf, {});
writeChar('\'', buf);
if constexpr (is_decimal_field<T>)
{
writeChar('\'', wb);
writeText(x.getValue(), x.getScale(), wb, {});
writeChar('\'', wb);
}
else if constexpr (is_big_int_v<T>)
{
writeChar('\'', wb);
writeText(x, wb);
writeChar('\'', wb);
}
else
{
/// While `writeQuoted` sounds like it will always write the value in quotes,
/// in fact it means: write according to the rules of the quoted format, like VALUES,
/// where strings, dates, date-times, UUID are in quotes, and numbers are not.
/// That's why we take extra care to put Decimal and big integers inside quotes
/// when formatting literals in SQL language,
/// because it is different from the quoted formats like VALUES.
/// In fact, there are no Decimal and big integer literals in SQL,
/// but they can appear if we format the query from a modified AST.
/// We can fix this idiosyncrasy later.
writeQuoted(x, wb);
}
return wb.str();
}
/** In contrast to writeFloatText (and writeQuoted),

View File

@ -290,6 +290,11 @@ bool ZooKeeperWithFaultInjection::exists(const std::string & path, Coordination:
return executeWithFaultSync(__func__, path, [&]() { return keeper->exists(path, stat, watch); });
}
bool ZooKeeperWithFaultInjection::anyExists(const std::vector<std::string> & paths)
{
return executeWithFaultSync(__func__, !paths.empty() ? paths.front() : "", [&]() { return keeper->anyExists(paths); });
}
zkutil::ZooKeeper::MultiExistsResponse ZooKeeperWithFaultInjection::exists(const std::vector<std::string> & paths)
{
return executeWithFaultSync(__func__, !paths.empty() ? paths.front() : "", [&]() { return keeper->exists(paths); });

View File

@ -59,6 +59,7 @@ private:
class ZooKeeperWithFaultInjection
{
zkutil::ZooKeeper::Ptr keeper;
std::unique_ptr<RandomFaultInjection> fault_policy;
std::string name;
Poco::Logger * logger = nullptr;
@ -203,6 +204,8 @@ public:
zkutil::ZooKeeper::MultiExistsResponse exists(const std::vector<std::string> & paths);
bool anyExists(const std::vector<std::string> & paths);
std::string create(const std::string & path, const std::string & data, int32_t mode);
Coordination::Error tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);

View File

@ -859,6 +859,10 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogLeader:
{
return nuraft::cb_func::ReturnCode::ReturnNull;
}
case nuraft::cb_func::PreAppendLogFollower:
{
const auto & entry = *static_cast<LogEntryPtr *>(param->ctx);

View File

@ -76,7 +76,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
LOG_INFO(log, "S3 configuration was updated");
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token);
auto headers = auth_settings.headers;
static constexpr size_t s3_max_redirects = 10;

View File

@ -1204,7 +1204,7 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
}
void DatabaseReplicated::dropReplica(
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica)
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop)
{
assert(!database || database_zookeeper_path == database->zookeeper_path);
@ -1215,14 +1215,21 @@ void DatabaseReplicated::dropReplica(
auto zookeeper = Context::getGlobalContextInstance()->getZooKeeper();
String database_mark = zookeeper->get(database_zookeeper_path);
String database_mark;
bool db_path_exists = zookeeper->tryGet(database_zookeeper_path, database_mark);
if (!db_path_exists && !throw_if_noop)
return;
if (database_mark != REPLICATED_DATABASE_MARK)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path {} does not look like a path of Replicated database", database_zookeeper_path);
String database_replica_path = fs::path(database_zookeeper_path) / "replicas" / full_replica_name;
if (!zookeeper->exists(database_replica_path))
{
if (!throw_if_noop)
return;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica {} does not exist (database path: {})",
full_replica_name, database_zookeeper_path);
}
if (zookeeper->exists(database_replica_path + "/active"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica {} is active, cannot drop it (database path: {})",

View File

@ -79,7 +79,7 @@ public:
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica);
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
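The new `throw_if_noop` flag lets callers choose whether a missing replica path is an error or a no-op. For context, the user-facing statements that end up here look roughly like the following; the replica, shard, database names and ZooKeeper path are placeholders.

``` sql
-- Illustrative only: names and the ZooKeeper path are placeholders.
SYSTEM DROP DATABASE REPLICA 'replica_1' FROM SHARD 'shard_1' FROM DATABASE replicated_db;
SYSTEM DROP DATABASE REPLICA 'replica_1' FROM ZKPATH '/clickhouse/databases/replicated_db';
```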

View File

@ -373,7 +373,7 @@ void DiskLocal::removeDirectory(const String & path)
{
auto fs_path = fs::path(disk_path) / path;
if (0 != rmdir(fs_path.c_str()))
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot rmdir {}", fs_path);
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot remove directory {}", fs_path);
}
void DiskLocal::removeRecursive(const String & path)

View File

@ -23,7 +23,7 @@
#include <Functions/FunctionIfBase.h>
#include <Interpreters/castColumn.h>
#include <Functions/FunctionFactory.h>
#include <type_traits>
namespace DB
{
@ -42,7 +42,8 @@ using namespace GatherUtils;
/** Selection function by condition: if(cond, then, else).
* cond - UInt8
* then, else - numeric types for which there is a general type, or dates, datetimes, or strings, or arrays of these types.
*/
* For better performance, use branch-free code for numeric types (i.e. cond ? a : b --> !!cond * a + !cond * b), except floating-point types, because of Inf or NaN.
*/
template <typename ArrayCond, typename ArrayA, typename ArrayB, typename ArrayResult, typename ResultType>
inline void fillVectorVector(const ArrayCond & cond, const ArrayA & a, const ArrayB & b, ArrayResult & res)
@ -55,24 +56,48 @@ inline void fillVectorVector(const ArrayCond & cond, const ArrayA & a, const Arr
{
size_t a_index = 0, b_index = 0;
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[a_index++]) : static_cast<ResultType>(b[b_index++]);
{
if constexpr (std::is_integral_v<ResultType>)
{
res[i] = !!cond[i] * static_cast<ResultType>(a[a_index]) + (!cond[i]) * static_cast<ResultType>(b[b_index]);
a_index += !!cond[i];
b_index += !cond[i];
}
else
res[i] = cond[i] ? static_cast<ResultType>(a[a_index++]) : static_cast<ResultType>(b[b_index++]);
}
}
else if (a_is_short)
{
size_t a_index = 0;
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[a_index++]) : static_cast<ResultType>(b[i]);
if constexpr (std::is_integral_v<ResultType>)
{
res[i] = !!cond[i] * static_cast<ResultType>(a[a_index]) + (!cond[i]) * static_cast<ResultType>(b[i]);
a_index += !!cond[i];
}
else
res[i] = cond[i] ? static_cast<ResultType>(a[a_index++]) : static_cast<ResultType>(b[i]);
}
else if (b_is_short)
{
size_t b_index = 0;
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b[b_index++]);
if constexpr (std::is_integral_v<ResultType>)
{
res[i] = !!cond[i] * static_cast<ResultType>(a[i]) + (!cond[i]) * static_cast<ResultType>(b[b_index]);
b_index += !cond[i];
}
else
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b[b_index++]);
}
else
{
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b[i]);
if constexpr (std::is_integral_v<ResultType>)
res[i] = !!cond[i] * static_cast<ResultType>(a[i]) + (!cond[i]) * static_cast<ResultType>(b[i]);
else
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b[i]);
}
}
@ -85,12 +110,21 @@ inline void fillVectorConstant(const ArrayCond & cond, const ArrayA & a, B b, Ar
{
size_t a_index = 0;
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[a_index++]) : static_cast<ResultType>(b);
if constexpr (std::is_integral_v<ResultType>)
{
res[i] = !!cond[i] * static_cast<ResultType>(a[a_index]) + (!cond[i]) * static_cast<ResultType>(b);
a_index += !!cond[i];
}
else
res[i] = cond[i] ? static_cast<ResultType>(a[a_index++]) : static_cast<ResultType>(b);
}
else
{
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b);
if constexpr (std::is_integral_v<ResultType>)
res[i] = !!cond[i] * static_cast<ResultType>(a[i]) + (!cond[i]) * static_cast<ResultType>(b);
else
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b);
}
}
@ -103,12 +137,21 @@ inline void fillConstantVector(const ArrayCond & cond, A a, const ArrayB & b, Ar
{
size_t b_index = 0;
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a) : static_cast<ResultType>(b[b_index++]);
if constexpr (std::is_integral_v<ResultType>)
{
res[i] = !!cond[i] * static_cast<ResultType>(a) + (!cond[i]) * static_cast<ResultType>(b[b_index]);
b_index += !cond[i];
}
else
res[i] = cond[i] ? static_cast<ResultType>(a) : static_cast<ResultType>(b[b_index++]);
}
else
{
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a) : static_cast<ResultType>(b[i]);
if constexpr (std::is_integral_v<ResultType>)
res[i] = !!cond[i] * static_cast<ResultType>(a) + (!cond[i]) * static_cast<ResultType>(b[i]);
else
res[i] = cond[i] ? static_cast<ResultType>(a) : static_cast<ResultType>(b[i]);
}
}
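As a rough illustration of which queries exercise the new branch-free path (integral result types) versus the original ternary path (floating point, kept because of Inf/NaN); `numbers()` is just a stand-in data source.

``` sql
-- Illustrative only: numbers() is a stand-in data source.
SELECT
    if(number % 2 = 0, number + 1, number * 2)   AS int_result,   -- integral result: branch-free path
    if(number % 2 = 0, toFloat64(number), -1.5)  AS float_result  -- floating point: ternary path
FROM numbers(10);
```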

View File

@ -109,6 +109,8 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
{
auto access_key_id = config.getString(config_elem + ".access_key_id", "");
auto secret_access_key = config.getString(config_elem + ".secret_access_key", "");
auto session_token = config.getString(config_elem + ".session_token", "");
auto region = config.getString(config_elem + ".region", "");
auto server_side_encryption_customer_key_base64 = config.getString(config_elem + ".server_side_encryption_customer_key_base64", "");
@ -133,7 +135,7 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
return AuthSettings
{
std::move(access_key_id), std::move(secret_access_key),
std::move(access_key_id), std::move(secret_access_key), std::move(session_token),
std::move(region),
std::move(server_side_encryption_customer_key_base64),
std::move(sse_kms_config),
@ -155,6 +157,8 @@ void AuthSettings::updateFrom(const AuthSettings & from)
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
if (!from.session_token.empty())
session_token = from.session_token;
headers = from.headers;
region = from.region;

View File

@ -80,6 +80,7 @@ struct AuthSettings
std::string access_key_id;
std::string secret_access_key;
std::string session_token;
std::string region;
std::string server_side_encryption_customer_key_base64;
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;

View File

@ -382,7 +382,6 @@ void executeQueryWithParallelReplicas(
shard_num = column->getUInt(0);
}
size_t all_replicas_count = 0;
ClusterPtr new_cluster;
/// if got valid shard_num from query initiator, then parallel replicas scope is the specified shard
/// shards are numbered in order of appearance in the cluster config
@ -406,16 +405,14 @@ void executeQueryWithParallelReplicas(
// shard_num is 1-based, but getClusterWithSingleShard expects 0-based index
auto single_shard_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1);
// convert cluster to representation expected by parallel replicas
new_cluster = single_shard_cluster->getClusterWithReplicasAsShards(settings);
new_cluster = single_shard_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
}
else
{
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
}
all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(new_cluster->getShardCount());
auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,

View File

@ -155,6 +155,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong parameter type in ALTER query");
if (!getContext()->getSettings().allow_experimental_statistic && (
command_ast->type == ASTAlterCommand::ADD_STATISTIC ||
command_ast->type == ASTAlterCommand::DROP_STATISTIC ||
@ -407,6 +408,7 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
break;
}
case ASTAlterCommand::DELETE:
case ASTAlterCommand::APPLY_DELETED_MASK:
case ASTAlterCommand::DROP_PARTITION:
case ASTAlterCommand::DROP_DETACHED_PARTITION:
{

View File

@ -6,6 +6,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateFunctionQuery.h>
@ -19,6 +20,7 @@ namespace ErrorCodes
BlockIO InterpreterCreateFunctionQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
ASTCreateFunctionQuery & create_function_query = updated_query_ptr->as<ASTCreateFunctionQuery &>();
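For context, a sketch of the kind of statement this interpreter handles; the UDF name and body are hypothetical, and the added normalizer pass canonicalizes function names in the query before it is processed further.

``` sql
-- Illustrative only: a simple SQL user-defined function.
CREATE FUNCTION linear_equation AS (x, k, b) -> k * x + b;
SELECT linear_equation(number, 2, 1) FROM numbers(3);
```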

View File

@ -4,6 +4,7 @@
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
@ -22,6 +23,7 @@ namespace ErrorCodes
BlockIO InterpreterCreateIndexQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
auto current_context = getContext();
const auto & create_index = query_ptr->as<ASTCreateIndexQuery &>();

View File

@ -964,7 +964,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, /*throw_if_noop*/ true);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
@ -989,7 +989,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
}
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, /*throw_if_noop*/ false);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
@ -1002,7 +1002,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, /*throw_if_noop*/ true);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
}
else

View File

@ -26,6 +26,7 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
#include <IO/WriteHelpers.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <DataTypes/NestedUtils.h>
@ -153,19 +154,29 @@ bool isStorageTouchedByMutations(
return false;
bool all_commands_can_be_skipped = true;
for (const MutationCommand & command : commands)
for (const auto & command : commands)
{
if (!command.predicate) /// The command touches all rows.
return true;
if (command.partition)
if (command.type == MutationCommand::APPLY_DELETED_MASK)
{
const String partition_id = storage.getPartitionIDFromQuery(command.partition, context);
if (partition_id == source_part->info.partition_id)
all_commands_can_be_skipped = false;
if (source_part->hasLightweightDelete())
return true;
}
else
all_commands_can_be_skipped = false;
{
if (!command.predicate) /// The command touches all rows.
return true;
if (command.partition)
{
const String partition_id = storage.getPartitionIDFromQuery(command.partition, context);
if (partition_id == source_part->info.partition_id)
all_commands_can_be_skipped = false;
}
else
{
all_commands_can_be_skipped = false;
}
}
}
if (all_commands_can_be_skipped)
@ -211,7 +222,6 @@ bool isStorageTouchedByMutations(
return count != 0;
}
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
const MutationCommand & command,
const StoragePtr & storage,
@ -244,6 +254,32 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func;
}
MutationCommand createCommandToApplyDeletedMask(const MutationCommand & command)
{
if (command.type != MutationCommand::APPLY_DELETED_MASK)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected APPLY_DELETED_MASK mutation command, got: {}", magic_enum::enum_name(command.type));
auto alter_command = std::make_shared<ASTAlterCommand>();
alter_command->type = ASTAlterCommand::DELETE;
alter_command->partition = command.partition;
auto row_exists_predicate = makeASTFunction("equals",
std::make_shared<ASTIdentifier>(LightweightDeleteDescription::FILTER_COLUMN.name),
std::make_shared<ASTLiteral>(Field(0)));
if (command.predicate)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Mutation command APPLY DELETED MASK does not support WHERE clause");
alter_command->predicate = row_exists_predicate;
auto mutation_command = MutationCommand::parse(alter_command.get());
if (!mutation_command)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to parse command {}. It's a bug", queryToString(alter_command));
return *mutation_command;
}
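In SQL terms, the command built here rewrites mask application into an ordinary DELETE mutation over the `_row_exists` column. A hedged sketch of the equivalence for a hypothetical table `t`:

``` sql
-- Illustrative only: `t` is a placeholder table name.
ALTER TABLE t APPLY DELETED MASK;
-- is handled internally as, roughly,
ALTER TABLE t DELETE WHERE _row_exists = 0;
```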
MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_))
{
}
@ -517,15 +553,18 @@ void MutationsInterpreter::prepare(bool dry_run)
NameSet updated_columns;
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();
for (const MutationCommand & command : commands)
for (auto & command : commands)
{
if (command.type == MutationCommand::Type::UPDATE
|| command.type == MutationCommand::Type::DELETE)
if (command.type == MutationCommand::Type::APPLY_DELETED_MASK)
command = createCommandToApplyDeletedMask(command);
if (command.type == MutationCommand::Type::UPDATE || command.type == MutationCommand::Type::DELETE)
materialize_ttl_recalculate_only = false;
for (const auto & [name, _] : command.column_to_update_expression)
{
if (!available_columns_set.contains(name) && name != LightweightDeleteDescription::FILTER_COLUMN.name
if (!available_columns_set.contains(name)
&& name != LightweightDeleteDescription::FILTER_COLUMN.name
&& name != BlockNumberColumn::name)
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
"Column {} is updated but not requested to read", name);
@ -574,7 +613,7 @@ void MutationsInterpreter::prepare(bool dry_run)
std::vector<String> read_columns;
/// First, break a sequence of commands into stages.
for (auto & command : commands)
for (const auto & command : commands)
{
// we can return deleted rows only if it's the only present command
assert(command.type == MutationCommand::DELETE || command.type == MutationCommand::UPDATE || !settings.return_mutated_rows);
@ -585,7 +624,7 @@ void MutationsInterpreter::prepare(bool dry_run)
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command);
auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command);
if (!settings.return_mutated_rows)
predicate = makeASTFunction("isZeroOrNull", predicate);
@ -605,16 +644,12 @@ void MutationsInterpreter::prepare(bool dry_run)
NameSet affected_materialized;
for (const auto & kv : command.column_to_update_expression)
for (const auto & [column_name, update_expr] : command.column_to_update_expression)
{
const String & column = kv.first;
auto materialized_it = column_to_affected_materialized.find(column);
auto materialized_it = column_to_affected_materialized.find(column_name);
if (materialized_it != column_to_affected_materialized.end())
{
for (const String & mat_column : materialized_it->second)
for (const auto & mat_column : materialized_it->second)
affected_materialized.emplace(mat_column);
}
/// When doing UPDATE column = expression WHERE condition
/// we will replace column to the result of the following expression:
@ -627,33 +662,39 @@ void MutationsInterpreter::prepare(bool dry_run)
/// Outer CAST is added just in case if we don't trust the returning type of 'if'.
DataTypePtr type;
if (auto physical_column = columns_desc.tryGetPhysical(column))
if (auto physical_column = columns_desc.tryGetPhysical(column_name))
{
type = physical_column->type;
else if (column == LightweightDeleteDescription::FILTER_COLUMN.name)
}
else if (column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
{
type = LightweightDeleteDescription::FILTER_COLUMN.type;
else if (column == BlockNumberColumn::name)
deleted_mask_updated = true;
}
else if (column_name == BlockNumberColumn::name)
{
type = BlockNumberColumn::type;
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column);
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name);
}
auto type_literal = std::make_shared<ASTLiteral>(type->getName());
const auto & update_expr = kv.second;
ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command);
/// And new check validateNestedArraySizes for Nested subcolumns
if (isArray(type) && !Nested::splitName(column).second.empty())
if (isArray(type) && !Nested::splitName(column_name).second.empty())
{
std::shared_ptr<ASTFunction> function = nullptr;
auto nested_update_exprs = getExpressionsOfUpdatedNestedSubcolumns(column, all_columns, command.column_to_update_expression);
auto nested_update_exprs = getExpressionsOfUpdatedNestedSubcolumns(column_name, all_columns, command.column_to_update_expression);
if (!nested_update_exprs)
{
function = makeASTFunction("validateNestedArraySizes",
condition,
update_expr->clone(),
std::make_shared<ASTIdentifier>(column));
std::make_shared<ASTIdentifier>(column_name));
condition = makeASTFunction("and", condition, function);
}
else if (nested_update_exprs->size() > 1)
@ -675,10 +716,10 @@ void MutationsInterpreter::prepare(bool dry_run)
makeASTFunction("_CAST",
update_expr->clone(),
type_literal),
std::make_shared<ASTIdentifier>(column)),
std::make_shared<ASTIdentifier>(column_name)),
type_literal);
stages.back().column_to_updated.emplace(column, updated_column);
stages.back().column_to_updated.emplace(column_name, updated_column);
if (condition && settings.return_mutated_rows)
stages.back().filters.push_back(condition);
@ -986,27 +1027,42 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
auto all_columns = storage_snapshot->getColumnsByNames(options, available_columns);
/// Add _row_exists column if it is present in the part
if (source.hasLightweightDeleteMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
if (source.hasLightweightDeleteMask() || deleted_mask_updated)
all_columns.push_back(LightweightDeleteDescription::FILTER_COLUMN);
bool has_filters = false;
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
{
if (settings.return_all_columns || !prepared_stages[i].filters.empty())
{
for (const auto & column : all_columns)
{
if (column.name == LightweightDeleteDescription::FILTER_COLUMN.name && !deleted_mask_updated)
continue;
prepared_stages[i].output_columns.insert(column.name);
continue;
}
has_filters = true;
settings.apply_deleted_mask = true;
}
else
{
if (i > 0)
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
if (i > 0)
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
/// Make sure that all updated columns are included into output_columns set.
/// This is important for a "hidden" column like _row_exists gets because it is a virtual column
/// and so it is not in the list of AllPhysical columns.
for (const auto & [column_name, _] : prepared_stages[i].column_to_updated)
{
if (column_name == LightweightDeleteDescription::FILTER_COLUMN.name && has_filters && !deleted_mask_updated)
continue;
/// Make sure that all updated columns are included into output_columns set.
/// This is important for a "hidden" column like _row_exists gets because it is a virtual column
/// and so it is not in the list of AllPhysical columns.
for (const auto & kv : prepared_stages[i].column_to_updated)
prepared_stages[i].output_columns.insert(kv.first);
prepared_stages[i].output_columns.insert(column_name);
}
}
}
/// Now, calculate `expressions_chain` for each stage except the first.
@ -1024,7 +1080,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
all_asts->children.push_back(kv.second);
/// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
for (const String & column : stage.output_columns)
for (const auto & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
/// Executing scalar subquery on that stage can lead to deadlock
@ -1081,7 +1137,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
actions_chain.getLastStep().addRequiredOutput(name);
actions_chain.getLastActions();
actions_chain.finalize();
if (i)
@ -1224,7 +1279,7 @@ void MutationsInterpreter::Source::read(
VirtualColumns virtual_columns(std::move(required_columns), part);
createMergeTreeSequentialSource(
createReadFromPartStep(
plan, *data, storage_snapshot, part,
std::move(virtual_columns.columns_to_read),
apply_deleted_mask_, filter, context_,

View File

@ -32,6 +32,8 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
ContextPtr context
);
MutationCommand createCommandToApplyDeletedMask(const MutationCommand & command);
/// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs, MATERIALIZEs)
/// to this data.
class MutationsInterpreter
@ -213,6 +215,7 @@ private:
std::unique_ptr<Block> updated_header;
std::vector<Stage> stages;
bool is_prepared = false; /// Has the sequence of stages been prepared.
bool deleted_mask_updated = false;
NameSet materialized_indices;
NameSet materialized_projections;

View File

@ -466,6 +466,16 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO ";
rename_to->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::APPLY_DELETED_MASK)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "APPLY DELETED MASK" << (settings.hilite ? hilite_none : "");
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
else
throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE, "Unexpected type of ALTER");
}

View File

@ -71,6 +71,7 @@ public:
DELETE,
UPDATE,
APPLY_DELETED_MASK,
NO_TYPE,

View File

@ -111,6 +111,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_remove_ttl("REMOVE TTL");
ParserKeyword s_remove_sample_by("REMOVE SAMPLE BY");
ParserKeyword s_apply_deleted_mask("APPLY DELETED MASK");
ParserCompoundIdentifier parser_name;
ParserStringLiteral parser_string_literal;
@ -823,6 +824,16 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::MODIFY_COMMENT;
}
else if (s_apply_deleted_mask.ignore(pos, expected))
{
command->type = ASTAlterCommand::APPLY_DELETED_MASK;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
}
else
return false;
}
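The grammar accepted above, as a short sketch; the table name and partition are placeholders.

``` sql
-- Illustrative only: table name and partition are placeholders.
ALTER TABLE t APPLY DELETED MASK;
ALTER TABLE t APPLY DELETED MASK IN PARTITION '2023-12';
```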

View File

@ -28,6 +28,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ParserKeyword s_partition("PARTITION");
ParserKeyword s_final("FINAL");
ParserKeyword s_deduplicate("DEDUPLICATE");
ParserKeyword s_cleanup("CLEANUP");
ParserKeyword s_by("BY");
ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p(true);
@ -76,6 +77,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
return false;
}
/// Obsolete feature, ignored for backward compatibility.
s_cleanup.ignore(pos, expected);
auto query = std::make_shared<ASTOptimizeQuery>();
node = query;
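So a statement like the following (table name is a placeholder) still parses, but the trailing keyword no longer has any effect.

``` sql
-- Illustrative only: CLEANUP is accepted and ignored, equivalent to OPTIMIZE TABLE t FINAL.
OPTIMIZE TABLE t FINAL CLEANUP;
```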

View File

@ -740,7 +740,7 @@ void DWARFBlockInputFormat::parseFilenameTable(UnitState & unit, uint64_t offset
auto error = prologue.parse(*debug_line_extractor, &offset, /*RecoverableErrorHandler*/ [&](auto e)
{
if (++seen_debug_line_warnings < 10)
LOG_INFO(&Poco::Logger::get("DWARF"), "{}", llvm::toString(std::move(e)));
LOG_INFO(&Poco::Logger::get("DWARF"), "Parsing error: {}", llvm::toString(std::move(e)));
}, *dwarf_context, unit.dwarf_unit);
if (error)

View File

@ -367,7 +367,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = all_replicas_count,
.number_of_current_replica = 0
/// `shard_num` will be equal to the number of the given replica in the cluster (set by `Cluster::getClusterWithReplicasAsShards`).
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = shard.shard_num - 1,
};
addPipeForSingeReplica(pipes, shard.pool, replica_info);
@ -386,7 +388,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = all_replicas_count,
.number_of_current_replica = pipes.size()
/// `shard_num` will be equal to the number of the given replica in the cluster (set by `Cluster::getClusterWithReplicasAsShards`).
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = current_shard->shard_num - 1,
};
addPipeForSingeReplica(pipes, current_shard->pool, replica_info);
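For context, a hedged sketch of a query that goes through this code path; the setting names are the ones commonly used to enable parallel replicas, and the values, cluster and table are placeholders.

``` sql
-- Illustrative only: values, cluster and table are placeholders.
SET allow_experimental_parallel_reading_from_replicas = 1;
SET max_parallel_replicas = 3;
SET cluster_for_parallel_replicas = 'default';
SELECT count() FROM default.hits;
```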

View File

@ -67,7 +67,7 @@ public:
// Must insert the result for current_row.
virtual void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) = 0;
size_t function_index) const = 0;
virtual std::optional<WindowFrame> getDefaultFrame() const { return {}; }
};
@ -1463,7 +1463,7 @@ struct WindowFunctionRank final : public WindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
IColumn & to = *transform->blockAt(transform->current_row)
.output_columns[function_index];
@ -1482,7 +1482,7 @@ struct WindowFunctionDenseRank final : public WindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
IColumn & to = *transform->blockAt(transform->current_row)
.output_columns[function_index];
@ -1561,7 +1561,7 @@ struct StatefulWindowFunction : public WindowFunction
bool hasTrivialDestructor() const override { return std::is_trivially_destructible_v<State>; }
State & getState(const WindowFunctionWorkspace & workspace)
State & getState(const WindowFunctionWorkspace & workspace) const
{
return *static_cast<State *>(static_cast<void *>(workspace.aggregate_function_state.data()));
}
@ -1626,7 +1626,7 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & workspace = transform->workspaces[function_index];
auto & state = getState(workspace);
@ -1723,7 +1723,7 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
Float64 result = std::numeric_limits<Float64>::quiet_NaN();
@ -1790,7 +1790,7 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & workspace = transform->workspaces[function_index];
auto & state = getState(workspace);
@ -1884,7 +1884,7 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & workspace = transform->workspaces[function_index];
auto & state = getState(workspace);
@ -1962,7 +1962,7 @@ struct WindowFunctionRowNumber final : public WindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
IColumn & to = *transform->blockAt(transform->current_row)
.output_columns[function_index];
@ -2015,7 +2015,7 @@ struct WindowFunctionNtile final : public StatefulWindowFunction<NtileState>
}
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & workspace = transform->workspaces[function_index];
auto & state = getState(workspace);
@ -2207,7 +2207,7 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & current_block = transform->blockAt(transform->current_row);
IColumn & to = *current_block.output_columns[function_index];
@ -2297,7 +2297,7 @@ struct WindowFunctionNthValue final : public WindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & current_block = transform->blockAt(transform->current_row);
IColumn & to = *current_block.output_columns[function_index];
@ -2425,7 +2425,7 @@ struct WindowFunctionNonNegativeDerivative final : public StatefulWindowFunction
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
size_t function_index) const override
{
const auto & current_block = transform->blockAt(transform->current_row);
const auto & workspace = transform->workspaces[function_index];

View File

@ -729,8 +729,8 @@ void HTTPHandler::processQuery(
/// to some other value.
const auto & settings = context->getSettingsRef();
/// Only readonly queries are allowed for HTTP GET requests.
if (request.getMethod() == HTTPServerRequest::HTTP_GET)
/// Anything other than HTTP POST should be a read-only query.
if (request.getMethod() != HTTPServerRequest::HTTP_POST)
{
if (settings.readonly == 0)
context->setSetting("readonly", 2);

View File

@ -593,23 +593,6 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
return index_granularity.getMarksCount();
}
UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
{
if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !rows_count
|| !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge)
return bytes_on_disk;
/// Uninitialized existing_rows_count
/// (if existing_rows_count equals rows_count, it means that previously we failed to read existing_rows_count)
if (existing_rows_count > rows_count)
readExistingRowsCount();
if (existing_rows_count < rows_count)
return bytes_on_disk * existing_rows_count / rows_count;
else /// Load failed
return bytes_on_disk;
}
size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
@ -1304,85 +1287,6 @@ void IMergeTreeDataPart::loadRowsCount()
}
}
void IMergeTreeDataPart::readExistingRowsCount() const
{
if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge
|| existing_rows_count < rows_count || !getMarksCount())
return;
std::lock_guard lock(existing_rows_count_mutex);
/// Already read by another thread
if (existing_rows_count < rows_count)
return;
NamesAndTypesList cols;
cols.push_back(LightweightDeleteDescription::FILTER_COLUMN);
StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
MergeTreeReaderPtr reader = getReader(
cols,
storage_snapshot_ptr,
MarkRanges{MarkRange(0, getMarksCount())},
nullptr,
storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{},
ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{});
if (!reader)
{
LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
existing_rows_count = rows_count;
return;
}
size_t current_mark = 0;
const size_t total_mark = getMarksCount();
bool continue_reading = false;
size_t current_row = 0;
size_t existing_count = 0;
while (current_row < rows_count)
{
size_t rows_to_read = index_granularity.getMarkRows(current_mark);
continue_reading = (current_mark != 0);
Columns result;
result.resize(1);
size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result);
if (!rows_read)
{
LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
existing_rows_count = rows_count;
return;
}
current_row += rows_read;
current_mark += (rows_to_read == rows_read);
const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
if (!row_exists_col)
{
LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
existing_rows_count = rows_count;
return;
}
for (UInt8 row_exists : row_exists_col->getData())
if (row_exists)
existing_count++;
}
existing_rows_count = existing_count;
LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_rows_count);
}
void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");

View File

@ -229,13 +229,6 @@ public:
size_t rows_count = 0;
/// Existing rows count (excluding lightweight deleted rows)
/// UINT64_MAX -> uninitialized
/// 0 -> all rows were deleted
/// if reading failed, it will be set to rows_count
mutable size_t existing_rows_count = UINT64_MAX;
mutable std::mutex existing_rows_count_mutex;
time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
@ -381,10 +374,6 @@ public:
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }
/// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
/// Otherwise returns bytes_on_disk
UInt64 getExistingBytesOnDisk() const;
size_t getFileSizeOrZero(const String & file_name) const;
auto getFilesChecksums() const { return checksums.files; }
@ -511,9 +500,6 @@ public:
/// True if here is lightweight deleted mask file in part.
bool hasLightweightDelete() const { return columns.contains(LightweightDeleteDescription::FILTER_COLUMN.name); }
/// Read existing rows count from _row_exists column
void readExistingRowsCount() const;
void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);
/// Checks the consistency of this data part.

View File

@ -160,7 +160,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
}
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;

View File

@ -570,6 +570,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
global_ctx->storage_snapshot,
global_ctx->future_part->parts[part_num],
column_names,
/*mark_ranges=*/ {},
/*apply_deleted_mask=*/ true,
ctx->read_with_direct_io,
/*take_column_types_from_storage=*/ true,
/*quiet=*/ false,
@ -922,6 +924,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
global_ctx->storage_snapshot,
part,
global_ctx->merging_column_names,
/*mark_ranges=*/ {},
/*apply_deleted_mask=*/ true,
ctx->read_with_direct_io,
/*take_column_types_from_storage=*/ true,
/*quiet=*/ false,

View File

@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
}
IMergeSelector::Part part_info;
part_info.size = part->getExistingBytesOnDisk();
part_info.size = part->getBytesOnDisk();
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
return SelectPartsDecision::CANNOT_SELECT;
}
sum_bytes += (*it)->getExistingBytesOnDisk();
sum_bytes += (*it)->getBytesOnDisk();
prev_it = it;
++it;
@ -791,7 +791,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
}
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge)
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
@ -802,10 +802,7 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
if (part_max_ttl && part_max_ttl <= current_time)
continue;
if (is_merge && part->storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge)
res += part->getExistingBytesOnDisk();
else
res += part->getBytesOnDisk();
res += part->getBytesOnDisk();
}
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);

View File

@ -192,7 +192,7 @@ public:
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge);
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
private:
/** Select all parts belonging to the same partition.

View File

@ -131,6 +131,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical)
.withExtendedObjects()
.withSystemColumns();
if (storage.supportsSubcolumns())
options.withSubcolumns();
columns_for_reader = storage_snapshot->getColumnsByNames(options, columns_to_read);
@ -241,19 +242,24 @@ Pipe createMergeTreeSequentialSource(
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
Names columns_to_read,
std::optional<MarkRanges> mark_ranges,
bool apply_deleted_mask,
bool read_with_direct_io,
bool take_column_types_from_storage,
bool quiet,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count)
{
const auto & filter_column = LightweightDeleteDescription::FILTER_COLUMN;
/// The part might have some rows masked by lightweight deletes
const bool need_to_filter_deleted_rows = data_part->hasLightweightDelete();
auto columns = columns_to_read;
if (need_to_filter_deleted_rows)
columns.emplace_back(LightweightDeleteDescription::FILTER_COLUMN.name);
const bool need_to_filter_deleted_rows = apply_deleted_mask && data_part->hasLightweightDelete();
const bool has_filter_column = std::ranges::find(columns_to_read, filter_column.name) != columns_to_read.end();
if (need_to_filter_deleted_rows && !has_filter_column)
columns_to_read.emplace_back(filter_column.name);
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
storage, storage_snapshot, data_part, columns, std::optional<MarkRanges>{},
storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges),
/*apply_deleted_mask=*/ false, read_with_direct_io, take_column_types_from_storage, quiet);
Pipe pipe(std::move(column_part_source));
@ -261,10 +267,10 @@ Pipe createMergeTreeSequentialSource(
/// Add filtering step that discards deleted rows
if (need_to_filter_deleted_rows)
{
pipe.addSimpleTransform([filtered_rows_count](const Block & header)
pipe.addSimpleTransform([filtered_rows_count, has_filter_column](const Block & header)
{
return std::make_shared<FilterTransform>(
header, nullptr, LightweightDeleteDescription::FILTER_COLUMN.name, true, false, filtered_rows_count);
header, nullptr, filter_column.name, !has_filter_column, false, filtered_rows_count);
});
}
@ -325,9 +331,17 @@ public:
}
}
auto source = std::make_unique<MergeTreeSequentialSource>(
storage, storage_snapshot, data_part, columns_to_read,
std::move(mark_ranges), apply_deleted_mask, false, true);
auto source = createMergeTreeSequentialSource(
storage,
storage_snapshot,
data_part,
columns_to_read,
std::move(mark_ranges),
apply_deleted_mask,
/*read_with_direct_io=*/ false,
/*take_column_types_from_storage=*/ true,
/*quiet=*/ false,
/*filtered_rows_count=*/ nullptr);
pipeline.init(Pipe(std::move(source)));
}
@ -343,7 +357,7 @@ private:
Poco::Logger * log;
};
void createMergeTreeSequentialSource(
void createReadFromPartStep(
QueryPlan & plan,
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -15,6 +15,8 @@ Pipe createMergeTreeSequentialSource(
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
Names columns_to_read,
std::optional<MarkRanges> mark_ranges,
bool apply_deleted_mask,
bool read_with_direct_io,
bool take_column_types_from_storage,
bool quiet,
@ -22,7 +24,7 @@ Pipe createMergeTreeSequentialSource(
class QueryPlan;
void createMergeTreeSequentialSource(
void createReadFromPartStep(
QueryPlan & plan,
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -78,7 +78,6 @@ struct Settings;
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \

View File

@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}
/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)

View File

@ -51,7 +51,6 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
return true;
}
/** Split mutation commands into two parts:
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
@ -79,7 +78,8 @@ static void splitAndModifyMutationCommands(
|| command.type == MutationCommand::Type::MATERIALIZE_PROJECTION
|| command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE
|| command.type == MutationCommand::Type::UPDATE)
|| command.type == MutationCommand::Type::UPDATE
|| command.type == MutationCommand::Type::APPLY_DELETED_MASK)
{
for_interpreter.push_back(command);
for (const auto & [column_name, expr] : command.column_to_update_expression)
@ -202,7 +202,8 @@ static void splitAndModifyMutationCommands(
|| command.type == MutationCommand::Type::MATERIALIZE_PROJECTION
|| command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE
|| command.type == MutationCommand::Type::UPDATE)
|| command.type == MutationCommand::Type::UPDATE
|| command.type == MutationCommand::Type::APPLY_DELETED_MASK)
{
for_interpreter.push_back(command);
}
@ -257,15 +258,12 @@ getColumnsForNewDataPart(
NameToNameMap renamed_columns_from_to;
ColumnsDescription part_columns(source_part->getColumns());
NamesAndTypesList system_columns;
if (source_part->supportLightweightDeleteMutate())
system_columns.push_back(LightweightDeleteDescription::FILTER_COLUMN);
/// Preserve system columns that have persisted values in the source_part
for (const auto & column : system_columns)
{
if (part_columns.has(column.name) && !storage_columns.contains(column.name))
storage_columns.emplace_back(column);
}
const auto & deleted_mask_column = LightweightDeleteDescription::FILTER_COLUMN;
bool supports_lightweight_deletes = source_part->supportLightweightDeleteMutate();
bool deleted_mask_updated = false;
bool has_delete_command = false;
NameSet storage_columns_set;
for (const auto & [name, _] : storage_columns)
@ -277,23 +275,22 @@ getColumnsForNewDataPart(
{
for (const auto & [column_name, _] : command.column_to_update_expression)
{
/// Allow to update and persist values of system column
auto column = system_columns.tryGetByName(column_name);
if (column && !storage_columns.contains(column_name))
storage_columns.emplace_back(column_name, column->type);
if (column_name == deleted_mask_column.name
&& supports_lightweight_deletes
&& !storage_columns_set.contains(deleted_mask_column.name))
deleted_mask_updated = true;
}
}
if (command.type == MutationCommand::DELETE || command.type == MutationCommand::APPLY_DELETED_MASK)
has_delete_command = true;
/// If we don't have this column in source part, than we don't need to materialize it
if (!part_columns.has(command.column_name))
{
continue;
}
if (command.type == MutationCommand::DROP_COLUMN)
{
removed_columns.insert(command.column_name);
}
if (command.type == MutationCommand::RENAME_COLUMN)
{
@ -302,6 +299,15 @@ getColumnsForNewDataPart(
}
}
if (!storage_columns_set.contains(deleted_mask_column.name))
{
if (deleted_mask_updated || (part_columns.has(deleted_mask_column.name) && !has_delete_command))
{
storage_columns.push_back(deleted_mask_column);
storage_columns_set.insert(deleted_mask_column.name);
}
}
SerializationInfoByName new_serialization_infos;
for (const auto & [name, old_info] : serialization_infos)
{
@ -1530,7 +1536,8 @@ private:
for (auto & command_for_interpreter : ctx->for_interpreter)
{
if (command_for_interpreter.type == MutationCommand::DELETE)
if (command_for_interpreter.type == MutationCommand::DELETE
|| command_for_interpreter.type == MutationCommand::APPLY_DELETED_MASK)
{
has_delete = true;
break;
@ -1937,6 +1944,9 @@ static bool canSkipMutationCommandForPart(const MergeTreeDataPartPtr & part, con
return true;
}
if (command.type == MutationCommand::APPLY_DELETED_MASK && !part->hasLightweightDelete())
return true;
if (canSkipConversionToNullable(part, command))
return true;
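
Condensed, the rule these hunks implement for the lightweight-delete mask column (_row_exists) is: keep it in the mutated part if the mutation explicitly updates it, or if the source part carries it and no DELETE / APPLY DELETED MASK command is about to materialize it. A minimal Python sketch of that rule; the dict-based command representation and the helper name are illustrative stand-ins, not the actual MutateTask.cpp interfaces:

DELETED_MASK_COLUMN = "_row_exists"

def keep_deleted_mask(part_columns, storage_columns, commands, supports_lightweight_deletes):
    # Already declared as a regular storage column: nothing to decide.
    if DELETED_MASK_COLUMN in storage_columns:
        return True
    # An UPDATE that touches the mask keeps it, if the part format supports it.
    deleted_mask_updated = supports_lightweight_deletes and any(
        cmd["type"] == "UPDATE" and DELETED_MASK_COLUMN in cmd.get("updates", {})
        for cmd in commands
    )
    has_delete_command = any(
        cmd["type"] in ("DELETE", "APPLY_DELETED_MASK") for cmd in commands
    )
    # Otherwise keep it only if the source part persists it and no delete command
    # is about to fold the mask into the new part.
    return deleted_mask_updated or (
        DELETED_MASK_COLUMN in part_columns and not has_delete_command
    )

print(keep_deleted_mask({"x", "_row_exists"}, {"x"},
                        [{"type": "APPLY_DELETED_MASK"}], True))  # False: the mask gets applied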

View File

@ -269,6 +269,12 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFor
deduplicate_by_columns = std::move(new_deduplicate_by_columns);
}
else if (checkString("cleanup: ", in))
{
/// Obsolete option, does nothing.
bool cleanup = false;
in >> cleanup;
}
else
trailing_newline_found = true;
}

View File

@ -1349,7 +1349,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (auto part_in_memory = asInMemoryPart(part))
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
sum_parts_size_in_bytes += part->getBytesOnDisk();
}
}

View File

@ -59,6 +59,15 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
}
return res;
}
else if (command->type == ASTAlterCommand::APPLY_DELETED_MASK)
{
MutationCommand res;
res.ast = command->ptr();
res.type = APPLY_DELETED_MASK;
res.predicate = command->predicate;
res.partition = command->partition;
return res;
}
else if (command->type == ASTAlterCommand::MATERIALIZE_INDEX)
{
MutationCommand res;

View File

@ -39,6 +39,7 @@ struct MutationCommand
MATERIALIZE_TTL,
RENAME_COLUMN,
MATERIALIZE_COLUMN,
APPLY_DELETED_MASK,
ALTER_WITHOUT_MUTATION, /// pure metadata command, currently unused
};

View File

@ -1085,7 +1085,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (isTTLMergeType(future_part->merge_type))
getContext()->getMergeList().bookMergeWithTTL();
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false);
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
}
@ -1301,7 +1301,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
future_part->name = part->getNewName(new_part_info);
future_part->part_format = part->getFormat();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}, false), *this, metadata_snapshot, true);
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}

View File

@ -104,6 +104,7 @@ static const std::unordered_set<std::string_view> optional_configuration_keys =
"structure",
"access_key_id",
"secret_access_key",
"session_token",
"filename",
"use_environment_credentials",
"max_single_read_retries",
@ -1460,7 +1461,7 @@ void StorageS3::Configuration::connect(ContextPtr context)
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token);
client = S3::ClientFactory::instance().create(
client_configuration,
url.is_virtual_hosted_style,
@ -1521,11 +1522,14 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// S3('url', NOSIGN, 'format')
/// S3('url', NOSIGN, 'format', 'compression')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'compression')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format', 'compression')
/// with optional headers() function
if (engine_args.empty() || engine_args.size() > 5)
if (engine_args.empty() || engine_args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage S3 requires 1 to 5 arguments: "
"url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]");
@ -1541,7 +1545,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_engine_args
{
{1, {{}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}}}
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}}}
};
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
@ -1577,7 +1581,8 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
else
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
/// For 4 arguments we support 2 possible variants:
/// For 4 arguments we support 3 possible variants:
/// - s3(source, access_key_id, secret_access_key, session_token)
/// - s3(source, access_key_id, secret_access_key, format)
/// - s3(source, NOSIGN, format, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN or not.
@ -1590,7 +1595,32 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
engine_args_to_idx = {{"format", 2}, {"compression_method", 3}};
}
else
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "session_token/format");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
else
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}};
}
}
}
/// For 5 arguments we support 2 possible variants:
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, compression)
else if (engine_args.size() == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "session_token/format");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression", 4}};
}
else
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}};
}
}
else
{
@ -1612,6 +1642,10 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
if (engine_args_to_idx.contains("secret_access_key"))
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key");
if (engine_args_to_idx.contains("session_token"))
configuration.auth_settings.session_token = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["session_token"]], "session_token");
configuration.auth_settings.no_sign_request = no_sign_request;
}
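
With the optional session_token the argument count alone no longer identifies a signature, so for 4 and 5 arguments the code inspects the 4-th argument and treats it as a format only when FormatFactory recognizes it (or it is 'auto'). A rough Python sketch of that disambiguation; KNOWN_FORMATS and is_format_name are stand-ins for the FormatFactory lookup, and the NOSIGN branches are left out:

KNOWN_FORMATS = {"auto", "CSV", "TSV", "Parquet", "JSONEachRow"}  # stand-in for FormatFactory

def is_format_name(value):
    return value in KNOWN_FORMATS

def engine_args_to_idx(args):
    # args[0] is the URL; NOSIGN variants are omitted for brevity.
    n = len(args)
    if n == 4:    # url, key, secret, (format | session_token)
        tail = {"format": 3} if is_format_name(args[3]) else {"session_token": 3}
    elif n == 5:  # url, key, secret, (format, compression | session_token, format)
        tail = ({"format": 3, "compression_method": 4} if is_format_name(args[3])
                else {"session_token": 3, "format": 4})
    elif n == 6:  # url, key, secret, session_token, format, compression
        tail = {"session_token": 3, "format": 4, "compression_method": 5}
    else:
        raise ValueError("argument count not covered by this sketch")
    return {"access_key_id": 1, "secret_access_key": 2, **tail}

print(engine_args_to_idx(["https://host/data.csv", "key", "secret", "CSV", "gzip"]))
print(engine_args_to_idx(["https://host/data.csv", "key", "secret", "token", "CSV"]))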

View File

@ -35,6 +35,7 @@ void StorageSnapshot::init()
if (storage.hasLightweightDeletedMask())
system_columns[LightweightDeleteDescription::FILTER_COLUMN.name] = LightweightDeleteDescription::FILTER_COLUMN.type;
system_columns[BlockNumberColumn::name] = BlockNumberColumn::type;
}

View File

@ -71,7 +71,7 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
if (header_it != args.end())
args.erase(header_it);
if (args.empty() || args.size() > 6)
if (args.empty() || args.size() > 7)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
for (auto & arg : args)
@ -81,7 +81,7 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_args
{
{1, {{}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}
{7, {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}}}
};
std::unordered_map<std::string_view, size_t> args_to_idx;
@ -118,11 +118,12 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
/// For 4 arguments we support 3 possible variants:
/// For 4 arguments we support 4 possible variants:
/// - s3(source, format, structure, compression_method),
/// - s3(source, access_key_id, secret_access_key, format)
/// - s3(source, access_key_id, secret_access_key, format),
/// - s3(source, access_key_id, secret_access_key, session_token)
/// - s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
/// We can distinguish them by looking at the 2-nd and 4-th arguments: check if it's a format name or not.
else if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
@ -132,14 +133,28 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
args_to_idx = {{"format", 2}, {"structure", 3}};
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
}
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
else
{
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}};
}
}
}
/// For 5 arguments we support 2 possible variants:
/// For 5 arguments we support 3 possible variants:
/// - s3(source, access_key_id, secret_access_key, format, structure)
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not,
/// and by the 4-th argument: check if it's a format name or not.
else if (args.size() == 5)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "NOSIGN/access_key_id");
@ -149,7 +164,33 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
args_to_idx = {{"format", 2}, {"structure", 3}, {"compression_method", 4}};
}
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}};
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}};
}
else
{
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}};
}
}
}
/// For 6 arguments we support 2 possible variants:
/// - s3(source, access_key_id, secret_access_key, format, structure, compression_method)
/// - s3(source, access_key_id, secret_access_key, session_token, format, structure)
/// We can distinguish them by looking at the 4-th argument: check if it's a format name or not.
else if (args.size() == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}};
}
else
{
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}};
}
}
else
{
@ -181,6 +222,9 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
if (args_to_idx.contains("secret_access_key"))
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
if (args_to_idx.contains("session_token"))
configuration.auth_settings.session_token = checkAndGetLiteralArgument<String>(args[args_to_idx["session_token"]], "session_token");
configuration.auth_settings.no_sign_request = no_sign_request;
if (configuration.format == "auto")

View File

@ -22,11 +22,15 @@ public:
static constexpr auto signature = " - url\n"
" - url, format\n"
" - url, format, structure\n"
" - url, access_key_id, secret_access_key\n"
" - url, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key\n"
" - url, access_key_id, secret_access_key, session_token\n"
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, session_token, format\n"
" - url, access_key_id, secret_access_key, format, structure\n"
" - url, access_key_id, secret_access_key, session_token, format, structure\n"
" - url, access_key_id, secret_access_key, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key, session_token, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
static size_t getMaxNumberOfArguments() { return 6; }

View File

@ -35,6 +35,7 @@ public:
" - cluster, url, access_key_id, secret_access_key, format\n"
" - cluster, url, access_key_id, secret_access_key, format, structure\n"
" - cluster, url, access_key_id, secret_access_key, format, structure, compression_method\n"
" - cluster, url, access_key_id, secret_access_key, session_token, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
String getName() const override

View File

@ -78,7 +78,7 @@ def main():
pr_info = PRInfo()
commit = get_commit(gh, pr_info.sha)
atexit.register(update_mergeable_check, gh, pr_info, build_check_name)
atexit.register(update_mergeable_check, commit, pr_info, build_check_name)
rerun_helper = RerunHelper(commit, build_check_name)
if rerun_helper.is_already_finished_by_status():

View File

@ -135,7 +135,7 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
"--skip-jobs",
action="store_true",
default=False,
help="skip fetching data about job runs, used in --configure action (for debugging)",
help="skip fetching data about job runs, used in --configure action (for debugging and nigthly ci)",
)
parser.add_argument(
"--rebuild-all-docker",
@ -279,11 +279,11 @@ def _configure_docker_jobs(
images_info = docker_images_helper.get_images_info()
# a. check missing images
print("Start checking missing images in dockerhub")
# FIXME: we need login as docker manifest inspect goes directly to one of the *.docker.com hosts instead of "registry-mirrors" : ["http://dockerhub-proxy.dockerhub-proxy-zone:5000"]
# find if it's possible to use the setting of /etc/docker/daemon.json
docker_images_helper.docker_login()
if not rebuild_all_dockers:
# FIXME: we need login as docker manifest inspect goes directly to one of the *.docker.com hosts instead of "registry-mirrors" : ["http://dockerhub-proxy.dockerhub-proxy-zone:5000"]
# find if it's possible to use the setting of /etc/docker/daemon.json
docker_images_helper.docker_login()
print("Start checking missing images in dockerhub")
missing_multi_dict = check_missing_images_on_dockerhub(imagename_digest_dict)
missing_multi = list(missing_multi_dict)
missing_amd64 = []
@ -305,6 +305,15 @@ def _configure_docker_jobs(
"aarch64",
)
)
# FIXME: temporary hack, remove after transition to docker digest as tag
else:
if missing_multi:
print(
f"WARNING: Missing images {list(missing_multi)} - fallback to latest tag"
)
for image in missing_multi:
imagename_digest_dict[image] = "latest"
print("...checking missing images in dockerhub - done")
else:
# add all images to missing
missing_multi = list(imagename_digest_dict)
@ -315,16 +324,7 @@ def _configure_docker_jobs(
for name in imagename_digest_dict
if not images_info[name]["only_amd64"]
]
# FIXME: temporary hack, remove after transition to docker digest as tag
if docker_digest_or_latest:
if missing_multi:
print(
f"WARNING: Missing images {list(missing_multi)} - fallback to latest tag"
)
for image in missing_multi:
imagename_digest_dict[image] = "latest"
print("...checking missing images in dockerhub - done")
return {
"images": imagename_digest_dict,
"missing_aarch64": missing_aarch64,
@ -377,6 +377,7 @@ def _configure_jobs(
for batch in range(num_batches): # type: ignore
batches_to_do.append(batch)
elif job_config.run_always:
# always add to todo
batches_to_do.append(batch)
else:
# this job controlled by digest, add to todo if it's not successfully done before
@ -396,6 +397,21 @@ def _configure_jobs(
else:
jobs_to_skip += (job,)
if pr_labels:
jobs_requested_by_label = [] # type: List[str]
ci_controlling_labels = [] # type: List[str]
for label in pr_labels:
label_config = CI_CONFIG.get_label_config(label)
if label_config:
jobs_requested_by_label += label_config.run_jobs
ci_controlling_labels += [label]
if ci_controlling_labels:
print(f"NOTE: CI controlling labels are set: [{ci_controlling_labels}]")
print(
f" : following jobs will be executed: [{jobs_requested_by_label}]"
)
jobs_to_do = jobs_requested_by_label
if commit_tokens:
requested_jobs = [
token[len("#job_") :]
@ -415,7 +431,7 @@ def _configure_jobs(
if parent in jobs_to_do and parent not in jobs_to_do_requested:
jobs_to_do_requested.append(parent)
print(
f"NOTE: Only specific job(s) were requested: [{jobs_to_do_requested}]"
f"NOTE: Only specific job(s) were requested by commit message tokens: [{jobs_to_do_requested}]"
)
jobs_to_do = jobs_to_do_requested
@ -548,14 +564,14 @@ def main() -> int:
if args.configure:
GR = GitRunner()
pr_info = PRInfo(need_changed_files=True)
pr_info = PRInfo()
docker_data = {}
git_ref = GR.run(f"{GIT_PREFIX} rev-parse HEAD")
# if '#no-merge-commit' is set in commit message - set git ref to PR branch head to avoid merge-commit
tokens = []
if pr_info.number != 0:
if pr_info.number != 0 and not args.skip_jobs:
message = GR.run(f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1")
tokens = _fetch_commit_tokens(message)
print(f"Found commit message tokens: [{tokens}]")
@ -607,8 +623,10 @@ def main() -> int:
result["jobs_data"] = jobs_data
result["docker_data"] = docker_data
if pr_info.number != 0 and not args.docker_digest_or_latest:
# FIXME: it runs the style check before docker build only when possible (i.e. the style-check image is unchanged)
# find a way to always run the style check before docker build and other jobs
_check_and_update_for_early_style_check(result)
if pr_info.number != 0 and pr_info.has_changes_in_documentation_only():
if pr_info.has_changes_in_documentation_only():
_update_config_for_docs_only(result)
elif args.update_gh_statuses:
@ -689,7 +707,8 @@ def main() -> int:
elif args.mark_success:
assert indata, "Run config must be provided via --infile"
job = args.job_name
num_batches = CI_CONFIG.get_job_config(job).num_batches
job_config = CI_CONFIG.get_job_config(job)
num_batches = job_config.num_batches
assert (
num_batches <= 1 or 0 <= args.batch < num_batches
), f"--batch must be provided and in range [0, {num_batches}) for {job}"
@ -706,7 +725,7 @@ def main() -> int:
if not CommitStatusData.is_present():
# apparently exit after rerun-helper check
# do nothing, exit without failure
print("ERROR: no status file for job [{job}]")
print(f"ERROR: no status file for job [{job}]")
job_status = CommitStatusData(
status="dummy failure",
description="dummy status",
@ -717,7 +736,9 @@ def main() -> int:
job_status = CommitStatusData.load_status()
# Storing job data (report_url) to restore OK GH status on job results reuse
if job_status.is_ok():
if job_config.run_always:
print(f"Job [{job}] runs always in CI - do not mark as done")
elif job_status.is_ok():
success_flag_name = get_file_flag_name(
job, indata["jobs_data"]["digests"][job], args.batch, num_batches
)
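
The --mark-success branch above now refuses to store a success flag for run_always jobs, so they are re-executed on every run instead of being reused. A condensed Python sketch of that decision; the SimpleNamespace objects stand in for the real JobConfig and CommitStatusData:

from types import SimpleNamespace

def should_store_success_flag(job_config, job_status):
    # run_always jobs must execute in every CI run, so their results are never
    # cached for reuse; everything else is cached only on success.
    if job_config.run_always:
        return False
    return job_status.is_ok()

print(should_store_success_flag(SimpleNamespace(run_always=True),
                                SimpleNamespace(is_ok=lambda: True)))   # False
print(should_store_success_flag(SimpleNamespace(run_always=False),
                                SimpleNamespace(is_ok=lambda: True)))   # True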

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from enum import Enum
import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
@ -8,6 +9,10 @@ from pathlib import Path
from typing import Callable, Dict, Iterable, List, Literal, Optional, Union
class Labels(Enum):
DO_NOT_TEST_LABEL = "do not test"
@dataclass
class DigestConfig:
# all files, dirs to include into digest, glob supported
@ -22,6 +27,15 @@ class DigestConfig:
git_submodules: bool = False
@dataclass
class LabelConfig:
"""
class to configure different CI scenarios per GH label
"""
run_jobs: Iterable[str] = frozenset()
@dataclass
class JobConfig:
"""
@ -95,7 +109,7 @@ class TestConfig:
BuildConfigs = Dict[str, BuildConfig]
BuildsReportConfig = Dict[str, BuildReportConfig]
TestConfigs = Dict[str, TestConfig]
LabelConfigs = Dict[str, LabelConfig]
# common digests configs
compatibility_check_digest = DigestConfig(
@ -268,6 +282,13 @@ class CiConfig:
builds_report_config: BuildsReportConfig
test_configs: TestConfigs
other_jobs_configs: TestConfigs
label_configs: LabelConfigs
def get_label_config(self, label_name: str) -> Optional[LabelConfig]:
for label, config in self.label_configs.items():
if label_name == label:
return config
return None
def get_job_config(self, check_name: str) -> JobConfig:
res = None
@ -417,6 +438,9 @@ class CiConfig:
CI_CONFIG = CiConfig(
label_configs={
Labels.DO_NOT_TEST_LABEL.value: LabelConfig(run_jobs=["Style check"]),
},
build_config={
"package_release": BuildConfig(
name="package_release",
@ -847,6 +871,7 @@ CI_CONFIG.validate()
# checks required by Mergeable Check
REQUIRED_CHECKS = [
"PR Check",
"ClickHouse build check",
"ClickHouse special build check",
"Docs Check",

Some files were not shown because too many files have changed in this diff