Merge branch 'master' into s3_request_throttling_on_backups

This commit is contained in:
Daniel Pozo Escalona 2023-08-04 16:57:47 +02:00 committed by GitHub
commit ed43d6f780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
261 changed files with 2483 additions and 2151 deletions

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -200,8 +200,8 @@ Templates:
- [Server Setting](_description_templates/template-server-setting.md)
- [Database or Table engine](_description_templates/template-engine.md)
- [System table](_description_templates/template-system-table.md)
- [Data type](_description_templates/data-type.md)
- [Statement](_description_templates/statement.md)
- [Data type](_description_templates/template-data-type.md)
- [Statement](_description_templates/template-statement.md)
<a name="how-to-build-docs"/>

View File

@ -0,0 +1,31 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.7.2.25-stable (8dd1107b032) FIXME as compared to v23.7.1.2470-stable (a70127baecc)
#### Backward Incompatible Change
* Backported in [#52850](https://github.com/ClickHouse/ClickHouse/issues/52850): If a dynamic disk contains a name, it should be specified as `disk = disk(name = 'disk_name'`, ...) in disk function arguments. In previous version it could be specified as `disk = disk_<disk_name>(...)`, which is no longer supported. [#52820](https://github.com/ClickHouse/ClickHouse/pull/52820) ([Kseniia Sumarokova](https://github.com/kssenii)).
#### Build/Testing/Packaging Improvement
* Backported in [#52913](https://github.com/ClickHouse/ClickHouse/issues/52913): Add `clickhouse-keeper-client` symlink to the clickhouse-server package. [#51882](https://github.com/ClickHouse/ClickHouse/pull/51882) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix binary arithmetic for Nullable(IPv4) [#51642](https://github.com/ClickHouse/ClickHouse/pull/51642) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* Support IPv4 and IPv6 as dictionary attributes [#51756](https://github.com/ClickHouse/ClickHouse/pull/51756) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* init and destroy ares channel on demand.. [#52634](https://github.com/ClickHouse/ClickHouse/pull/52634) ([Arthur Passos](https://github.com/arthurpassos)).
* Fix crash in function `tuple` with one sparse column argument [#52659](https://github.com/ClickHouse/ClickHouse/pull/52659) ([Anton Popov](https://github.com/CurtizJ)).
* Fix data race in Keeper reconfiguration [#52804](https://github.com/ClickHouse/ClickHouse/pull/52804) ([Antonio Andelic](https://github.com/antonio2368)).
* clickhouse-keeper: fix implementation of server with poll() [#52833](https://github.com/ClickHouse/ClickHouse/pull/52833) ([Andy Fiddaman](https://github.com/citrus-it)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Rename setting disable_url_encoding to enable_url_encoding and add a test [#52656](https://github.com/ClickHouse/ClickHouse/pull/52656) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix bugs and better test for SYSTEM STOP LISTEN [#52680](https://github.com/ClickHouse/ClickHouse/pull/52680) ([Nikolay Degterinsky](https://github.com/evillique)).
* Increase min protocol version for sparse serialization [#52835](https://github.com/ClickHouse/ClickHouse/pull/52835) ([Anton Popov](https://github.com/CurtizJ)).
* Docker improvements [#52869](https://github.com/ClickHouse/ClickHouse/pull/52869) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -141,6 +141,10 @@ Runs [stateful functional tests](tests.md#functional-tests). Treat them in the s
Runs [integration tests](tests.md#integration-tests).
## Bugfix validate check
Checks that either a new test (functional or integration) or there some changed tests that fail with the binary built on master branch. This check is triggered when pull request has "pr-bugfix" label.
## Stress Test
Runs stateless functional tests concurrently from several clients to detect
concurrency-related errors. If it fails:

View File

@ -22,7 +22,7 @@ CREATE TABLE deltalake
- `url` — Bucket url with path to the existing Delta Lake table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Example**

View File

@ -22,7 +22,7 @@ CREATE TABLE hudi_table
- `url` — Bucket url with the path to an existing Hudi table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Example**

View File

@ -237,7 +237,7 @@ The following settings can be set before query execution or placed into configur
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_get_rps`.
- `s3_upload_part_size_multiply_factor` - Multiply `s3_min_upload_part_size` by this factor each time `s3_multiply_parts_count_threshold` parts were uploaded from a single write to S3. Default values is `2`.
- `s3_upload_part_size_multiply_parts_count_threshold` - Each time this number of parts was uploaded to S3 `s3_min_upload_part_size multiplied` by `s3_upload_part_size_multiply_factor`. Default value us `500`.
- `s3_upload_part_size_multiply_parts_count_threshold` - Each time this number of parts was uploaded to S3, `s3_min_upload_part_size` is multiplied by `s3_upload_part_size_multiply_factor`. Default value is `500`.
- `s3_max_inflight_parts_for_one_file` - Limits the number of put requests that can be run concurrently for one object. Its number should be limited. The value `0` means unlimited. Default value is `20`. Each in-flight part has a buffer with size `s3_min_upload_part_size` for the first `s3_upload_part_size_multiply_factor` parts and more when file is big enough, see `upload_part_size_multiply_factor`. With default settings one uploaded file consumes not more than `320Mb` for a file which is less than `8G`. The consumption is greater for a larger file.
Security consideration: if malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; or alternatively, `remote_host_filter` must be specified in server configuration.

View File

@ -2131,7 +2131,6 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_row_group_size](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_row_group_size) - row group size in rows while data output. Default value - `1000000`.
- [output_format_parquet_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_string_as_string) - use Parquet String type instead of Binary for String columns. Default value - `false`.
- [input_format_parquet_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_import_nested) - allow inserting array of structs into [Nested](/docs/en/sql-reference/data-types/nested-data-structures/index.md) table in Parquet input format. Default value - `false`.
- [input_format_parquet_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_case_insensitive_column_matching) - ignore case when matching Parquet columns with ClickHouse columns. Default value - `false`.
- [input_format_parquet_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_allow_missing_columns) - allow missing columns while reading Parquet data. Default value - `false`.
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.
@ -2336,7 +2335,6 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Arrow" > {filenam
- [output_format_arrow_low_cardinality_as_dictionary](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_low_cardinality_as_dictionary) - enable output ClickHouse LowCardinality type as Dictionary Arrow type. Default value - `false`.
- [output_format_arrow_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_string_as_string) - use Arrow String type instead of Binary for String columns. Default value - `false`.
- [input_format_arrow_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_import_nested) - allow inserting array of structs into Nested table in Arrow input format. Default value - `false`.
- [input_format_arrow_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_case_insensitive_column_matching) - ignore case when matching Arrow columns with ClickHouse columns. Default value - `false`.
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
- [input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Arrow format. Default value - `false`.
@ -2402,7 +2400,6 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT ORC" > {filename.
- [output_format_arrow_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_string_as_string) - use Arrow String type instead of Binary for String columns. Default value - `false`.
- [output_format_orc_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_orc_compression_method) - compression method used in output ORC format. Default value - `none`.
- [input_format_arrow_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_import_nested) - allow inserting array of structs into Nested table in Arrow input format. Default value - `false`.
- [input_format_arrow_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_case_insensitive_column_matching) - ignore case when matching Arrow columns with ClickHouse columns. Default value - `false`.
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
- [input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Arrow format. Default value - `false`.

View File

@ -1112,17 +1112,6 @@ Default value: 1.
## Arrow format settings {#arrow-format-settings}
### input_format_arrow_import_nested {#input_format_arrow_import_nested}
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [Arrow](../../interfaces/formats.md/#data_types-matching-arrow) input format.
Possible values:
- 0 — Data can not be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.
Default value: `0`.
### input_format_arrow_case_insensitive_column_matching {#input_format_arrow_case_insensitive_column_matching}
Ignore case when matching Arrow column names with ClickHouse column names.
@ -1172,17 +1161,6 @@ Default value: `lz4_frame`.
## ORC format settings {#orc-format-settings}
### input_format_orc_import_nested {#input_format_orc_import_nested}
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [ORC](../../interfaces/formats.md/#data-format-orc) input format.
Possible values:
- 0 — Data can not be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.
Default value: `0`.
### input_format_orc_row_batch_size {#input_format_orc_row_batch_size}
Batch size when reading ORC stripes.
@ -1221,17 +1199,6 @@ Default value: `none`.
## Parquet format settings {#parquet-format-settings}
### input_format_parquet_import_nested {#input_format_parquet_import_nested}
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [Parquet](../../interfaces/formats.md/#data-format-parquet) input format.
Possible values:
- 0 — Data can not be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.
Default value: `0`.
### input_format_parquet_case_insensitive_column_matching {#input_format_parquet_case_insensitive_column_matching}
Ignore case when matching Parquet column names with ClickHouse column names.

View File

@ -51,7 +51,3 @@ keeper foo bar
- `rmr <path>` -- Recursively deletes path. Confirmation required
- `flwc <command>` -- Executes four-letter-word command
- `help` -- Prints this message
- `get_stat [path]` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stable_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)

View File

@ -36,6 +36,8 @@ These `ALTER` statements modify entities related to role-based access control:
[ALTER TABLE ... MODIFY COMMENT](/docs/en/sql-reference/statements/alter/comment.md) statement adds, modifies, or removes comments to the table, regardless if it was set before or not.
[ALTER NAMED COLLECTION](/docs/en/sql-reference/statements/alter/named-collection.md) statement modifies [Named Collections](/docs/en/operations/named-collections.md).
## Mutations
`ALTER` queries that are intended to manipulate table data are implemented with a mechanism called “mutations”, most notably [ALTER TABLE … DELETE](/docs/en/sql-reference/statements/alter/delete.md) and [ALTER TABLE … UPDATE](/docs/en/sql-reference/statements/alter/update.md). They are asynchronous background processes similar to merges in [MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables that to produce new “mutated” versions of parts.

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/statements/alter/named-collection
sidebar_label: NAMED COLLECTION
---
# ALTER NAMED COLLECTION
This query intends to modify already existing named collections.
**Syntax**
```sql
ALTER NAMED COLLECTION [IF EXISTS] name [ON CLUSTER cluster]
[ SET
key_name1 = 'some value',
key_name2 = 'some value',
key_name3 = 'some value',
... ] |
[ DELETE key_name4, key_name5, ... ]
```
**Example**
```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
ALTER NAMED COLLECTION foobar SET a = '2', c = '3';
ALTER NAMED COLLECTION foobar DELETE b;
```

View File

@ -8,13 +8,14 @@ sidebar_label: CREATE
Create queries make a new entity of one of the following kinds:
- [DATABASE](../../../sql-reference/statements/create/database.md)
- [TABLE](../../../sql-reference/statements/create/table.md)
- [VIEW](../../../sql-reference/statements/create/view.md)
- [DICTIONARY](../../../sql-reference/statements/create/dictionary.md)
- [FUNCTION](../../../sql-reference/statements/create/function.md)
- [USER](../../../sql-reference/statements/create/user.md)
- [ROLE](../../../sql-reference/statements/create/role.md)
- [ROW POLICY](../../../sql-reference/statements/create/row-policy.md)
- [QUOTA](../../../sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](../../../sql-reference/statements/create/settings-profile.md)
- [DATABASE](/docs/en/sql-reference/statements/create/database.md)
- [TABLE](/docs/en/sql-reference/statements/create/table.md)
- [VIEW](/docs/en/sql-reference/statements/create/view.md)
- [DICTIONARY](/docs/en/sql-reference/statements/create/dictionary.md)
- [FUNCTION](/docs/en/sql-reference/statements/create/function.md)
- [USER](/docs/en/sql-reference/statements/create/user.md)
- [ROLE](/docs/en/sql-reference/statements/create/role.md)
- [ROW POLICY](/docs/en/sql-reference/statements/create/row-policy.md)
- [QUOTA](/docs/en/sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](/docs/en/sql-reference/statements/create/settings-profile.md)
- [NAMED COLLECTION](/docs/en/sql-reference/statements/create/named-collection.md)

View File

@ -0,0 +1,34 @@
---
slug: /en/sql-reference/statements/create/named-collection
sidebar_label: NAMED COLLECTION
---
# CREATE NAMED COLLECTION
Creates a new named collection.
**Syntax**
```sql
CREATE NAMED COLLECTION [IF NOT EXISTS] name [ON CLUSTER cluster] AS
key_name1 = 'some value',
key_name2 = 'some value',
key_name3 = 'some value',
...
```
**Example**
```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
```
**Related statements**
- [CREATE NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/alter/named-collection)
- [DROP NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/drop#drop-function)
**See Also**
- [Named collections guide](/docs/en/operations/named-collections.md)

View File

@ -119,3 +119,20 @@ DROP FUNCTION [IF EXISTS] function_name [on CLUSTER cluster]
CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
DROP FUNCTION linear_equation;
```
## DROP NAMED COLLECTION
Deletes a named collection.
**Syntax**
``` sql
DROP NAMED COLLECTION [IF EXISTS] name [on CLUSTER cluster]
```
**Example**
``` sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
DROP NAMED COLLECTION foobar;
```

View File

@ -314,6 +314,22 @@ Provides possibility to start background fetch tasks from replication queues whi
SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### STOP PULLING REPLICATION LOG
Stops loading new entries from replication log to replication queue in a `ReplicatedMergeTree` table.
``` sql
SYSTEM STOP PULLING REPLICATION LOG [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### START PULLING REPLICATION LOG
Cancels `SYSTEM STOP PULLING REPLICATION LOG`.
``` sql
SYSTEM START PULLING REPLICATION LOG [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### SYNC REPLICA
Wait until a `ReplicatedMergeTree` table will be synced with other replicas in a cluster, but no more than `receive_timeout` seconds.

View File

@ -21,7 +21,7 @@ iceberg(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure])
- `format` — The [format](/docs/en/interfaces/formats.md/#formats) of the file. By default `Parquet` is used.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
Engine parameters can be specified using [Named Collections](../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Returned value**

View File

@ -1353,8 +1353,6 @@ ClickHouse поддерживает настраиваемую точность
$ cat {filename} | clickhouse-client --query="INSERT INTO {some_table} FORMAT Parquet"
```
Чтобы вставить данные в колонки типа [Nested](../sql-reference/data-types/nested-data-structures/nested.md) в виде массива структур, нужно включить настройку [input_format_parquet_import_nested](../operations/settings/settings.md#input_format_parquet_import_nested).
Чтобы получить данные из таблицы ClickHouse и сохранить их в файл формата Parquet, используйте команду следующего вида:
``` bash
@ -1413,8 +1411,6 @@ ClickHouse поддерживает настраиваемую точность
$ cat filename.arrow | clickhouse-client --query="INSERT INTO some_table FORMAT Arrow"
```
Чтобы вставить данные в колонки типа [Nested](../sql-reference/data-types/nested-data-structures/nested.md) в виде массива структур, нужно включить настройку [input_format_arrow_import_nested](../operations/settings/settings.md#input_format_arrow_import_nested).
### Вывод данных {#selecting-data-arrow}
Чтобы получить данные из таблицы ClickHouse и сохранить их в файл формата Arrow, используйте команду следующего вида:
@ -1471,8 +1467,6 @@ ClickHouse поддерживает настраиваемую точность
$ cat filename.orc | clickhouse-client --query="INSERT INTO some_table FORMAT ORC"
```
Чтобы вставить данные в колонки типа [Nested](../sql-reference/data-types/nested-data-structures/nested.md) в виде массива структур, нужно включить настройку [input_format_orc_import_nested](../operations/settings/settings.md#input_format_orc_import_nested).
### Вывод данных {#selecting-data-2}
Чтобы получить данные из таблицы ClickHouse и сохранить их в файл формата ORC, используйте команду следующего вида:

View File

@ -238,39 +238,6 @@ ClickHouse применяет настройку в тех случаях, ко
В случае превышения `input_format_allow_errors_ratio` ClickHouse генерирует исключение.
## input_format_parquet_import_nested {#input_format_parquet_import_nested}
Включает или отключает возможность вставки данных в колонки типа [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) в виде массива структур в формате ввода [Parquet](../../interfaces/formats.md#data-format-parquet).
Возможные значения:
- 0 — данные не могут быть вставлены в колонки типа `Nested` в виде массива структур.
- 0 — данные могут быть вставлены в колонки типа `Nested` в виде массива структур.
Значение по умолчанию: `0`.
## input_format_arrow_import_nested {#input_format_arrow_import_nested}
Включает или отключает возможность вставки данных в колонки типа [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) в виде массива структур в формате ввода [Arrow](../../interfaces/formats.md#data_types-matching-arrow).
Возможные значения:
- 0 — данные не могут быть вставлены в колонки типа `Nested` в виде массива структур.
- 0 — данные могут быть вставлены в колонки типа `Nested` в виде массива структур.
Значение по умолчанию: `0`.
## input_format_orc_import_nested {#input_format_orc_import_nested}
Включает или отключает возможность вставки данных в колонки типа [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) в виде массива структур в формате ввода [ORC](../../interfaces/formats.md#data-format-orc).
Возможные значения:
- 0 — данные не могут быть вставлены в колонки типа `Nested` в виде массива структур.
- 0 — данные могут быть вставлены в колонки типа `Nested` в виде массива структур.
Значение по умолчанию: `0`.
## input_format_values_interpret_expressions {#settings-input_format_values_interpret_expressions}
Включает или отключает парсер SQL, если потоковый парсер не может проанализировать данные. Этот параметр используется только для формата [Values](../../interfaces/formats.md#data-format-values) при вставке данных. Дополнительные сведения о парсерах читайте в разделе [Синтаксис](../../sql-reference/syntax.md).

View File

@ -1,6 +1,5 @@
#include "Commands.h"
#include <queue>
#include "KeeperClient.h"
@ -25,18 +24,8 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
else
path = client->cwd;
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
bool need_space = false;
for (const auto & child : children)
{
if (std::exchange(need_space, true))
std::cout << " ";
std::cout << child;
}
for (const auto & child : client->zookeeper->getChildren(path))
std::cout << child << " ";
std::cout << "\n";
}
@ -88,7 +77,7 @@ void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
client->zookeeper->set(
client->getAbsolutePath(query->args[0].safeGet<String>()),
query->args[1].safeGet<String>(),
static_cast<Int32>(query->args[2].safeGet<Int64>()));
static_cast<Int32>(query->args[2].get<Int32>()));
}
bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
@ -141,173 +130,6 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
return true;
node->args.push_back(std::move(arg));
return true;
}
void GetStatCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
Coordination::Stat stat;
String path;
if (!query->args.empty())
path = client->getAbsolutePath(query->args[0].safeGet<String>());
else
path = client->cwd;
client->zookeeper->get(path, &stat);
std::cout << "cZxid = " << stat.czxid << "\n";
std::cout << "mZxid = " << stat.mzxid << "\n";
std::cout << "pZxid = " << stat.pzxid << "\n";
std::cout << "ctime = " << stat.ctime << "\n";
std::cout << "mtime = " << stat.mtime << "\n";
std::cout << "version = " << stat.version << "\n";
std::cout << "cversion = " << stat.cversion << "\n";
std::cout << "aversion = " << stat.aversion << "\n";
std::cout << "ephemeralOwner = " << stat.ephemeralOwner << "\n";
std::cout << "dataLength = " << stat.dataLength << "\n";
std::cout << "numChildren = " << stat.numChildren << "\n";
}
bool FindSuperNodes::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
ASTPtr threshold;
if (!ParserUnsignedInteger{}.parse(pos, threshold, expected))
return false;
node->args.push_back(threshold->as<ASTLiteral &>().value);
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
return true;
}
void FindSuperNodes::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto threshold = query->args[0].safeGet<UInt64>();
auto path = client->getAbsolutePath(query->args[1].safeGet<String>());
Coordination::Stat stat;
client->zookeeper->get(path, &stat);
if (stat.numChildren >= static_cast<Int32>(threshold))
{
std::cout << static_cast<String>(path) << "\t" << stat.numChildren << "\n";
return;
}
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
for (const auto & child : children)
{
auto next_query = *query;
next_query.args[1] = DB::Field(path / child);
execute(&next_query, client);
}
}
bool DeleteStableBackups::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
}
void DeleteStableBackups::execute(const ASTKeeperQuery * /* query */, KeeperClient * client) const
{
client->askConfirmation(
"You are going to delete all inactive backups in /clickhouse/backups.",
[client]
{
fs::path backup_root = "/clickhouse/backups";
auto backups = client->zookeeper->getChildren(backup_root);
std::sort(backups.begin(), backups.end());
for (const auto & child : backups)
{
auto backup_path = backup_root / child;
std::cout << "Found backup " << backup_path << ", checking if it's active\n";
String stage_path = backup_path / "stage";
auto stages = client->zookeeper->getChildren(stage_path);
bool is_active = false;
for (const auto & stage : stages)
{
if (startsWith(stage, "alive"))
{
is_active = true;
break;
}
}
if (is_active)
{
std::cout << "Backup " << backup_path << " is active, not going to delete\n";
continue;
}
std::cout << "Backup " << backup_path << " is not active, deleting it\n";
client->zookeeper->removeRecursive(backup_path);
}
});
}
bool FindBigFamily::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
ASTPtr count;
if (ParserUnsignedInteger{}.parse(pos, count, expected))
node->args.push_back(count->as<ASTLiteral &>().value);
else
node->args.push_back(UInt64(10));
return true;
}
void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto path = client->getAbsolutePath(query->args[0].safeGet<String>());
auto n = query->args[1].safeGet<UInt64>();
std::vector<std::tuple<Int32, String>> result;
std::queue<fs::path> queue;
queue.push(path);
while (!queue.empty())
{
auto next_path = queue.front();
queue.pop();
auto children = client->zookeeper->getChildren(next_path);
std::transform(children.cbegin(), children.cend(), children.begin(), [&](const String & child) { return next_path / child; });
auto response = client->zookeeper->get(children);
for (size_t i = 0; i < response.size(); ++i)
{
result.emplace_back(response[i].stat.numChildren, children[i]);
queue.push(children[i]);
}
}
std::sort(result.begin(), result.end(), std::greater());
for (UInt64 i = 0; i < std::min(result.size(), static_cast<size_t>(n)); ++i)
std::cout << std::get<1>(result[i]) << "\t" << std::get<0>(result[i]) << "\n";
}
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
@ -348,7 +170,7 @@ bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery
void HelpCommand::execute(const ASTKeeperQuery * /* query */, KeeperClient * /* client */) const
{
for (const auto & pair : KeeperClient::commands)
std::cout << pair.second->generateHelpString() << "\n";
std::cout << pair.second->getHelpMessage() << "\n";
}
bool FourLetterWordCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const

View File

@ -21,12 +21,6 @@ public:
virtual String getName() const = 0;
virtual ~IKeeperClientCommand() = default;
String generateHelpString() const
{
return fmt::vformat(getHelpMessage(), fmt::make_format_args(getName()));
}
};
using Command = std::shared_ptr<IKeeperClientCommand>;
@ -40,7 +34,7 @@ class LSCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Lists the nodes for the given path (default: cwd)"; }
String getHelpMessage() const override { return "ls [path] -- Lists the nodes for the given path (default: cwd)"; }
};
class CDCommand : public IKeeperClientCommand
@ -51,7 +45,7 @@ class CDCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Change the working path (default `.`)"; }
String getHelpMessage() const override { return "cd [path] -- Change the working path (default `.`)"; }
};
class SetCommand : public IKeeperClientCommand
@ -64,7 +58,7 @@ class SetCommand : public IKeeperClientCommand
String getHelpMessage() const override
{
return "{} <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
return "set <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
}
};
@ -76,7 +70,7 @@ class CreateCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> <value> -- Creates new node"; }
String getHelpMessage() const override { return "create <path> <value> -- Creates new node"; }
};
class GetCommand : public IKeeperClientCommand
@ -87,63 +81,9 @@ class GetCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
String getHelpMessage() const override { return "get <path> -- Returns the node's value"; }
};
class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Returns the node's stat (default `.`)"; }
};
class FindSuperNodes : public IKeeperClientCommand
{
String getName() const override { return "find_super_nodes"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} <threshold> [path] -- Finds nodes with number of children larger than some threshold for the given path (default `.`)";
}
};
class DeleteStableBackups : public IKeeperClientCommand
{
String getName() const override { return "delete_stable_backups"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} -- Deletes ClickHouse nodes used for backups that are now inactive";
}
};
class FindBigFamily : public IKeeperClientCommand
{
String getName() const override { return "find_big_family"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} [path] [n] -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)";
}
};
class RMCommand : public IKeeperClientCommand
{
String getName() const override { return "rm"; }
@ -152,7 +92,7 @@ class RMCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Remove the node"; }
String getHelpMessage() const override { return "remove <path> -- Remove the node"; }
};
class RMRCommand : public IKeeperClientCommand
@ -163,7 +103,7 @@ class RMRCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
String getHelpMessage() const override { return "rmr <path> -- Recursively deletes path. Confirmation required"; }
};
class HelpCommand : public IKeeperClientCommand
@ -174,7 +114,7 @@ class HelpCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} -- Prints this message"; }
String getHelpMessage() const override { return "help -- Prints this message"; }
};
class FourLetterWordCommand : public IKeeperClientCommand
@ -185,7 +125,7 @@ class FourLetterWordCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <command> -- Executes four-letter-word command"; }
String getHelpMessage() const override { return "flwc <command> -- Executes four-letter-word command"; }
};
}

View File

@ -177,10 +177,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<SetCommand>(),
std::make_shared<CreateCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStableBackups>(),
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<HelpCommand>(),

View File

@ -58,7 +58,6 @@ bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
String command_name(pos->begin, pos->end);
std::transform(command_name.begin(), command_name.end(), command_name.begin(), [](unsigned char c) { return std::tolower(c); });
Command command;
auto iter = KeeperClient::commands.find(command_name);

View File

@ -1650,6 +1650,7 @@ try
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context);
startupSystemTables();
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@ -1668,7 +1669,6 @@ try
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@ -6,6 +6,7 @@
#include <Access/DiskAccessStorage.h>
#include <Access/LDAPAccessStorage.h>
#include <Access/ContextAccess.h>
#include <Access/EnabledSettings.h>
#include <Access/EnabledRolesInfo.h>
#include <Access/RoleCache.h>
#include <Access/RowPolicyCache.h>
@ -729,6 +730,14 @@ std::shared_ptr<const EnabledRoles> AccessControl::getEnabledRoles(
}
std::shared_ptr<const EnabledRolesInfo> AccessControl::getEnabledRolesInfo(
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const
{
return getEnabledRoles(current_roles, current_roles_with_admin_option)->getRolesInfo();
}
std::shared_ptr<const EnabledRowPolicies> AccessControl::getEnabledRowPolicies(const UUID & user_id, const boost::container::flat_set<UUID> & enabled_roles) const
{
return row_policy_cache->getEnabledRowPolicies(user_id, enabled_roles);
@ -772,6 +781,15 @@ std::shared_ptr<const EnabledSettings> AccessControl::getEnabledSettings(
return settings_profiles_cache->getEnabledSettings(user_id, settings_from_user, enabled_roles, settings_from_enabled_roles);
}
std::shared_ptr<const SettingsProfilesInfo> AccessControl::getEnabledSettingsInfo(
const UUID & user_id,
const SettingsProfileElements & settings_from_user,
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const
{
return getEnabledSettings(user_id, settings_from_user, enabled_roles, settings_from_enabled_roles)->getInfo();
}
std::shared_ptr<const SettingsProfilesInfo> AccessControl::getSettingsProfileInfo(const UUID & profile_id)
{
return settings_profiles_cache->getSettingsProfileInfo(profile_id);

View File

@ -29,6 +29,7 @@ class ContextAccessParams;
struct User;
using UserPtr = std::shared_ptr<const User>;
class EnabledRoles;
struct EnabledRolesInfo;
class RoleCache;
class EnabledRowPolicies;
class RowPolicyCache;
@ -187,6 +188,10 @@ public:
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const;
std::shared_ptr<const EnabledRolesInfo> getEnabledRolesInfo(
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const;
std::shared_ptr<const EnabledRowPolicies> getEnabledRowPolicies(
const UUID & user_id,
const boost::container::flat_set<UUID> & enabled_roles) const;
@ -209,6 +214,12 @@ public:
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const;
std::shared_ptr<const SettingsProfilesInfo> getEnabledSettingsInfo(
const UUID & user_id,
const SettingsProfileElements & settings_from_user,
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const;
std::shared_ptr<const SettingsProfilesInfo> getSettingsProfileInfo(const UUID & profile_id);
const ExternalAuthenticators & getExternalAuthenticators() const;

View File

@ -168,6 +168,7 @@ enum class AccessType
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_DISTRIBUTED_SENDS, "SYSTEM STOP DISTRIBUTED SENDS, SYSTEM START DISTRIBUTED SENDS, STOP DISTRIBUTED SENDS, START DISTRIBUTED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \

View File

@ -51,7 +51,7 @@ TEST(AccessRights, Union)
"CREATE DICTIONARY, DROP DATABASE, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, "
"TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, "
"SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
"SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION ADMIN ON db1");
}

View File

@ -6887,13 +6887,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
scope.scope_node->formatASTForErrorMessage());
}
std::erase_if(with_nodes, [](const QueryTreeNodePtr & node)
{
auto * subquery_node = node->as<QueryNode>();
auto * union_node = node->as<UnionNode>();
return (subquery_node && subquery_node->isCTE()) || (union_node && union_node->isCTE());
});
/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.
*
* Example: WITH 1 AS constant, (x -> x + 1) AS lambda, a AS (SELECT * FROM test_table);
*/
query_node_typed.getWith().getNodes().clear();
for (auto & window_node : query_node_typed.getWindow().getNodes())
{
@ -6952,9 +6951,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
scope.scope_node->formatASTForErrorMessage());
}
if (query_node_typed.hasWith())
resolveExpressionNodeList(query_node_typed.getWithNode(), scope, true /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (query_node_typed.getPrewhere())
resolveExpressionNode(query_node_typed.getPrewhere(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
@ -7123,13 +7119,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
scope.scope_node->formatASTForErrorMessage());
}
/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.
*
* Example: WITH 1 AS constant, (x -> x + 1) AS lambda, a AS (SELECT * FROM test_table);
*/
query_node_typed.getWith().getNodes().clear();
/** WINDOW section can be safely removed, because WINDOW section can only provide window definition to window functions.
*
* Example: SELECT count(*) OVER w FROM test_table WINDOW w AS (PARTITION BY id);

View File

@ -77,10 +77,12 @@ BackupEntriesCollector::BackupEntriesCollector(
const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ReadSettings & read_settings_,
const ContextPtr & context_)
: backup_query_elements(backup_query_elements_)
, backup_settings(backup_settings_)
, backup_coordination(backup_coordination_)
, read_settings(read_settings_)
, context(context_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000))

View File

@ -30,6 +30,7 @@ public:
BackupEntriesCollector(const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ReadSettings & read_settings_,
const ContextPtr & context_);
~BackupEntriesCollector();
@ -40,6 +41,7 @@ public:
const BackupSettings & getBackupSettings() const { return backup_settings; }
std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; }
const ReadSettings & getReadSettings() const { return read_settings; }
ContextPtr getContext() const { return context; }
/// Adds a backup entry which will be later returned by run().
@ -93,6 +95,7 @@ private:
const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
std::shared_ptr<IBackupCoordination> backup_coordination;
const ReadSettings read_settings;
ContextPtr context;
std::chrono::milliseconds on_cluster_first_sync_timeout;
std::chrono::milliseconds consistent_metadata_snapshot_timeout;

View File

@ -57,7 +57,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
return *file_size;
}
UInt128 BackupEntryFromImmutableFile::getChecksum() const
UInt128 BackupEntryFromImmutableFile::getChecksum(const ReadSettings & read_settings) const
{
{
std::lock_guard lock{size_and_checksum_mutex};
@ -73,7 +73,7 @@ UInt128 BackupEntryFromImmutableFile::getChecksum() const
}
}
auto calculated_checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum();
auto calculated_checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum(read_settings);
{
std::lock_guard lock{size_and_checksum_mutex};
@ -86,13 +86,13 @@ UInt128 BackupEntryFromImmutableFile::getChecksum() const
}
}
std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length) const
std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const
{
if (prefix_length == 0)
return 0;
if (prefix_length >= getSize())
return getChecksum();
return getChecksum(read_settings);
/// For immutable files we don't use partial checksums.
return std::nullopt;

View File

@ -27,8 +27,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override;
UInt64 getSize() const override;
UInt128 getChecksum() const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override;
UInt128 getChecksum(const ReadSettings & read_settings) const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override;
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
bool isEncryptedByDisk() const override { return copy_encrypted; }

View File

@ -11,17 +11,17 @@ namespace DB
{
namespace
{
String readFile(const String & file_path)
String readFile(const String & file_path, const ReadSettings & read_settings)
{
auto buf = createReadBufferFromFileBase(file_path, /* settings= */ {});
auto buf = createReadBufferFromFileBase(file_path, read_settings);
String s;
readStringUntilEOF(s, *buf);
return s;
}
String readFile(const DiskPtr & disk, const String & file_path, bool copy_encrypted)
String readFile(const DiskPtr & disk, const String & file_path, const ReadSettings & read_settings, bool copy_encrypted)
{
auto buf = copy_encrypted ? disk->readEncryptedFile(file_path, {}) : disk->readFile(file_path);
auto buf = copy_encrypted ? disk->readEncryptedFile(file_path, read_settings) : disk->readFile(file_path, read_settings);
String s;
readStringUntilEOF(s, *buf);
return s;
@ -29,19 +29,19 @@ namespace
}
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_)
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_, const ReadSettings & read_settings_)
: file_path(file_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(file_path_))
, data(readFile(file_path_))
, data(readFile(file_path_, read_settings_))
{
}
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_)
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, const ReadSettings & read_settings_, bool copy_encrypted_)
: disk(disk_)
, file_path(file_path_)
, data_source_description(disk_->getDataSourceDescription())
, copy_encrypted(copy_encrypted_ && data_source_description.is_encrypted)
, data(readFile(disk_, file_path, copy_encrypted))
, data(readFile(disk_, file_path, read_settings_, copy_encrypted))
{
}

View File

@ -13,8 +13,8 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupEntryFromSmallFile : public BackupEntryWithChecksumCalculation<IBackupEntry>
{
public:
explicit BackupEntryFromSmallFile(const String & file_path_);
BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_ = false);
explicit BackupEntryFromSmallFile(const String & file_path_, const ReadSettings & read_settings_);
BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, const ReadSettings & read_settings_, bool copy_encrypted_ = false);
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings &) const override;
UInt64 getSize() const override { return data.size(); }

View File

@ -6,7 +6,7 @@ namespace DB
{
template <typename Base>
UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum(const ReadSettings & read_settings) const
{
{
std::lock_guard lock{checksum_calculation_mutex};
@ -26,7 +26,7 @@ UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
}
else
{
auto read_buffer = this->getReadBuffer(ReadSettings{}.adjustBufferSize(size));
auto read_buffer = this->getReadBuffer(read_settings.adjustBufferSize(size));
HashingReadBuffer hashing_read_buffer(*read_buffer);
hashing_read_buffer.ignoreAll();
calculated_checksum = hashing_read_buffer.getHash();
@ -37,23 +37,20 @@ UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
}
template <typename Base>
std::optional<UInt128> BackupEntryWithChecksumCalculation<Base>::getPartialChecksum(size_t prefix_length) const
std::optional<UInt128> BackupEntryWithChecksumCalculation<Base>::getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const
{
if (prefix_length == 0)
return 0;
size_t size = this->getSize();
if (prefix_length >= size)
return this->getChecksum();
return this->getChecksum(read_settings);
std::lock_guard lock{checksum_calculation_mutex};
ReadSettings read_settings;
if (calculated_checksum)
read_settings.adjustBufferSize(calculated_checksum ? prefix_length : size);
auto read_buffer = this->getReadBuffer(read_settings);
auto read_buffer = this->getReadBuffer(read_settings.adjustBufferSize(calculated_checksum ? prefix_length : size));
HashingReadBuffer hashing_read_buffer(*read_buffer);
hashing_read_buffer.ignore(prefix_length);
auto partial_checksum = hashing_read_buffer.getHash();

View File

@ -11,8 +11,8 @@ template <typename Base>
class BackupEntryWithChecksumCalculation : public Base
{
public:
UInt128 getChecksum() const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override;
UInt128 getChecksum(const ReadSettings & read_settings) const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override;
private:
mutable std::optional<UInt128> calculated_checksum;

View File

@ -17,8 +17,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override { return entry->getReadBuffer(read_settings); }
UInt64 getSize() const override { return entry->getSize(); }
UInt128 getChecksum() const override { return entry->getChecksum(); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override { return entry->getPartialChecksum(prefix_length); }
UInt128 getChecksum(const ReadSettings & read_settings) const override { return entry->getChecksum(read_settings); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override { return entry->getPartialChecksum(prefix_length, read_settings); }
DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }
bool isEncryptedByDisk() const override { return entry->isEncryptedByDisk(); }
bool isFromFile() const override { return entry->isFromFile(); }

View File

@ -3,6 +3,8 @@
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <Core/Types.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <Parsers/IAST_fwd.h>
#include <boost/noncopyable.hpp>
#include <memory>
@ -37,6 +39,8 @@ public:
std::optional<UUID> backup_uuid;
bool deduplicate_files = true;
bool allow_s3_native_copy = true;
ReadSettings read_settings;
WriteSettings write_settings;
};
static BackupFactory & instance();

View File

@ -57,12 +57,12 @@ namespace
/// Calculate checksum for backup entry if it's empty.
/// Also able to calculate additional checksum of some prefix.
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(const BackupEntryPtr & entry, size_t prefix_size)
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(const BackupEntryPtr & entry, size_t prefix_size, const ReadSettings & read_settings)
{
ChecksumsForNewEntry res;
/// The partial checksum should be calculated before the full checksum to enable optimization in BackupEntryWithChecksumCalculation.
res.prefix_checksum = entry->getPartialChecksum(prefix_size);
res.full_checksum = entry->getChecksum();
res.prefix_checksum = entry->getPartialChecksum(prefix_size, read_settings);
res.full_checksum = entry->getChecksum(read_settings);
return res;
}
@ -93,7 +93,12 @@ String BackupFileInfo::describe() const
}
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, Poco::Logger * log)
BackupFileInfo buildFileInfoForBackupEntry(
const String & file_name,
const BackupEntryPtr & backup_entry,
const BackupPtr & base_backup,
const ReadSettings & read_settings,
Poco::Logger * log)
{
auto adjusted_path = removeLeadingSlash(file_name);
@ -126,7 +131,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
/// File with the same name but smaller size exist in previous backup
if (check_base == CheckBackupResult::HasPrefix)
{
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, base_backup_file_info->first);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, base_backup_file_info->first, read_settings);
info.checksum = checksums.full_checksum;
/// We have prefix of this file in backup with the same checksum.
@ -146,7 +151,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
{
/// We have full file or have nothing, first of all let's get checksum
/// of current file
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0, read_settings);
info.checksum = checksums.full_checksum;
if (info.checksum == base_backup_file_info->second)
@ -169,7 +174,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
}
else
{
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0, read_settings);
info.checksum = checksums.full_checksum;
}
@ -188,7 +193,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
return info;
}
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, ThreadPool & thread_pool)
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool)
{
BackupFileInfos infos;
infos.resize(backup_entries.size());
@ -210,7 +215,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
++num_active_jobs;
}
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &base_backup, &thread_group, i, log](bool async)
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &read_settings, &base_backup, &thread_group, i, log](bool async)
{
SCOPE_EXIT_SAFE({
std::lock_guard lock{mutex};
@ -237,7 +242,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
return;
}
infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, log);
infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, read_settings, log);
}
catch (...)
{

View File

@ -13,6 +13,7 @@ class IBackupEntry;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
struct ReadSettings;
/// Information about a file stored in a backup.
@ -66,9 +67,9 @@ struct BackupFileInfo
using BackupFileInfos = std::vector<BackupFileInfo>;
/// Builds a BackupFileInfo for a specified backup entry.
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, Poco::Logger * log);
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, const ReadSettings & read_settings, Poco::Logger * log);
/// Builds a vector of BackupFileInfos for specified backup entries.
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, ThreadPool & thread_pool);
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool);
}

View File

@ -4,17 +4,16 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
namespace DB
{
BackupReaderDefault::BackupReaderDefault(Poco::Logger * log_, const ContextPtr & context_)
BackupReaderDefault::BackupReaderDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_)
: log(log_)
, read_settings(context_->getBackupReadSettings())
, write_settings(context_->getWriteSettings())
, read_settings(read_settings_)
, write_settings(write_settings_)
, write_buffer_size(DBMS_DEFAULT_BUFFER_SIZE)
{
}
@ -37,10 +36,10 @@ void BackupReaderDefault::copyFileToDisk(const String & path_in_backup, size_t f
write_buffer->finalize();
}
BackupWriterDefault::BackupWriterDefault(Poco::Logger * log_, const ContextPtr & context_)
BackupWriterDefault::BackupWriterDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_)
: log(log_)
, read_settings(context_->getBackupReadSettings())
, write_settings(context_->getWriteSettings())
, read_settings(read_settings_)
, write_settings(write_settings_)
, write_buffer_size(DBMS_DEFAULT_BUFFER_SIZE)
{
}

View File

@ -3,7 +3,6 @@
#include <Backups/BackupIO.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <Interpreters/Context_fwd.h>
namespace DB
@ -19,7 +18,7 @@ enum class WriteMode;
class BackupReaderDefault : public IBackupReader
{
public:
BackupReaderDefault(Poco::Logger * log_, const ContextPtr & context_);
BackupReaderDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_);
~BackupReaderDefault() override = default;
/// The function copyFileToDisk() can be much faster than reading the file with readFile() and then writing it to some disk.
@ -46,7 +45,7 @@ protected:
class BackupWriterDefault : public IBackupWriter
{
public:
BackupWriterDefault(Poco::Logger * log_, const ContextPtr & context_);
BackupWriterDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_);
~BackupWriterDefault() override = default;
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;

View File

@ -8,8 +8,8 @@
namespace DB
{
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderDisk"), context_)
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderDisk"))
, disk(disk_)
, root_path(root_path_)
, data_source_description(disk->getDataSourceDescription())
@ -56,8 +56,8 @@ void BackupReaderDisk::copyFileToDisk(const String & path_in_backup, size_t file
}
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterDisk"), context_)
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterDisk"))
, disk(disk_)
, root_path(root_path_)
, data_source_description(disk->getDataSourceDescription())

View File

@ -13,7 +13,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupReaderDisk : public BackupReaderDefault
{
public:
BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_);
BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
~BackupReaderDisk() override;
bool fileExists(const String & file_name) override;
@ -33,7 +33,7 @@ private:
class BackupWriterDisk : public BackupWriterDefault
{
public:
BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_);
BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
~BackupWriterDisk() override;
bool fileExists(const String & file_name) override;

View File

@ -16,8 +16,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
BackupReaderFile::BackupReaderFile(const String & root_path_, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderFile"), context_)
BackupReaderFile::BackupReaderFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderFile"))
, root_path(root_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(root_path))
{
@ -74,8 +74,8 @@ void BackupReaderFile::copyFileToDisk(const String & path_in_backup, size_t file
}
BackupWriterFile::BackupWriterFile(const String & root_path_, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterFile"), context_)
BackupWriterFile::BackupWriterFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterFile"))
, root_path(root_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(root_path))
{

View File

@ -11,7 +11,7 @@ namespace DB
class BackupReaderFile : public BackupReaderDefault
{
public:
explicit BackupReaderFile(const String & root_path_, const ContextPtr & context_);
explicit BackupReaderFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
@ -29,7 +29,7 @@ private:
class BackupWriterFile : public BackupWriterDefault
{
public:
BackupWriterFile(const String & root_path_, const ContextPtr & context_);
BackupWriterFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;

View File

@ -101,8 +101,14 @@ namespace
BackupReaderS3::BackupReaderS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderS3"), context_)
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
bool allow_s3_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
@ -178,8 +184,15 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterS3"), context_)
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
bool allow_s3_native_copy,
const String & storage_class_name,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)

View File

@ -17,7 +17,7 @@ namespace DB
class BackupReaderS3 : public BackupReaderDefault
{
public:
BackupReaderS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_);
BackupReaderS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderS3() override;
bool fileExists(const String & file_name) override;
@ -38,7 +38,7 @@ private:
class BackupWriterS3 : public BackupWriterDefault
{
public:
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_);
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterS3() override;
bool fileExists(const String & file_name) override;

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
M(Bool, decrypt_files_from_encrypted_disks) \
M(Bool, deduplicate_files) \
M(Bool, allow_s3_native_copy) \
M(Bool, read_from_filesystem_cache) \
M(UInt64, shard_num) \
M(UInt64, replica_num) \
M(Bool, internal) \

View File

@ -44,6 +44,10 @@ struct BackupSettings
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
bool allow_s3_native_copy = true;
/// Allow to use the filesystem cache in passive mode - benefit from the existing cache entries,
/// but don't put more entries into the cache.
bool read_from_filesystem_cache = true;
/// 1-based shard index to store in the backup. 0 means all shards.
/// Can only be used with BACKUP ON CLUSTER.
size_t shard_num = 0;

View File

@ -178,6 +178,42 @@ namespace
{
return status == BackupStatus::RESTORING;
}
/// We use slightly different read and write settings for backup/restore
/// with a separate throttler and limited usage of filesystem cache.
ReadSettings getReadSettingsForBackup(const ContextPtr & context, const BackupSettings & backup_settings)
{
auto read_settings = context->getReadSettings();
read_settings.remote_throttler = context->getBackupsThrottler();
read_settings.local_throttler = context->getBackupsThrottler();
read_settings.enable_filesystem_cache = backup_settings.read_from_filesystem_cache;
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = backup_settings.read_from_filesystem_cache;
return read_settings;
}
WriteSettings getWriteSettingsForBackup(const ContextPtr & context)
{
auto write_settings = context->getWriteSettings();
write_settings.enable_filesystem_cache_on_write_operations = false;
return write_settings;
}
ReadSettings getReadSettingsForRestore(const ContextPtr & context)
{
auto read_settings = context->getReadSettings();
read_settings.remote_throttler = context->getBackupsThrottler();
read_settings.local_throttler = context->getBackupsThrottler();
read_settings.enable_filesystem_cache = false;
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
return read_settings;
}
WriteSettings getWriteSettingsForRestore(const ContextPtr & context)
{
auto write_settings = context->getWriteSettings();
write_settings.enable_filesystem_cache_on_write_operations = false;
return write_settings;
}
}
@ -350,6 +386,8 @@ void BackupsWorker::doBackup(
backup_create_params.backup_uuid = backup_settings.backup_uuid;
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
backup_create_params.write_settings = getWriteSettingsForBackup(context);
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
/// Write the backup.
@ -378,12 +416,12 @@ void BackupsWorker::doBackup(
/// Prepare backup entries.
BackupEntries backup_entries;
{
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context};
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, backup_create_params.read_settings, context};
backup_entries = backup_entries_collector.run();
}
/// Write the backup entries to the backup.
buildFileInfosForBackupEntries(backup, backup_entries, backup_coordination);
buildFileInfosForBackupEntries(backup, backup_entries, backup_create_params.read_settings, backup_coordination);
writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
@ -433,12 +471,12 @@ void BackupsWorker::doBackup(
}
void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, std::shared_ptr<IBackupCoordination> backup_coordination)
void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination)
{
LOG_TRACE(log, "{}", Stage::BUILDING_FILE_INFOS);
backup_coordination->setStage(Stage::BUILDING_FILE_INFOS, "");
backup_coordination->waitForStage(Stage::BUILDING_FILE_INFOS);
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), *backups_thread_pool));
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), read_settings, *backups_thread_pool));
}
@ -650,6 +688,8 @@ void BackupsWorker::doRestore(
backup_open_params.base_backup_info = restore_settings.base_backup_info;
backup_open_params.password = restore_settings.password;
backup_open_params.allow_s3_native_copy = restore_settings.allow_s3_native_copy;
backup_open_params.read_settings = getReadSettingsForRestore(context);
backup_open_params.write_settings = getWriteSettingsForRestore(context);
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
String current_database = context->getCurrentDatabase();

View File

@ -24,6 +24,7 @@ using BackupPtr = std::shared_ptr<const IBackup>;
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;
using DataRestoreTasks = std::vector<std::function<void()>>;
struct ReadSettings;
/// Manager of backups and restores: executes backups and restores' threads in the background.
/// Keeps information about backups and restores started in this session.
@ -107,7 +108,7 @@ private:
bool called_async);
/// Builds file infos for specified backup entries.
void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, std::shared_ptr<IBackupCoordination> backup_coordination);
void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination);
/// Write backup entries to an opened backup.
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, const OperationID & backup_id, std::shared_ptr<IBackupCoordination> backup_coordination, bool internal);

View File

@ -19,8 +19,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getReadBuffer(read_settings); }
UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); }
UInt128 getChecksum() const override { return getInternalBackupEntry()->getChecksum(); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override { return getInternalBackupEntry()->getPartialChecksum(prefix_length); }
UInt128 getChecksum(const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getChecksum(read_settings); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getPartialChecksum(prefix_length, read_settings); }
DataSourceDescription getDataSourceDescription() const override { return getInternalBackupEntry()->getDataSourceDescription(); }
bool isEncryptedByDisk() const override { return getInternalBackupEntry()->isEncryptedByDisk(); }
bool isFromFile() const override { return getInternalBackupEntry()->isFromFile(); }

View File

@ -21,11 +21,11 @@ public:
virtual UInt64 getSize() const = 0;
/// Returns the checksum of the data.
virtual UInt128 getChecksum() const = 0;
virtual UInt128 getChecksum(const ReadSettings & read_settings) const = 0;
/// Returns a partial checksum, i.e. the checksum calculated for a prefix part of the data.
/// Can return nullopt if the partial checksum is too difficult to calculate.
virtual std::optional<UInt128> getPartialChecksum(size_t /* prefix_length */) const { return {}; }
virtual std::optional<UInt128> getPartialChecksum(size_t /* prefix_length */, const ReadSettings &) const { return {}; }
/// Returns a read buffer for reading the data.
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const = 0;

View File

@ -107,12 +107,27 @@ void registerBackupEngineS3(BackupFactory & factory)
if (params.open_mode == IBackup::OpenMode::READ)
{
auto reader = std::make_shared<BackupReaderS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.context);
auto reader = std::make_shared<BackupReaderS3>(S3::URI{s3_uri},
access_key_id,
secret_access_key,
params.allow_s3_native_copy,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, reader, params.context);
}
else
{
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.s3_storage_class, params.context);
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri},
access_key_id,
secret_access_key,
params.allow_s3_native_copy,
params.s3_storage_class,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
backup_name_for_logging,
archive_params,

View File

@ -169,18 +169,18 @@ void registerBackupEnginesFileAndDisk(BackupFactory & factory)
{
std::shared_ptr<IBackupReader> reader;
if (engine_name == "File")
reader = std::make_shared<BackupReaderFile>(path, params.context);
reader = std::make_shared<BackupReaderFile>(path, params.read_settings, params.write_settings);
else
reader = std::make_shared<BackupReaderDisk>(disk, path, params.context);
reader = std::make_shared<BackupReaderDisk>(disk, path, params.read_settings, params.write_settings);
return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, reader, params.context);
}
else
{
std::shared_ptr<IBackupWriter> writer;
if (engine_name == "File")
writer = std::make_shared<BackupWriterFile>(path, params.context);
writer = std::make_shared<BackupWriterFile>(path, params.read_settings, params.write_settings);
else
writer = std::make_shared<BackupWriterDisk>(disk, path, params.context);
writer = std::make_shared<BackupWriterDisk>(disk, path, params.read_settings, params.write_settings);
return std::make_unique<BackupImpl>(
backup_name_for_logging,
archive_params,

View File

@ -69,14 +69,14 @@ protected:
static String getChecksum(const BackupEntryPtr & backup_entry)
{
return getHexUIntUppercase(backup_entry->getChecksum());
return getHexUIntUppercase(backup_entry->getChecksum({}));
}
static const constexpr std::string_view NO_CHECKSUM = "no checksum";
static String getPartialChecksum(const BackupEntryPtr & backup_entry, size_t prefix_length)
{
auto partial_checksum = backup_entry->getPartialChecksum(prefix_length);
auto partial_checksum = backup_entry->getPartialChecksum(prefix_length, {});
if (!partial_checksum)
return String{NO_CHECKSUM};
return getHexUIntUppercase(*partial_checksum);
@ -218,7 +218,7 @@ TEST_F(BackupEntriesTest, PartialChecksumBeforeFullChecksum)
TEST_F(BackupEntriesTest, BackupEntryFromSmallFile)
{
writeFile(local_disk, "a.txt");
auto entry = std::make_shared<BackupEntryFromSmallFile>(local_disk, "a.txt");
auto entry = std::make_shared<BackupEntryFromSmallFile>(local_disk, "a.txt", ReadSettings{});
local_disk->removeFile("a.txt");
@ -239,7 +239,7 @@ TEST_F(BackupEntriesTest, DecryptedEntriesFromEncryptedDisk)
std::pair<BackupEntryPtr, bool /* partial_checksum_allowed */> test_cases[]
= {{std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "a.txt"), false},
{std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "a.txt"), true},
{std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt"), true}};
{std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", ReadSettings{}), true}};
for (const auto & [entry, partial_checksum_allowed] : test_cases)
{
EXPECT_EQ(entry->getSize(), 9);
@ -258,7 +258,7 @@ TEST_F(BackupEntriesTest, DecryptedEntriesFromEncryptedDisk)
BackupEntryPtr entries[]
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "empty.txt"),
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "empty.txt"),
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt")};
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", ReadSettings{})};
for (const auto & entry : entries)
{
EXPECT_EQ(entry->getSize(), 0);
@ -288,7 +288,7 @@ TEST_F(BackupEntriesTest, EncryptedEntriesFromEncryptedDisk)
BackupEntryPtr entries[]
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true)};
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", ReadSettings{}, /* copy_encrypted= */ true)};
auto encrypted_checksum = getChecksum(entries[0]);
EXPECT_NE(encrypted_checksum, NO_CHECKSUM);
@ -322,7 +322,7 @@ TEST_F(BackupEntriesTest, EncryptedEntriesFromEncryptedDisk)
BackupEntryPtr entries[]
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true)};
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", ReadSettings{}, /* copy_encrypted= */ true)};
for (const auto & entry : entries)
{
EXPECT_EQ(entry->getSize(), 0);

View File

@ -582,7 +582,8 @@
M(697, CANNOT_RESTORE_TO_NONENCRYPTED_DISK) \
M(698, INVALID_REDIS_STORAGE_TYPE) \
M(699, INVALID_REDIS_TABLE_STRUCTURE) \
M(700, USER_SESSION_LIMIT_EXCEEDED) \
M(700, USER_SESSION_LIMIT_EXCEEDED) \
M(701, CLUSTER_DOESNT_EXIST) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -33,6 +33,9 @@ void HTTPHeaderFilter::setValuesFromConfig(const Poco::Util::AbstractConfigurati
{
std::lock_guard guard(mutex);
forbidden_headers.clear();
forbidden_headers_regexp.clear();
if (config.has("http_forbid_headers"))
{
std::vector<std::string> keys;
@ -46,11 +49,6 @@ void HTTPHeaderFilter::setValuesFromConfig(const Poco::Util::AbstractConfigurati
forbidden_headers.insert(config.getString("http_forbid_headers." + key));
}
}
else
{
forbidden_headers.clear();
forbidden_headers_regexp.clear();
}
}
}

View File

@ -8,6 +8,7 @@
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Parsers/ASTAlterNamedCollectionQuery.h>
#include <Parsers/ASTDropNamedCollectionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
@ -225,24 +226,15 @@ public:
void remove(const std::string & collection_name)
{
if (!removeIfExists(collection_name))
auto collection_path = getMetadataPath(collection_name);
if (!fs::exists(collection_path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
collection_name);
}
}
bool removeIfExists(const std::string & collection_name)
{
auto collection_path = getMetadataPath(collection_name);
if (fs::exists(collection_path))
{
fs::remove(collection_path);
return true;
}
return false;
fs::remove(collection_path);
}
private:
@ -393,36 +385,64 @@ void loadIfNot()
return loadIfNotUnlocked(lock);
}
void removeFromSQL(const std::string & collection_name, ContextPtr context)
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
LoadFromSQL(context).remove(collection_name);
NamedCollectionFactory::instance().remove(collection_name);
}
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
LoadFromSQL(context).removeIfExists(collection_name);
NamedCollectionFactory::instance().removeIfExists(collection_name);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).remove(query.collection_name);
instance.remove(query.collection_name);
}
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
NamedCollectionFactory::instance().add(query.collection_name, LoadFromSQL(context).create(query));
auto & instance = NamedCollectionFactory::instance();
if (instance.exists(query.collection_name))
{
if (!query.if_not_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"A named collection `{}` already exists",
query.collection_name);
}
return;
}
instance.add(query.collection_name, LoadFromSQL(context).create(query));
}
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).update(query);
auto collection = NamedCollectionFactory::instance().getMutable(query.collection_name);
auto collection = instance.getMutable(query.collection_name);
auto collection_lock = collection->lock();
for (const auto & [name, value] : query.changes)

View File

@ -8,6 +8,7 @@ namespace DB
class ASTCreateNamedCollectionQuery;
class ASTAlterNamedCollectionQuery;
class ASTDropNamedCollectionQuery;
namespace NamedCollectionUtils
{
@ -26,8 +27,7 @@ void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
void loadFromSQL(ContextPtr context);
/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
void removeFromSQL(const std::string & collection_name, ContextPtr context);
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context);
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);

View File

@ -101,6 +101,10 @@ void ProgressIndication::writeFinalProgress()
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.)";
else
std::cout << ". ";
auto peak_memory_usage = getMemoryUsage().peak;
if (peak_memory_usage >= 0)
std::cout << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
}
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)

View File

@ -70,6 +70,8 @@ ThreadGroup::ThreadGroup()
ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
: thread_id{getThreadId()}, check_current_thread_on_destruction(check_current_thread_on_destruction_)
{
chassert(!current_thread);
last_rusage = std::make_unique<RUsageCounters>();
memory_tracker.setDescription("(for thread)");
@ -123,6 +125,7 @@ ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
ThreadGroupPtr ThreadStatus::getThreadGroup() const
{
chassert(current_thread == this);
return thread_group;
}

View File

@ -218,7 +218,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path);
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});

View File

@ -78,7 +78,7 @@ class IColumn;
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
@ -838,6 +838,9 @@ class IColumn;
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
MAKE_OBSOLETE(M, UInt64, backup_threads, 16) \
MAKE_OBSOLETE(M, UInt64, restore_threads, 16) \
MAKE_OBSOLETE(M, Bool, input_format_arrow_import_nested, false) \
MAKE_OBSOLETE(M, Bool, input_format_parquet_import_nested, false) \
MAKE_OBSOLETE(M, Bool, input_format_orc_import_nested, false) \
MAKE_OBSOLETE(M, Bool, optimize_duplicate_order_by_and_distinct, false) \
/** The section above is for obsolete settings. Do not add anything there. */
@ -859,12 +862,9 @@ class IColumn;
M(Bool, input_format_tsv_empty_as_default, false, "Treat empty fields in TSV input as default values.", 0) \
M(Bool, input_format_tsv_enum_as_number, false, "Treat inserted enum values in TSV formats as enum indices.", 0) \
M(Bool, input_format_null_as_default, true, "Initialize null fields with default values if the data type of this field is not nullable and it is supported by the input format", 0) \
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
M(Bool, input_format_arrow_case_insensitive_column_matching, false, "Ignore case when matching Arrow columns with CH columns.", 0) \
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
M(Int64, input_format_orc_row_batch_size, 100'000, "Batch size when reading ORC stripes.", 0) \
M(Bool, input_format_orc_case_insensitive_column_matching, false, "Ignore case when matching ORC columns with CH columns.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \

View File

@ -107,9 +107,6 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
{
cckMetadataPathForOrdinary(create, metadata_path);
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)

View File

@ -77,6 +77,8 @@ DatabaseMySQL::DatabaseMySQL(
throw;
}
fs::create_directories(metadata_path);
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
}
@ -144,6 +146,7 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context
auto table_storage_define = database_engine_define->clone();
{
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ast_storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;

View File

@ -54,6 +54,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
, cache_tables(cache_tables_)
, log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")"))
{
fs::create_directories(metadata_path);
cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
cleaner_task->deactivate();
}
@ -390,6 +391,7 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_define->clone();
table_storage_define->as<ASTStorage>()->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();

View File

@ -187,6 +187,7 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex
}
auto table_storage_define = database_engine_define->clone();
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ast_storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
auto storage_engine_arguments = ast_storage->engine->arguments;
auto table_id = storage->getStorageID();
/// Add table_name to engine arguments

View File

@ -120,7 +120,6 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.row_group_rows = settings.output_format_parquet_row_group_size;
format_settings.parquet.row_group_bytes = settings.output_format_parquet_row_group_size_bytes;
format_settings.parquet.output_version = settings.output_format_parquet_version;
format_settings.parquet.import_nested = settings.input_format_parquet_import_nested;
format_settings.parquet.case_insensitive_column_matching = settings.input_format_parquet_case_insensitive_column_matching;
format_settings.parquet.preserve_order = settings.input_format_parquet_preserve_order;
format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
@ -170,7 +169,6 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.with_types_use_header = settings.input_format_with_types_use_header;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.arrow.allow_missing_columns = settings.input_format_arrow_allow_missing_columns;
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference;
@ -178,11 +176,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string;
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array;
format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
@ -687,14 +683,6 @@ void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
target = true;
}
void FormatFactory::markFormatSupportsSubcolumns(const String & name)
{
auto & target = dict[name].supports_subcolumns;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subcolumns", name);
target = true;
}
void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
{
auto & target = dict[name].prefers_large_blocks;
@ -703,12 +691,6 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
target = true;
}
bool FormatFactory::checkIfFormatSupportsSubcolumns(const String & name) const
{
const auto & target = getCreators(name);
return target.supports_subcolumns;
}
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
{
const auto & target = getCreators(name);

View File

@ -228,10 +228,8 @@ public:
void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubcolumns(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);
bool checkIfFormatSupportsSubcolumns(const String & name) const;
bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
bool checkIfFormatHasSchemaReader(const String & name) const;

View File

@ -113,7 +113,6 @@ struct FormatSettings
{
UInt64 row_group_size = 1000000;
bool low_cardinality_as_dictionary = false;
bool import_nested = false;
bool allow_missing_columns = false;
bool skip_columns_with_unsupported_types_in_schema_inference = false;
bool case_insensitive_column_matching = false;
@ -227,7 +226,6 @@ struct FormatSettings
{
UInt64 row_group_rows = 1000000;
UInt64 row_group_bytes = 512 * 1024 * 1024;
bool import_nested = false;
bool allow_missing_columns = false;
bool skip_columns_with_unsupported_types_in_schema_inference = false;
bool case_insensitive_column_matching = false;
@ -338,7 +336,6 @@ struct FormatSettings
struct
{
bool import_nested = false;
bool allow_missing_columns = false;
int64_t row_batch_size = 100'000;
bool skip_columns_with_unsupported_types_in_schema_inference = false;

View File

@ -510,11 +510,12 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
map.clear();
bool all_has_nullable = all_nullable;
bool current_has_nullable = false;
for (size_t arg_num = 0; arg_num < args; ++arg_num)
{
const auto & arg = arrays.args[arg_num];
bool current_has_nullable = false;
current_has_nullable = false;
size_t off;
// const array has only one row
@ -549,44 +550,93 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
}
}
prev_off[arg_num] = off;
if (arg.is_const)
prev_off[arg_num] = 0;
// We update offsets for all the arrays except the first one. Offsets for the first array would be updated later.
// It is needed to iterate the first array again so that the elements in the result would have fixed order.
if (arg_num)
{
prev_off[arg_num] = off;
if (arg.is_const)
prev_off[arg_num] = 0;
}
if (!current_has_nullable)
all_has_nullable = false;
}
if (all_has_nullable)
{
++result_offset;
result_data.insertDefault();
null_map.push_back(1);
}
// We have NULL in output only once if it should be there
bool null_added = false;
const auto & arg = arrays.args[0];
size_t off;
// const array has only one row
if (arg.is_const)
off = (*arg.offsets)[0];
else
off = (*arg.offsets)[row];
for (const auto & pair : map)
for (auto i : collections::range(prev_off[0], off))
{
if (pair.getMapped() == args)
all_has_nullable = all_nullable;
typename Map::LookupResult pair = nullptr;
if (arg.null_map && (*arg.null_map)[i])
{
current_has_nullable = true;
if (all_has_nullable && !null_added)
{
++result_offset;
result_data.insertDefault();
null_map.push_back(1);
null_added = true;
}
if (null_added)
continue;
}
else if constexpr (is_numeric_column)
{
pair = map.find(columns[0]->getElement(i));
}
else if constexpr (std::is_same_v<ColumnType, ColumnString> || std::is_same_v<ColumnType, ColumnFixedString>)
pair = map.find(columns[0]->getDataAt(i));
else
{
const char * data = nullptr;
pair = map.find(columns[0]->serializeValueIntoArena(i, arena, data));
}
prev_off[0] = off;
if (arg.is_const)
prev_off[0] = 0;
if (!current_has_nullable)
all_has_nullable = false;
if (pair && pair->getMapped() == args)
{
// We increase pair->getMapped() here to not skip duplicate values from the first array.
++pair->getMapped();
++result_offset;
if constexpr (is_numeric_column)
result_data.insertValue(pair.getKey());
{
result_data.insertValue(pair->getKey());
}
else if constexpr (std::is_same_v<ColumnType, ColumnString> || std::is_same_v<ColumnType, ColumnFixedString>)
result_data.insertData(pair.getKey().data, pair.getKey().size);
{
result_data.insertData(pair->getKey().data, pair->getKey().size);
}
else
result_data.deserializeAndInsertFromArena(pair.getKey().data);
{
result_data.deserializeAndInsertFromArena(pair->getKey().data);
}
if (all_nullable)
null_map.push_back(0);
}
}
result_offsets.getElement(row) = result_offset;
}
}
ColumnPtr result_column = std::move(result_data_ptr);
if (all_nullable)
result_column = ColumnNullable::create(result_column, std::move(null_map_column));
return ColumnArray::create(result_column, std::move(result_offsets_ptr));
}

View File

@ -16,6 +16,7 @@ namespace ActionLocks
extern const StorageActionBlockType DistributedSend = 5;
extern const StorageActionBlockType PartsTTLMerge = 6;
extern const StorageActionBlockType PartsMove = 7;
extern const StorageActionBlockType PullReplicationLog = 8;
}

View File

@ -166,7 +166,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int BAD_GET;
extern const int UNKNOWN_DATABASE;
extern const int UNKNOWN_TABLE;
extern const int TABLE_ALREADY_EXISTS;
@ -181,6 +180,7 @@ namespace ErrorCodes
extern const int UNKNOWN_FUNCTION;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int CLUSTER_DOESNT_EXIST;
}
#define SHUTDOWN(log, desc, ptr, method) do \
@ -1089,52 +1089,32 @@ ConfigurationPtr Context::getUsersConfig()
return shared->users_config;
}
void Context::setUser(const UUID & user_id_, bool set_current_profiles_, bool set_current_roles_, bool set_current_database_)
void Context::setUser(const UUID & user_id_, const std::optional<const std::vector<UUID>> & current_roles_)
{
/// Prepare lists of user's profiles, constraints, settings, roles.
/// NOTE: AccessControl::read<User>() and other AccessControl's functions may require some IO work,
/// so Context::getLock() must be unlocked while we're doing this.
std::shared_ptr<const User> user;
std::shared_ptr<const ContextAccess> temp_access;
if (set_current_profiles_ || set_current_roles_ || set_current_database_)
{
std::optional<ContextAccessParams> params;
{
auto lock = getLock();
params.emplace(ContextAccessParams{user_id_, /* full_access= */ false, /* use_default_roles = */ true, {}, settings, current_database, client_info });
}
/// `temp_access` is used here only to extract information about the user, not to actually check access.
/// NOTE: AccessControl::getContextAccess() may require some IO work, so Context::getLock() must be unlocked while we're doing this.
temp_access = getAccessControl().getContextAccess(*params);
user = temp_access->getUser();
}
auto user = getAccessControl().read<User>(user_id_);
std::shared_ptr<const SettingsProfilesInfo> profiles;
if (set_current_profiles_)
profiles = temp_access->getDefaultProfileInfo();
std::optional<std::vector<UUID>> roles;
if (set_current_roles_)
roles = user->granted_roles.findGranted(user->default_roles);
String database;
if (set_current_database_)
database = user->default_database;
auto new_current_roles = current_roles_ ? user->granted_roles.findGranted(*current_roles_) : user->granted_roles.findGranted(user->default_roles);
auto enabled_roles = getAccessControl().getEnabledRolesInfo(new_current_roles, {});
auto enabled_profiles = getAccessControl().getEnabledSettingsInfo(user_id_, user->settings, enabled_roles->enabled_roles, enabled_roles->settings_from_enabled_roles);
const auto & database = user->default_database;
/// Apply user's profiles, constraints, settings, roles.
auto lock = getLock();
setUserID(user_id_);
if (profiles)
{
/// A profile can specify a value and a readonly constraint for same setting at the same time,
/// so we shouldn't check constraints here.
setCurrentProfiles(*profiles, /* check_constraints= */ false);
}
/// A profile can specify a value and a readonly constraint for same setting at the same time,
/// so we shouldn't check constraints here.
setCurrentProfiles(*enabled_profiles, /* check_constraints= */ false);
if (roles)
setCurrentRoles(*roles);
setCurrentRoles(new_current_roles);
/// It's optional to specify the DEFAULT DATABASE in the user's definition.
if (!database.empty())
setCurrentDatabase(database);
}
@ -3073,7 +3053,7 @@ UInt16 Context::getServerPort(const String & port_name) const
{
auto it = shared->server_ports.find(port_name);
if (it == shared->server_ports.end())
throw Exception(ErrorCodes::BAD_GET, "There is no port named {}", port_name);
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "There is no port named {}", port_name);
else
return it->second;
}
@ -3082,7 +3062,7 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
{
if (auto res = tryGetCluster(cluster_name))
return res;
throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
}
@ -4550,14 +4530,6 @@ ReadSettings Context::getReadSettings() const
return res;
}
ReadSettings Context::getBackupReadSettings() const
{
ReadSettings read_settings = getReadSettings();
read_settings.remote_throttler = getBackupsThrottler();
read_settings.local_throttler = getBackupsThrottler();
return read_settings;
}
WriteSettings Context::getWriteSettings() const
{
WriteSettings res;

View File

@ -534,12 +534,10 @@ public:
/// Sets the current user assuming that he/she is already authenticated.
/// WARNING: This function doesn't check password!
void setUser(const UUID & user_id_, bool set_current_profiles_ = true, bool set_current_roles_ = true, bool set_current_database_ = true);
void setUser(const UUID & user_id_, const std::optional<const std::vector<UUID>> & current_roles_ = {});
UserPtr getUser() const;
void setUserID(const UUID & user_id_);
std::optional<UUID> getUserID() const;
String getUserName() const;
void setCurrentRoles(const std::vector<UUID> & current_roles_);
@ -1168,9 +1166,6 @@ public:
/** Get settings for reading from filesystem. */
ReadSettings getReadSettings() const;
/** Get settings for reading from filesystem for BACKUPs. */
ReadSettings getBackupReadSettings() const;
/** Get settings for writing to filesystem. */
WriteSettings getWriteSettings() const;
@ -1195,6 +1190,8 @@ private:
void initGlobal();
void setUserID(const UUID & user_id_);
template <typename... Args>
void checkAccessImpl(const Args &... args) const;

View File

@ -1,5 +1,4 @@
#include <Interpreters/InterpreterCreateNamedCollectionQuery.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>

View File

@ -22,11 +22,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
if (query.if_exists)
NamedCollectionUtils::removeIfExistsFromSQL(query.collection_name, current_context);
else
NamedCollectionUtils::removeFromSQL(query.collection_name, current_context);
NamedCollectionUtils::removeFromSQL(query, current_context);
return {};
}

View File

@ -89,13 +89,14 @@ namespace ErrorCodes
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
extern StorageActionBlockType PartsTTLMerge;
extern StorageActionBlockType PartsMove;
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
}
@ -155,6 +156,8 @@ AccessType getRequiredAccessType(StorageActionBlockType action_type)
return AccessType::SYSTEM_TTL_MERGES;
else if (action_type == ActionLocks::PartsMove)
return AccessType::SYSTEM_MOVES;
else if (action_type == ActionLocks::PullReplicationLog)
return AccessType::SYSTEM_PULLING_REPLICATION_LOG;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type: {}", std::to_string(action_type));
}
@ -513,6 +516,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_DISTRIBUTED_SENDS:
startStopAction(ActionLocks::DistributedSend, true);
break;
case Type::STOP_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, false);
break;
case Type::START_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, true);
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
@ -1090,6 +1099,15 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG);
else
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_FETCHES:
case Type::START_FETCHES:
{

View File

@ -91,34 +91,30 @@ void WindowFrame::toString(WriteBuffer & buf) const
void WindowFrame::checkValid() const
{
// Check the validity of offsets.
if (type == WindowFrame::FrameType::ROWS
|| type == WindowFrame::FrameType::GROUPS)
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), begin_offset),
begin_offset.getType());
}
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), begin_offset),
begin_offset.getType());
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), end_offset),
end_offset.getType());
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), end_offset),
end_offset.getType());
}
// Check relative positioning of offsets.

View File

@ -45,10 +45,10 @@ namespace ErrorCodes
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType DistributedSend;
}
static void executeCreateQuery(
@ -250,6 +250,9 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
{
String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql";
if (fs::exists(metadata_file + ".tmp"))
fs::remove(metadata_file + ".tmp");
if (fs::exists(fs::path(metadata_file)))
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.

View File

@ -15,6 +15,8 @@ ASTPtr ASTAlterNamedCollectionQuery::clone() const
void ASTAlterNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "Alter NAMED COLLECTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
if (!changes.empty())

View File

@ -18,6 +18,8 @@ ASTPtr ASTCreateNamedCollectionQuery::clone() const
void ASTCreateNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE NAMED COLLECTION ";
if (if_not_exists)
settings.ostr << "IF NOT EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);

View File

@ -13,6 +13,7 @@ class ASTCreateNamedCollectionQuery : public IAST, public ASTQueryWithOnCluster
public:
std::string collection_name;
SettingsChanges changes;
bool if_not_exists = false;
String getID(char) const override { return "CreateNamedCollectionQuery"; }

View File

@ -13,6 +13,8 @@ ASTPtr ASTDropNamedCollectionQuery::clone() const
void ASTDropNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "DROP NAMED COLLECTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
}

View File

@ -162,7 +162,9 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_REPLICATION_QUEUES
|| type == Type::START_REPLICATION_QUEUES
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
|| type == Type::START_DISTRIBUTED_SENDS
|| type == Type::STOP_PULLING_REPLICATION_LOG
|| type == Type::START_PULLING_REPLICATION_LOG)
{
if (table)
print_database_table();

View File

@ -80,6 +80,8 @@ public:
UNFREEZE,
ENABLE_FAILPOINT,
DISABLE_FAILPOINT,
STOP_PULLING_REPLICATION_LOG,
START_PULLING_REPLICATION_LOG,
END
};

View File

@ -13,8 +13,9 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
{
ParserKeyword s_alter("ALTER");
ParserKeyword s_collection("NAMED COLLECTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserKeyword s_delete("DELETE");
ParserIdentifier name_p;
ParserSetQuery set_p;
ParserToken s_comma(TokenType::Comma);
@ -32,10 +33,13 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
if (!s_collection.ignore(pos, expected))
return false;
if (s_if_exists.ignore(pos, expected))
if_exists = true;
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;

View File

@ -1421,15 +1421,17 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_create("CREATE");
ParserKeyword s_attach("ATTACH");
ParserKeyword s_named_collection("NAMED COLLECTION");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON");
ParserKeyword s_as("AS");
ParserToken s_comma(TokenType::Comma);
ParserIdentifier name_p;
ParserToken s_comma(TokenType::Comma);
String cluster_str;
bool if_not_exists = false;
ASTPtr collection_name;
String cluster_str;
if (!s_create.ignore(pos, expected))
return false;
@ -1437,10 +1439,13 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
if (!s_named_collection.ignore(pos, expected))
return false;
if (s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
@ -1465,7 +1470,9 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
auto query = std::make_shared<ASTCreateNamedCollectionQuery>();
tryGetIdentifierNameInto(collection_name, query->collection_name);
query->if_not_exists = if_not_exists;
query->changes = changes;
query->cluster = std::move(cluster_str);
node = query;
return true;

View File

@ -548,6 +548,7 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// CREATE NAMED COLLECTION name [ON CLUSTER cluster]
class ParserCreateNamedCollectionQuery : public IParserBase
{
protected:

View File

@ -12,6 +12,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
ParserKeyword s_drop("DROP");
ParserKeyword s_collection("NAMED COLLECTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier name_p;
String cluster_str;
@ -31,7 +32,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;

View File

@ -379,6 +379,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);

View File

@ -97,7 +97,6 @@ Chunk IRowInputFormat::generate()
size_t num_rows = 0;
size_t chunk_start_offset = getDataOffsetMaybeCompressed(getReadBuffer());
try
{
RowReadExtension info;

View File

@ -143,7 +143,6 @@ void ArrowBlockInputFormat::prepareReader()
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Arrow",
format_settings.arrow.import_nested,
format_settings.arrow.allow_missing_columns,
format_settings.null_as_default,
format_settings.arrow.case_insensitive_column_matching);
@ -190,7 +189,6 @@ void registerInputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
});
factory.markFormatSupportsSubcolumns("Arrow");
factory.markFormatSupportsSubsetOfColumns("Arrow");
factory.registerInputFormat(
"ArrowStream",

View File

@ -1032,13 +1032,11 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_,
bool null_as_default_,
bool case_insensitive_matching_)
: header(header_)
, format_name(format_name_)
, import_nested(import_nested_)
, allow_missing_columns(allow_missing_columns_)
, null_as_default(null_as_default_)
, case_insensitive_matching(case_insensitive_matching_)
@ -1080,42 +1078,40 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
if (!name_to_column_ptr.contains(search_column_name))
{
bool read_from_nested = false;
/// Check if it's a column from nested table.
if (import_nested)
/// Check if it's a subcolumn from some struct.
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
{
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
if (!nested_tables.contains(search_nested_table_name))
{
if (!nested_tables.contains(search_nested_table_name))
NamesAndTypesList nested_columns;
for (const auto & name_and_type : header.getNamesAndTypesList())
{
NamesAndTypesList nested_columns;
for (const auto & name_and_type : header.getNamesAndTypesList())
{
if (name_and_type.name.starts_with(nested_table_name + "."))
nested_columns.push_back(name_and_type);
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
if (name_and_type.name.starts_with(nested_table_name + "."))
nested_columns.push_back(name_and_type);
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false, skipped, nested_table_type)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false, skipped, nested_table_type)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
}
if (!read_from_nested)
{
if (!allow_missing_columns)

View File

@ -24,7 +24,6 @@ public:
ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_,
bool null_as_default_,
bool case_insensitive_matching_ = false);
@ -53,7 +52,6 @@ public:
private:
const Block & header;
const std::string format_name;
bool import_nested;
/// If false, throw exception if some columns in header not exists in arrow table.
bool allow_missing_columns;
bool null_as_default;

View File

@ -1258,6 +1258,8 @@ void registerInputFormatAvro(FormatFactory & factory)
{
return std::make_shared<AvroConfluentRowInputFormat>(sample, buf, params, settings);
});
factory.markFormatSupportsSubsetOfColumns("AvroConfluent");
}
void registerAvroSchemaReader(FormatFactory & factory)

Some files were not shown because too many files have changed in this diff Show More