Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-23 08:02:02 +00:00)

Commit 580f207b8d: Merge branch 'master' into sqltest
@ -200,8 +200,8 @@ Templates:

- [Server Setting](_description_templates/template-server-setting.md)
- [Database or Table engine](_description_templates/template-engine.md)
- [System table](_description_templates/template-system-table.md)
- [Data type](_description_templates/data-type.md)
- [Statement](_description_templates/statement.md)
- [Data type](_description_templates/template-data-type.md)
- [Statement](_description_templates/template-statement.md)

<a name="how-to-build-docs"/>
@ -141,6 +141,10 @@ Runs [stateful functional tests](tests.md#functional-tests). Treat them in the s

Runs [integration tests](tests.md#integration-tests).

## Bugfix validate check

Checks that there is either a new test (functional or integration) or some changed tests that fail with the binary built on the master branch. This check is triggered when the pull request has the "pr-bugfix" label.

## Stress Test

Runs stateless functional tests concurrently from several clients to detect
concurrency-related errors. If it fails:
@ -22,7 +22,7 @@ CREATE TABLE deltalake

- `url` — Bucket URL with the path to the existing Delta Lake table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. These parameters are optional. If credentials are not specified, they are taken from the configuration file.

Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).

**Example**
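For illustration only, a minimal sketch of supplying the engine parameters above through a named collection; the collection name, bucket URL, and key names below are assumptions, not taken from this page:

```sql
-- Hypothetical named collection holding the connection details.
CREATE NAMED COLLECTION deltalake_conf AS
    url = 'https://s3.region.amazonaws.com/bucket/table_path/',
    access_key_id = 'AKIA...',
    secret_access_key = '...';

-- The engine then reads its parameters from the collection instead of the query text.
CREATE TABLE deltalake ENGINE = DeltaLake(deltalake_conf);
```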
@ -22,7 +22,7 @@ CREATE TABLE hudi_table

- `url` — Bucket URL with the path to an existing Hudi table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. These parameters are optional. If credentials are not specified, they are taken from the configuration file.

Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).

**Example**
@ -237,7 +237,7 @@ The following settings can be set before query execution or placed into configur

- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting the requests-per-second limit. By default (`0` value) equals to `s3_max_get_rps`.
- `s3_upload_part_size_multiply_factor` - Multiply `s3_min_upload_part_size` by this factor each time `s3_multiply_parts_count_threshold` parts were uploaded from a single write to S3. Default value is `2`.
- `s3_upload_part_size_multiply_parts_count_threshold` - Each time this number of parts was uploaded to S3 `s3_min_upload_part_size multiplied` by `s3_upload_part_size_multiply_factor`. Default value us `500`.
- `s3_upload_part_size_multiply_parts_count_threshold` - Each time this number of parts was uploaded to S3, `s3_min_upload_part_size` is multiplied by `s3_upload_part_size_multiply_factor`. Default value is `500`.
- `s3_max_inflight_parts_for_one_file` - Limits the number of PUT requests that can be run concurrently for one object. The number should be limited. The value `0` means unlimited. Default value is `20`. Each in-flight part has a buffer of size `s3_min_upload_part_size` for the first `s3_upload_part_size_multiply_factor` parts, and more when the file is big enough (see `upload_part_size_multiply_factor`). With default settings, one uploaded file consumes no more than `320Mb` for a file smaller than `8G`. The consumption is greater for larger files.

Security consideration: if a malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; alternatively, `remote_host_filter` must be specified in the server configuration.
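As a hedged illustration of the "set before query execution" route mentioned above (the bucket URL is made up), the settings can be applied at session level with `SET`:

```sql
-- Cap the GET request rate and the number of concurrent in-flight upload parts for this session only.
SET s3_max_get_rps = 100;
SET s3_max_get_burst = 200;
SET s3_max_inflight_parts_for_one_file = 4;

-- Subsequent S3 reads and writes in this session obey the limits above.
SELECT count() FROM s3('https://my-bucket.s3.amazonaws.com/data/*.parquet', 'Parquet');
```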
@ -2131,7 +2131,6 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t

- [output_format_parquet_row_group_size](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_row_group_size) - row group size in rows while data output. Default value - `1000000`.
- [output_format_parquet_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_string_as_string) - use Parquet String type instead of Binary for String columns. Default value - `false`.
- [input_format_parquet_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_import_nested) - allow inserting array of structs into [Nested](/docs/en/sql-reference/data-types/nested-data-structures/index.md) table in Parquet input format. Default value - `false`.
- [input_format_parquet_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_case_insensitive_column_matching) - ignore case when matching Parquet columns with ClickHouse columns. Default value - `false`.
- [input_format_parquet_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_allow_missing_columns) - allow missing columns while reading Parquet data. Default value - `false`.
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.
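A minimal sketch of how a couple of the output settings listed above might be applied to a single export; the file name and source table are illustrative:

```sql
-- Write Parquet with String columns stored as Parquet String (not Binary) and smaller row groups.
INSERT INTO FUNCTION file('export.parquet', 'Parquet')
SELECT *
FROM some_table
SETTINGS output_format_parquet_string_as_string = 1,
         output_format_parquet_row_group_size = 100000;
```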
@ -2336,7 +2335,6 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Arrow" > {filenam

- [output_format_arrow_low_cardinality_as_dictionary](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_low_cardinality_as_dictionary) - enable output ClickHouse LowCardinality type as Dictionary Arrow type. Default value - `false`.
- [output_format_arrow_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_string_as_string) - use Arrow String type instead of Binary for String columns. Default value - `false`.
- [input_format_arrow_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_import_nested) - allow inserting array of structs into Nested table in Arrow input format. Default value - `false`.
- [input_format_arrow_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_case_insensitive_column_matching) - ignore case when matching Arrow columns with ClickHouse columns. Default value - `false`.
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
- [input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Arrow format. Default value - `false`.
@ -2402,7 +2400,6 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT ORC" > {filename.

- [output_format_arrow_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_arrow_string_as_string) - use Arrow String type instead of Binary for String columns. Default value - `false`.
- [output_format_orc_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_orc_compression_method) - compression method used in output ORC format. Default value - `none`.
- [input_format_arrow_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_import_nested) - allow inserting array of structs into Nested table in Arrow input format. Default value - `false`.
- [input_format_arrow_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_case_insensitive_column_matching) - ignore case when matching Arrow columns with ClickHouse columns. Default value - `false`.
- [input_format_arrow_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_allow_missing_columns) - allow missing columns while reading Arrow data. Default value - `false`.
- [input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Arrow format. Default value - `false`.
@ -1112,17 +1112,6 @@ Default value: 1.

## Arrow format settings {#arrow-format-settings}

### input_format_arrow_import_nested {#input_format_arrow_import_nested}

Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [Arrow](../../interfaces/formats.md/#data_types-matching-arrow) input format.

Possible values:

- 0 — Data can not be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.

Default value: `0`.

### input_format_arrow_case_insensitive_column_matching {#input_format_arrow_case_insensitive_column_matching}

Ignore case when matching Arrow column names with ClickHouse column names.
@ -1172,17 +1161,6 @@ Default value: `lz4_frame`.

## ORC format settings {#orc-format-settings}

### input_format_orc_import_nested {#input_format_orc_import_nested}

Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [ORC](../../interfaces/formats.md/#data-format-orc) input format.

Possible values:

- 0 — Data can not be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.

Default value: `0`.

### input_format_orc_row_batch_size {#input_format_orc_row_batch_size}

Batch size when reading ORC stripes.
@ -1221,17 +1199,6 @@ Default value: `none`.

## Parquet format settings {#parquet-format-settings}

### input_format_parquet_import_nested {#input_format_parquet_import_nested}

Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [Parquet](../../interfaces/formats.md/#data-format-parquet) input format.

Possible values:

- 0 — Data can not be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.

Default value: `0`.

### input_format_parquet_case_insensitive_column_matching {#input_format_parquet_case_insensitive_column_matching}

Ignore case when matching Parquet column names with ClickHouse column names.
@ -34,7 +34,13 @@ The binary you just downloaded can run all sorts of ClickHouse tools and utiliti

A common use of `clickhouse-local` is to run ad-hoc queries on files, where you don't have to insert the data into a table. `clickhouse-local` can stream the data from a file into a temporary table and execute your SQL.

If the file is sitting on the same machine as `clickhouse-local`, use the `file` table engine. The following `reviews.tsv` file contains a sampling of Amazon product reviews:
If the file is sitting on the same machine as `clickhouse-local`, you can simply specify the file to load. The following `reviews.tsv` file contains a sampling of Amazon product reviews:

```bash
./clickhouse local -q "SELECT * FROM 'reviews.tsv'"
```

This command is a shortcut for:

```bash
./clickhouse local -q "SELECT * FROM file('reviews.tsv')"
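For cases where schema inference is not wanted, the `file()` form also accepts an explicit format and structure. A sketch of such a query (the column list here is illustrative, not the actual schema of `reviews.tsv`):

```sql
-- Same file, but with the format and structure stated explicitly.
SELECT *
FROM file('reviews.tsv', 'TabSeparatedWithNames', 'review_id String, product_title String, star_rating UInt8')
LIMIT 10;
```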
@ -36,6 +36,8 @@ These `ALTER` statements modify entities related to role-based access control:

[ALTER TABLE ... MODIFY COMMENT](/docs/en/sql-reference/statements/alter/comment.md) statement adds, modifies, or removes comments on the table, regardless of whether it was set before or not.

[ALTER NAMED COLLECTION](/docs/en/sql-reference/statements/alter/named-collection.md) statement modifies [Named Collections](/docs/en/operations/named-collections.md).

## Mutations

`ALTER` queries that are intended to manipulate table data are implemented with a mechanism called “mutations”, most notably [ALTER TABLE … DELETE](/docs/en/sql-reference/statements/alter/delete.md) and [ALTER TABLE … UPDATE](/docs/en/sql-reference/statements/alter/update.md). They are asynchronous background processes, similar to merges in [MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables, that produce new “mutated” versions of parts.
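A short sketch of the kind of data-manipulating `ALTER` the paragraph above refers to (the table and predicates are hypothetical):

```sql
-- Both statements run as asynchronous mutations that rewrite the affected parts in the background.
ALTER TABLE visits DELETE WHERE event_date < '2020-01-01';
ALTER TABLE visits UPDATE status = 'archived' WHERE event_date < '2021-01-01';
```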
docs/en/sql-reference/statements/alter/named-collection.md (new file, 30 lines)
@ -0,0 +1,30 @@

---
slug: /en/sql-reference/statements/alter/named-collection
sidebar_label: NAMED COLLECTION
---

# ALTER NAMED COLLECTION

This query modifies an already existing named collection.

**Syntax**

```sql
ALTER NAMED COLLECTION [IF EXISTS] name [ON CLUSTER cluster]
[ SET
    key_name1 = 'some value',
    key_name2 = 'some value',
    key_name3 = 'some value',
    ... ] |
[ DELETE key_name4, key_name5, ... ]
```

**Example**

```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';

ALTER NAMED COLLECTION foobar SET a = '2', c = '3';

ALTER NAMED COLLECTION foobar DELETE b;
```
@ -8,13 +8,14 @@ sidebar_label: CREATE

Create queries make a new entity of one of the following kinds:

- [DATABASE](../../../sql-reference/statements/create/database.md)
- [TABLE](../../../sql-reference/statements/create/table.md)
- [VIEW](../../../sql-reference/statements/create/view.md)
- [DICTIONARY](../../../sql-reference/statements/create/dictionary.md)
- [FUNCTION](../../../sql-reference/statements/create/function.md)
- [USER](../../../sql-reference/statements/create/user.md)
- [ROLE](../../../sql-reference/statements/create/role.md)
- [ROW POLICY](../../../sql-reference/statements/create/row-policy.md)
- [QUOTA](../../../sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](../../../sql-reference/statements/create/settings-profile.md)
- [DATABASE](/docs/en/sql-reference/statements/create/database.md)
- [TABLE](/docs/en/sql-reference/statements/create/table.md)
- [VIEW](/docs/en/sql-reference/statements/create/view.md)
- [DICTIONARY](/docs/en/sql-reference/statements/create/dictionary.md)
- [FUNCTION](/docs/en/sql-reference/statements/create/function.md)
- [USER](/docs/en/sql-reference/statements/create/user.md)
- [ROLE](/docs/en/sql-reference/statements/create/role.md)
- [ROW POLICY](/docs/en/sql-reference/statements/create/row-policy.md)
- [QUOTA](/docs/en/sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](/docs/en/sql-reference/statements/create/settings-profile.md)
- [NAMED COLLECTION](/docs/en/sql-reference/statements/create/named-collection.md)
docs/en/sql-reference/statements/create/named-collection.md (new file, 34 lines)
@ -0,0 +1,34 @@

---
slug: /en/sql-reference/statements/create/named-collection
sidebar_label: NAMED COLLECTION
---

# CREATE NAMED COLLECTION

Creates a new named collection.

**Syntax**

```sql
CREATE NAMED COLLECTION [IF NOT EXISTS] name [ON CLUSTER cluster] AS
    key_name1 = 'some value',
    key_name2 = 'some value',
    key_name3 = 'some value',
    ...
```

**Example**

```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
```

**Related statements**

- [ALTER NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/alter/named-collection)
- [DROP NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/drop#drop-named-collection)

**See Also**

- [Named collections guide](/docs/en/operations/named-collections.md)
@ -119,3 +119,20 @@ DROP FUNCTION [IF EXISTS] function_name [on CLUSTER cluster]

CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
DROP FUNCTION linear_equation;
```

## DROP NAMED COLLECTION

Deletes a named collection.

**Syntax**

``` sql
DROP NAMED COLLECTION [IF EXISTS] name [on CLUSTER cluster]
```

**Example**

``` sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
DROP NAMED COLLECTION foobar;
```
@ -21,7 +21,7 @@ iceberg(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure])

- `format` — The [format](/docs/en/interfaces/formats.md/#formats) of the file. By default `Parquet` is used.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.

Engine parameters can be specified using [Named Collections](../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).

**Returned value**
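A hedged sketch of passing the same parameters through a named collection; the collection name, URL, and key names are assumptions, and the exact set of accepted keys should be checked against the named-collections guide:

```sql
-- Hypothetical named collection with the table location and credentials.
CREATE NAMED COLLECTION iceberg_conf AS
    url = 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table',
    access_key_id = 'test',
    secret_access_key = 'test';

-- The collection is then passed in place of the individual arguments.
SELECT * FROM iceberg(iceberg_conf, format = 'Parquet');
```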
@ -1353,8 +1353,6 @@ ClickHouse поддерживает настраиваемую точность

$ cat {filename} | clickhouse-client --query="INSERT INTO {some_table} FORMAT Parquet"
```

To insert data into [Nested](../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs, enable the [input_format_parquet_import_nested](../operations/settings/settings.md#input_format_parquet_import_nested) setting.

To retrieve data from a ClickHouse table and save it to a file in Parquet format, use a command of the following form:

``` bash
@ -1413,8 +1411,6 @@ ClickHouse поддерживает настраиваемую точность

$ cat filename.arrow | clickhouse-client --query="INSERT INTO some_table FORMAT Arrow"
```

To insert data into [Nested](../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs, enable the [input_format_arrow_import_nested](../operations/settings/settings.md#input_format_arrow_import_nested) setting.

### Selecting data {#selecting-data-arrow}

To retrieve data from a ClickHouse table and save it to a file in Arrow format, use a command of the following form:
@ -1471,8 +1467,6 @@ ClickHouse поддерживает настраиваемую точность

$ cat filename.orc | clickhouse-client --query="INSERT INTO some_table FORMAT ORC"
```

To insert data into [Nested](../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs, enable the [input_format_orc_import_nested](../operations/settings/settings.md#input_format_orc_import_nested) setting.

### Selecting data {#selecting-data-2}

To retrieve data from a ClickHouse table and save it to a file in ORC format, use a command of the following form:
@ -238,39 +238,6 @@ ClickHouse применяет настройку в тех случаях, ко

If `input_format_allow_errors_ratio` is exceeded, ClickHouse throws an exception.

## input_format_parquet_import_nested {#input_format_parquet_import_nested}

Enables or disables the ability to insert data into [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs in the [Parquet](../../interfaces/formats.md#data-format-parquet) input format.

Possible values:

- 0 — Data cannot be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.

Default value: `0`.

## input_format_arrow_import_nested {#input_format_arrow_import_nested}

Enables or disables the ability to insert data into [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs in the [Arrow](../../interfaces/formats.md#data_types-matching-arrow) input format.

Possible values:

- 0 — Data cannot be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.

Default value: `0`.

## input_format_orc_import_nested {#input_format_orc_import_nested}

Enables or disables the ability to insert data into [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs in the [ORC](../../interfaces/formats.md#data-format-orc) input format.

Possible values:

- 0 — Data cannot be inserted into `Nested` columns as an array of structs.
- 1 — Data can be inserted into `Nested` columns as an array of structs.

Default value: `0`.

## input_format_values_interpret_expressions {#settings-input_format_values_interpret_expressions}

Enables or disables the full SQL parser if the fast stream parser cannot parse the data. This setting is used only for the [Values](../../interfaces/formats.md#data-format-values) format when inserting data. For more information about parsers, see the [Syntax](../../sql-reference/syntax.md) section.
@ -77,7 +77,7 @@ void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
client->zookeeper->set(
client->getAbsolutePath(query->args[0].safeGet<String>()),
query->args[1].safeGet<String>(),
static_cast<Int32>(query->args[2].safeGet<Int64>()));
static_cast<Int32>(query->args[2].get<Int32>()));
}

bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
@ -1650,6 +1650,7 @@ try
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context);
startupSystemTables();
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@ -1668,7 +1669,6 @@ try
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);
@ -6,6 +6,7 @@
#include <Access/DiskAccessStorage.h>
#include <Access/LDAPAccessStorage.h>
#include <Access/ContextAccess.h>
#include <Access/EnabledSettings.h>
#include <Access/EnabledRolesInfo.h>
#include <Access/RoleCache.h>
#include <Access/RowPolicyCache.h>
@ -729,6 +730,14 @@ std::shared_ptr<const EnabledRoles> AccessControl::getEnabledRoles(
}

std::shared_ptr<const EnabledRolesInfo> AccessControl::getEnabledRolesInfo(
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const
{
return getEnabledRoles(current_roles, current_roles_with_admin_option)->getRolesInfo();
}

std::shared_ptr<const EnabledRowPolicies> AccessControl::getEnabledRowPolicies(const UUID & user_id, const boost::container::flat_set<UUID> & enabled_roles) const
{
return row_policy_cache->getEnabledRowPolicies(user_id, enabled_roles);
@ -772,6 +781,15 @@ std::shared_ptr<const EnabledSettings> AccessControl::getEnabledSettings(
return settings_profiles_cache->getEnabledSettings(user_id, settings_from_user, enabled_roles, settings_from_enabled_roles);
}

std::shared_ptr<const SettingsProfilesInfo> AccessControl::getEnabledSettingsInfo(
const UUID & user_id,
const SettingsProfileElements & settings_from_user,
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const
{
return getEnabledSettings(user_id, settings_from_user, enabled_roles, settings_from_enabled_roles)->getInfo();
}

std::shared_ptr<const SettingsProfilesInfo> AccessControl::getSettingsProfileInfo(const UUID & profile_id)
{
return settings_profiles_cache->getSettingsProfileInfo(profile_id);
@ -29,6 +29,7 @@ class ContextAccessParams;
struct User;
using UserPtr = std::shared_ptr<const User>;
class EnabledRoles;
struct EnabledRolesInfo;
class RoleCache;
class EnabledRowPolicies;
class RowPolicyCache;
@ -187,6 +188,10 @@ public:
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const;

std::shared_ptr<const EnabledRolesInfo> getEnabledRolesInfo(
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const;

std::shared_ptr<const EnabledRowPolicies> getEnabledRowPolicies(
const UUID & user_id,
const boost::container::flat_set<UUID> & enabled_roles) const;
@ -209,6 +214,12 @@ public:
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const;

std::shared_ptr<const SettingsProfilesInfo> getEnabledSettingsInfo(
const UUID & user_id,
const SettingsProfileElements & settings_from_user,
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const;

std::shared_ptr<const SettingsProfilesInfo> getSettingsProfileInfo(const UUID & profile_id);

const ExternalAuthenticators & getExternalAuthenticators() const;
@ -6887,13 +6887,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
scope.scope_node->formatASTForErrorMessage());
}

std::erase_if(with_nodes, [](const QueryTreeNodePtr & node)
{
auto * subquery_node = node->as<QueryNode>();
auto * union_node = node->as<UnionNode>();

return (subquery_node && subquery_node->isCTE()) || (union_node && union_node->isCTE());
});
/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.
*
* Example: WITH 1 AS constant, (x -> x + 1) AS lambda, a AS (SELECT * FROM test_table);
*/
query_node_typed.getWith().getNodes().clear();

for (auto & window_node : query_node_typed.getWindow().getNodes())
{
@ -6952,9 +6951,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
scope.scope_node->formatASTForErrorMessage());
}

if (query_node_typed.hasWith())
resolveExpressionNodeList(query_node_typed.getWithNode(), scope, true /*allow_lambda_expression*/, false /*allow_table_expression*/);

if (query_node_typed.getPrewhere())
resolveExpressionNode(query_node_typed.getPrewhere(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);

@ -7123,13 +7119,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
scope.scope_node->formatASTForErrorMessage());
}

/** WITH section can be safely removed, because WITH section only can provide aliases to query expressions
* and CTE for other sections to use.
*
* Example: WITH 1 AS constant, (x -> x + 1) AS lambda, a AS (SELECT * FROM test_table);
*/
query_node_typed.getWith().getNodes().clear();

/** WINDOW section can be safely removed, because WINDOW section can only provide window definition to window functions.
*
* Example: SELECT count(*) OVER w FROM test_table WINDOW w AS (PARTITION BY id);
@ -77,10 +77,12 @@ BackupEntriesCollector::BackupEntriesCollector(
const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ReadSettings & read_settings_,
const ContextPtr & context_)
: backup_query_elements(backup_query_elements_)
, backup_settings(backup_settings_)
, backup_coordination(backup_coordination_)
, read_settings(read_settings_)
, context(context_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000))
@ -30,6 +30,7 @@ public:
BackupEntriesCollector(const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ReadSettings & read_settings_,
const ContextPtr & context_);
~BackupEntriesCollector();

@ -40,6 +41,7 @@ public:

const BackupSettings & getBackupSettings() const { return backup_settings; }
std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; }
const ReadSettings & getReadSettings() const { return read_settings; }
ContextPtr getContext() const { return context; }

/// Adds a backup entry which will be later returned by run().
@ -93,6 +95,7 @@ private:
const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
std::shared_ptr<IBackupCoordination> backup_coordination;
const ReadSettings read_settings;
ContextPtr context;
std::chrono::milliseconds on_cluster_first_sync_timeout;
std::chrono::milliseconds consistent_metadata_snapshot_timeout;
@ -57,7 +57,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
return *file_size;
}

UInt128 BackupEntryFromImmutableFile::getChecksum() const
UInt128 BackupEntryFromImmutableFile::getChecksum(const ReadSettings & read_settings) const
{
{
std::lock_guard lock{size_and_checksum_mutex};
@ -73,7 +73,7 @@ UInt128 BackupEntryFromImmutableFile::getChecksum() const
}
}

auto calculated_checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum();
auto calculated_checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum(read_settings);

{
std::lock_guard lock{size_and_checksum_mutex};
@ -86,13 +86,13 @@ UInt128 BackupEntryFromImmutableFile::getChecksum() const
}
}

std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length) const
std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const
{
if (prefix_length == 0)
return 0;

if (prefix_length >= getSize())
return getChecksum();
return getChecksum(read_settings);

/// For immutable files we don't use partial checksums.
return std::nullopt;
@ -27,8 +27,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override;

UInt64 getSize() const override;
UInt128 getChecksum() const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override;
UInt128 getChecksum(const ReadSettings & read_settings) const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override;

DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
bool isEncryptedByDisk() const override { return copy_encrypted; }
@ -11,17 +11,17 @@ namespace DB
{
namespace
{
String readFile(const String & file_path)
String readFile(const String & file_path, const ReadSettings & read_settings)
{
auto buf = createReadBufferFromFileBase(file_path, /* settings= */ {});
auto buf = createReadBufferFromFileBase(file_path, read_settings);
String s;
readStringUntilEOF(s, *buf);
return s;
}

String readFile(const DiskPtr & disk, const String & file_path, bool copy_encrypted)
String readFile(const DiskPtr & disk, const String & file_path, const ReadSettings & read_settings, bool copy_encrypted)
{
auto buf = copy_encrypted ? disk->readEncryptedFile(file_path, {}) : disk->readFile(file_path);
auto buf = copy_encrypted ? disk->readEncryptedFile(file_path, read_settings) : disk->readFile(file_path, read_settings);
String s;
readStringUntilEOF(s, *buf);
return s;
@ -29,19 +29,19 @@ namespace
}

BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_)
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_, const ReadSettings & read_settings_)
: file_path(file_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(file_path_))
, data(readFile(file_path_))
, data(readFile(file_path_, read_settings_))
{
}

BackupEntryFromSmallFile::BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_)
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, const ReadSettings & read_settings_, bool copy_encrypted_)
: disk(disk_)
, file_path(file_path_)
, data_source_description(disk_->getDataSourceDescription())
, copy_encrypted(copy_encrypted_ && data_source_description.is_encrypted)
, data(readFile(disk_, file_path, copy_encrypted))
, data(readFile(disk_, file_path, read_settings_, copy_encrypted))
{
}
@ -13,8 +13,8 @@ using DiskPtr = std::shared_ptr<IDisk>;
|
||||
class BackupEntryFromSmallFile : public BackupEntryWithChecksumCalculation<IBackupEntry>
|
||||
{
|
||||
public:
|
||||
explicit BackupEntryFromSmallFile(const String & file_path_);
|
||||
BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_ = false);
|
||||
explicit BackupEntryFromSmallFile(const String & file_path_, const ReadSettings & read_settings_);
|
||||
BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, const ReadSettings & read_settings_, bool copy_encrypted_ = false);
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings &) const override;
|
||||
UInt64 getSize() const override { return data.size(); }
|
||||
|
@ -6,7 +6,7 @@ namespace DB
|
||||
{
|
||||
|
||||
template <typename Base>
|
||||
UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
|
||||
UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum(const ReadSettings & read_settings) const
|
||||
{
|
||||
{
|
||||
std::lock_guard lock{checksum_calculation_mutex};
|
||||
@ -26,7 +26,7 @@ UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
|
||||
}
|
||||
else
|
||||
{
|
||||
auto read_buffer = this->getReadBuffer(ReadSettings{}.adjustBufferSize(size));
|
||||
auto read_buffer = this->getReadBuffer(read_settings.adjustBufferSize(size));
|
||||
HashingReadBuffer hashing_read_buffer(*read_buffer);
|
||||
hashing_read_buffer.ignoreAll();
|
||||
calculated_checksum = hashing_read_buffer.getHash();
|
||||
@ -37,23 +37,20 @@ UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
|
||||
}
|
||||
|
||||
template <typename Base>
|
||||
std::optional<UInt128> BackupEntryWithChecksumCalculation<Base>::getPartialChecksum(size_t prefix_length) const
|
||||
std::optional<UInt128> BackupEntryWithChecksumCalculation<Base>::getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const
|
||||
{
|
||||
if (prefix_length == 0)
|
||||
return 0;
|
||||
|
||||
size_t size = this->getSize();
|
||||
if (prefix_length >= size)
|
||||
return this->getChecksum();
|
||||
return this->getChecksum(read_settings);
|
||||
|
||||
std::lock_guard lock{checksum_calculation_mutex};
|
||||
|
||||
ReadSettings read_settings;
|
||||
if (calculated_checksum)
|
||||
read_settings.adjustBufferSize(calculated_checksum ? prefix_length : size);
|
||||
|
||||
auto read_buffer = this->getReadBuffer(read_settings);
|
||||
auto read_buffer = this->getReadBuffer(read_settings.adjustBufferSize(calculated_checksum ? prefix_length : size));
|
||||
HashingReadBuffer hashing_read_buffer(*read_buffer);
|
||||
|
||||
hashing_read_buffer.ignore(prefix_length);
|
||||
auto partial_checksum = hashing_read_buffer.getHash();
|
||||
|
||||
|
@ -11,8 +11,8 @@ template <typename Base>
class BackupEntryWithChecksumCalculation : public Base
{
public:
UInt128 getChecksum() const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override;
UInt128 getChecksum(const ReadSettings & read_settings) const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override;

private:
mutable std::optional<UInt128> calculated_checksum;
@ -17,8 +17,8 @@ public:
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override { return entry->getReadBuffer(read_settings); }
|
||||
UInt64 getSize() const override { return entry->getSize(); }
|
||||
UInt128 getChecksum() const override { return entry->getChecksum(); }
|
||||
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override { return entry->getPartialChecksum(prefix_length); }
|
||||
UInt128 getChecksum(const ReadSettings & read_settings) const override { return entry->getChecksum(read_settings); }
|
||||
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override { return entry->getPartialChecksum(prefix_length, read_settings); }
|
||||
DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }
|
||||
bool isEncryptedByDisk() const override { return entry->isEncryptedByDisk(); }
|
||||
bool isFromFile() const override { return entry->isFromFile(); }
|
||||
|
@ -3,6 +3,8 @@
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <Core/Types.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <Parsers/IAST_fwd.h>
#include <boost/noncopyable.hpp>
#include <memory>
@ -37,6 +39,8 @@ public:
std::optional<UUID> backup_uuid;
bool deduplicate_files = true;
bool allow_s3_native_copy = true;
ReadSettings read_settings;
WriteSettings write_settings;
};

static BackupFactory & instance();
|
@ -57,12 +57,12 @@ namespace

/// Calculate checksum for backup entry if it's empty.
/// Also able to calculate additional checksum of some prefix.
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(const BackupEntryPtr & entry, size_t prefix_size)
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(const BackupEntryPtr & entry, size_t prefix_size, const ReadSettings & read_settings)
{
ChecksumsForNewEntry res;
/// The partial checksum should be calculated before the full checksum to enable optimization in BackupEntryWithChecksumCalculation.
res.prefix_checksum = entry->getPartialChecksum(prefix_size);
res.full_checksum = entry->getChecksum();
res.prefix_checksum = entry->getPartialChecksum(prefix_size, read_settings);
res.full_checksum = entry->getChecksum(read_settings);
return res;
}

@ -93,7 +93,12 @@ String BackupFileInfo::describe() const
}

BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, Poco::Logger * log)
BackupFileInfo buildFileInfoForBackupEntry(
const String & file_name,
const BackupEntryPtr & backup_entry,
const BackupPtr & base_backup,
const ReadSettings & read_settings,
Poco::Logger * log)
{
auto adjusted_path = removeLeadingSlash(file_name);

@ -126,7 +131,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
/// File with the same name but smaller size exist in previous backup
if (check_base == CheckBackupResult::HasPrefix)
{
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, base_backup_file_info->first);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, base_backup_file_info->first, read_settings);
info.checksum = checksums.full_checksum;

/// We have prefix of this file in backup with the same checksum.
@ -146,7 +151,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
{
/// We have full file or have nothing, first of all let's get checksum
/// of current file
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0, read_settings);
info.checksum = checksums.full_checksum;

if (info.checksum == base_backup_file_info->second)
@ -169,7 +174,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
}
else
{
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0, read_settings);
info.checksum = checksums.full_checksum;
}

@ -188,7 +193,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
return info;
}

BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, ThreadPool & thread_pool)
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool)
{
BackupFileInfos infos;
infos.resize(backup_entries.size());
@ -210,7 +215,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
++num_active_jobs;
}

auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &base_backup, &thread_group, i, log](bool async)
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &read_settings, &base_backup, &thread_group, i, log](bool async)
{
SCOPE_EXIT_SAFE({
std::lock_guard lock{mutex};
@ -237,7 +242,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
return;
}

infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, log);
infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, read_settings, log);
}
catch (...)
{

@ -13,6 +13,7 @@ class IBackupEntry;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
struct ReadSettings;

/// Information about a file stored in a backup.
@ -66,9 +67,9 @@ struct BackupFileInfo
using BackupFileInfos = std::vector<BackupFileInfo>;

/// Builds a BackupFileInfo for a specified backup entry.
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, Poco::Logger * log);
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, const ReadSettings & read_settings, Poco::Logger * log);

/// Builds a vector of BackupFileInfos for specified backup entries.
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, ThreadPool & thread_pool);
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool);

}
|
@ -4,17 +4,16 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>

namespace DB
{

BackupReaderDefault::BackupReaderDefault(Poco::Logger * log_, const ContextPtr & context_)
BackupReaderDefault::BackupReaderDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_)
: log(log_)
, read_settings(context_->getBackupReadSettings())
, write_settings(context_->getWriteSettings())
, read_settings(read_settings_)
, write_settings(write_settings_)
, write_buffer_size(DBMS_DEFAULT_BUFFER_SIZE)
{
}
@ -37,10 +36,10 @@ void BackupReaderDefault::copyFileToDisk(const String & path_in_backup, size_t f
write_buffer->finalize();
}

BackupWriterDefault::BackupWriterDefault(Poco::Logger * log_, const ContextPtr & context_)
BackupWriterDefault::BackupWriterDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_)
: log(log_)
, read_settings(context_->getBackupReadSettings())
, write_settings(context_->getWriteSettings())
, read_settings(read_settings_)
, write_settings(write_settings_)
, write_buffer_size(DBMS_DEFAULT_BUFFER_SIZE)
{
}

@ -3,7 +3,6 @@
#include <Backups/BackupIO.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <Interpreters/Context_fwd.h>

namespace DB
@ -19,7 +18,7 @@ enum class WriteMode;
class BackupReaderDefault : public IBackupReader
{
public:
BackupReaderDefault(Poco::Logger * log_, const ContextPtr & context_);
BackupReaderDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_);
~BackupReaderDefault() override = default;

/// The function copyFileToDisk() can be much faster than reading the file with readFile() and then writing it to some disk.
@ -46,7 +45,7 @@ protected:
class BackupWriterDefault : public IBackupWriter
{
public:
BackupWriterDefault(Poco::Logger * log_, const ContextPtr & context_);
BackupWriterDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_);
~BackupWriterDefault() override = default;

bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
|
@ -8,8 +8,8 @@
namespace DB
{

BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderDisk"), context_)
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderDisk"))
, disk(disk_)
, root_path(root_path_)
, data_source_description(disk->getDataSourceDescription())
@ -56,8 +56,8 @@ void BackupReaderDisk::copyFileToDisk(const String & path_in_backup, size_t file
}

BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterDisk"), context_)
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterDisk"))
, disk(disk_)
, root_path(root_path_)
, data_source_description(disk->getDataSourceDescription())

@ -13,7 +13,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupReaderDisk : public BackupReaderDefault
{
public:
BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_);
BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
~BackupReaderDisk() override;

bool fileExists(const String & file_name) override;
@ -33,7 +33,7 @@ private:
class BackupWriterDisk : public BackupWriterDefault
{
public:
BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_);
BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
~BackupWriterDisk() override;

bool fileExists(const String & file_name) override;
|
@ -16,8 +16,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

BackupReaderFile::BackupReaderFile(const String & root_path_, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderFile"), context_)
BackupReaderFile::BackupReaderFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderFile"))
, root_path(root_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(root_path))
{
@ -74,8 +74,8 @@ void BackupReaderFile::copyFileToDisk(const String & path_in_backup, size_t file
}

BackupWriterFile::BackupWriterFile(const String & root_path_, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterFile"), context_)
BackupWriterFile::BackupWriterFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterFile"))
, root_path(root_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(root_path))
{

@ -11,7 +11,7 @@ namespace DB
class BackupReaderFile : public BackupReaderDefault
{
public:
explicit BackupReaderFile(const String & root_path_, const ContextPtr & context_);
explicit BackupReaderFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);

bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
@ -29,7 +29,7 @@ private:
class BackupWriterFile : public BackupWriterDefault
{
public:
BackupWriterFile(const String & root_path_, const ContextPtr & context_);
BackupWriterFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);

bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
|
@ -101,8 +101,14 @@ namespace

BackupReaderS3::BackupReaderS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderS3"), context_)
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
bool allow_s3_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
@ -178,8 +184,15 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s

BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterS3"), context_)
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
bool allow_s3_native_copy,
const String & storage_class_name,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)

@ -17,7 +17,7 @@ namespace DB
class BackupReaderS3 : public BackupReaderDefault
{
public:
BackupReaderS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_);
BackupReaderS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderS3() override;

bool fileExists(const String & file_name) override;
@ -38,7 +38,7 @@ private:
class BackupWriterS3 : public BackupWriterDefault
{
public:
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_);
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterS3() override;

bool fileExists(const String & file_name) override;
|
@@ -27,6 +27,7 @@ namespace ErrorCodes
    M(Bool, decrypt_files_from_encrypted_disks) \
    M(Bool, deduplicate_files) \
    M(Bool, allow_s3_native_copy) \
    M(Bool, read_from_filesystem_cache) \
    M(UInt64, shard_num) \
    M(UInt64, replica_num) \
    M(Bool, internal) \
@@ -44,6 +44,10 @@ struct BackupSettings
    /// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
    bool allow_s3_native_copy = true;

    /// Allow to use the filesystem cache in passive mode - benefit from the existing cache entries,
    /// but don't put more entries into the cache.
    bool read_from_filesystem_cache = true;

    /// 1-based shard index to store in the backup. 0 means all shards.
    /// Can only be used with BACKUP ON CLUSTER.
    size_t shard_num = 0;
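The hunks above expose `read_from_filesystem_cache` as an ordinary backup setting next to `allow_s3_native_copy`, so it should be controllable per query; a minimal sketch, assuming the usual BACKUP syntax (the table and destination names here are hypothetical):

    BACKUP TABLE test.hits TO Disk('backups', 'hits_backup.zip')
    SETTINGS read_from_filesystem_cache = 0, allow_s3_native_copy = 1;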
@@ -178,6 +178,42 @@ namespace
    {
        return status == BackupStatus::RESTORING;
    }

    /// We use slightly different read and write settings for backup/restore
    /// with a separate throttler and limited usage of filesystem cache.
    ReadSettings getReadSettingsForBackup(const ContextPtr & context, const BackupSettings & backup_settings)
    {
        auto read_settings = context->getReadSettings();
        read_settings.remote_throttler = context->getBackupsThrottler();
        read_settings.local_throttler = context->getBackupsThrottler();
        read_settings.enable_filesystem_cache = backup_settings.read_from_filesystem_cache;
        read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = backup_settings.read_from_filesystem_cache;
        return read_settings;
    }

    WriteSettings getWriteSettingsForBackup(const ContextPtr & context)
    {
        auto write_settings = context->getWriteSettings();
        write_settings.enable_filesystem_cache_on_write_operations = false;
        return write_settings;
    }

    ReadSettings getReadSettingsForRestore(const ContextPtr & context)
    {
        auto read_settings = context->getReadSettings();
        read_settings.remote_throttler = context->getBackupsThrottler();
        read_settings.local_throttler = context->getBackupsThrottler();
        read_settings.enable_filesystem_cache = false;
        read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
        return read_settings;
    }

    WriteSettings getWriteSettingsForRestore(const ContextPtr & context)
    {
        auto write_settings = context->getWriteSettings();
        write_settings.enable_filesystem_cache_on_write_operations = false;
        return write_settings;
    }
}
@@ -350,6 +386,8 @@ void BackupsWorker::doBackup(
    backup_create_params.backup_uuid = backup_settings.backup_uuid;
    backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
    backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
    backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
    backup_create_params.write_settings = getWriteSettingsForBackup(context);
    BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);

    /// Write the backup.
@@ -378,12 +416,12 @@ void BackupsWorker::doBackup(
    /// Prepare backup entries.
    BackupEntries backup_entries;
    {
        BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context};
        BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, backup_create_params.read_settings, context};
        backup_entries = backup_entries_collector.run();
    }

    /// Write the backup entries to the backup.
    buildFileInfosForBackupEntries(backup, backup_entries, backup_coordination);
    buildFileInfosForBackupEntries(backup, backup_entries, backup_create_params.read_settings, backup_coordination);
    writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal);

    /// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
@@ -433,12 +471,12 @@ void BackupsWorker::doBackup(
}


void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, std::shared_ptr<IBackupCoordination> backup_coordination)
void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination)
{
    LOG_TRACE(log, "{}", Stage::BUILDING_FILE_INFOS);
    backup_coordination->setStage(Stage::BUILDING_FILE_INFOS, "");
    backup_coordination->waitForStage(Stage::BUILDING_FILE_INFOS);
    backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), *backups_thread_pool));
    backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), read_settings, *backups_thread_pool));
}
@@ -650,6 +688,8 @@ void BackupsWorker::doRestore(
    backup_open_params.base_backup_info = restore_settings.base_backup_info;
    backup_open_params.password = restore_settings.password;
    backup_open_params.allow_s3_native_copy = restore_settings.allow_s3_native_copy;
    backup_open_params.read_settings = getReadSettingsForRestore(context);
    backup_open_params.write_settings = getWriteSettingsForRestore(context);
    BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);

    String current_database = context->getCurrentDatabase();
@@ -24,6 +24,7 @@ using BackupPtr = std::shared_ptr<const IBackup>;
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;
using DataRestoreTasks = std::vector<std::function<void()>>;
struct ReadSettings;

/// Manager of backups and restores: executes backups and restores' threads in the background.
/// Keeps information about backups and restores started in this session.
@@ -107,7 +108,7 @@ private:
        bool called_async);

    /// Builds file infos for specified backup entries.
    void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, std::shared_ptr<IBackupCoordination> backup_coordination);
    void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination);

    /// Write backup entries to an opened backup.
    void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, const OperationID & backup_id, std::shared_ptr<IBackupCoordination> backup_coordination, bool internal);
@ -19,8 +19,8 @@ public:
|
||||
|
||||
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getReadBuffer(read_settings); }
|
||||
UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); }
|
||||
UInt128 getChecksum() const override { return getInternalBackupEntry()->getChecksum(); }
|
||||
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override { return getInternalBackupEntry()->getPartialChecksum(prefix_length); }
|
||||
UInt128 getChecksum(const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getChecksum(read_settings); }
|
||||
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getPartialChecksum(prefix_length, read_settings); }
|
||||
DataSourceDescription getDataSourceDescription() const override { return getInternalBackupEntry()->getDataSourceDescription(); }
|
||||
bool isEncryptedByDisk() const override { return getInternalBackupEntry()->isEncryptedByDisk(); }
|
||||
bool isFromFile() const override { return getInternalBackupEntry()->isFromFile(); }
|
||||
|
@ -21,11 +21,11 @@ public:
|
||||
virtual UInt64 getSize() const = 0;
|
||||
|
||||
/// Returns the checksum of the data.
|
||||
virtual UInt128 getChecksum() const = 0;
|
||||
virtual UInt128 getChecksum(const ReadSettings & read_settings) const = 0;
|
||||
|
||||
/// Returns a partial checksum, i.e. the checksum calculated for a prefix part of the data.
|
||||
/// Can return nullopt if the partial checksum is too difficult to calculate.
|
||||
virtual std::optional<UInt128> getPartialChecksum(size_t /* prefix_length */) const { return {}; }
|
||||
virtual std::optional<UInt128> getPartialChecksum(size_t /* prefix_length */, const ReadSettings &) const { return {}; }
|
||||
|
||||
/// Returns a read buffer for reading the data.
|
||||
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const = 0;
|
||||
|
@ -107,12 +107,27 @@ void registerBackupEngineS3(BackupFactory & factory)
|
||||
|
||||
if (params.open_mode == IBackup::OpenMode::READ)
|
||||
{
|
||||
auto reader = std::make_shared<BackupReaderS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.context);
|
||||
auto reader = std::make_shared<BackupReaderS3>(S3::URI{s3_uri},
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
params.allow_s3_native_copy,
|
||||
params.read_settings,
|
||||
params.write_settings,
|
||||
params.context);
|
||||
|
||||
return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, reader, params.context);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.s3_storage_class, params.context);
|
||||
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri},
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
params.allow_s3_native_copy,
|
||||
params.s3_storage_class,
|
||||
params.read_settings,
|
||||
params.write_settings,
|
||||
params.context);
|
||||
|
||||
return std::make_unique<BackupImpl>(
|
||||
backup_name_for_logging,
|
||||
archive_params,
|
||||
|
@ -169,18 +169,18 @@ void registerBackupEnginesFileAndDisk(BackupFactory & factory)
|
||||
{
|
||||
std::shared_ptr<IBackupReader> reader;
|
||||
if (engine_name == "File")
|
||||
reader = std::make_shared<BackupReaderFile>(path, params.context);
|
||||
reader = std::make_shared<BackupReaderFile>(path, params.read_settings, params.write_settings);
|
||||
else
|
||||
reader = std::make_shared<BackupReaderDisk>(disk, path, params.context);
|
||||
reader = std::make_shared<BackupReaderDisk>(disk, path, params.read_settings, params.write_settings);
|
||||
return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, reader, params.context);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::shared_ptr<IBackupWriter> writer;
|
||||
if (engine_name == "File")
|
||||
writer = std::make_shared<BackupWriterFile>(path, params.context);
|
||||
writer = std::make_shared<BackupWriterFile>(path, params.read_settings, params.write_settings);
|
||||
else
|
||||
writer = std::make_shared<BackupWriterDisk>(disk, path, params.context);
|
||||
writer = std::make_shared<BackupWriterDisk>(disk, path, params.read_settings, params.write_settings);
|
||||
return std::make_unique<BackupImpl>(
|
||||
backup_name_for_logging,
|
||||
archive_params,
|
||||
|
@ -69,14 +69,14 @@ protected:
|
||||
|
||||
static String getChecksum(const BackupEntryPtr & backup_entry)
|
||||
{
|
||||
return getHexUIntUppercase(backup_entry->getChecksum());
|
||||
return getHexUIntUppercase(backup_entry->getChecksum({}));
|
||||
}
|
||||
|
||||
static const constexpr std::string_view NO_CHECKSUM = "no checksum";
|
||||
|
||||
static String getPartialChecksum(const BackupEntryPtr & backup_entry, size_t prefix_length)
|
||||
{
|
||||
auto partial_checksum = backup_entry->getPartialChecksum(prefix_length);
|
||||
auto partial_checksum = backup_entry->getPartialChecksum(prefix_length, {});
|
||||
if (!partial_checksum)
|
||||
return String{NO_CHECKSUM};
|
||||
return getHexUIntUppercase(*partial_checksum);
|
||||
@ -218,7 +218,7 @@ TEST_F(BackupEntriesTest, PartialChecksumBeforeFullChecksum)
|
||||
TEST_F(BackupEntriesTest, BackupEntryFromSmallFile)
|
||||
{
|
||||
writeFile(local_disk, "a.txt");
|
||||
auto entry = std::make_shared<BackupEntryFromSmallFile>(local_disk, "a.txt");
|
||||
auto entry = std::make_shared<BackupEntryFromSmallFile>(local_disk, "a.txt", ReadSettings{});
|
||||
|
||||
local_disk->removeFile("a.txt");
|
||||
|
||||
@ -239,7 +239,7 @@ TEST_F(BackupEntriesTest, DecryptedEntriesFromEncryptedDisk)
|
||||
std::pair<BackupEntryPtr, bool /* partial_checksum_allowed */> test_cases[]
|
||||
= {{std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "a.txt"), false},
|
||||
{std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "a.txt"), true},
|
||||
{std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt"), true}};
|
||||
{std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", ReadSettings{}), true}};
|
||||
for (const auto & [entry, partial_checksum_allowed] : test_cases)
|
||||
{
|
||||
EXPECT_EQ(entry->getSize(), 9);
|
||||
@ -258,7 +258,7 @@ TEST_F(BackupEntriesTest, DecryptedEntriesFromEncryptedDisk)
|
||||
BackupEntryPtr entries[]
|
||||
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "empty.txt"),
|
||||
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "empty.txt"),
|
||||
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt")};
|
||||
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", ReadSettings{})};
|
||||
for (const auto & entry : entries)
|
||||
{
|
||||
EXPECT_EQ(entry->getSize(), 0);
|
||||
@ -288,7 +288,7 @@ TEST_F(BackupEntriesTest, EncryptedEntriesFromEncryptedDisk)
|
||||
BackupEntryPtr entries[]
|
||||
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true),
|
||||
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true),
|
||||
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true)};
|
||||
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", ReadSettings{}, /* copy_encrypted= */ true)};
|
||||
|
||||
auto encrypted_checksum = getChecksum(entries[0]);
|
||||
EXPECT_NE(encrypted_checksum, NO_CHECKSUM);
|
||||
@ -322,7 +322,7 @@ TEST_F(BackupEntriesTest, EncryptedEntriesFromEncryptedDisk)
|
||||
BackupEntryPtr entries[]
|
||||
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true),
|
||||
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true),
|
||||
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true)};
|
||||
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", ReadSettings{}, /* copy_encrypted= */ true)};
|
||||
for (const auto & entry : entries)
|
||||
{
|
||||
EXPECT_EQ(entry->getSize(), 0);
|
||||
|
@ -35,6 +35,18 @@ LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool
|
||||
|
||||
LocalConnection::~LocalConnection()
|
||||
{
|
||||
/// Last query may not have been finished or cancelled due to exception on client side.
|
||||
if (state && !state->is_finished && !state->is_cancelled)
|
||||
{
|
||||
try
|
||||
{
|
||||
LocalConnection::sendCancel();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Just ignore any exception.
|
||||
}
|
||||
}
|
||||
state.reset();
|
||||
}
|
||||
|
||||
@ -73,6 +85,10 @@ void LocalConnection::sendQuery(
|
||||
bool,
|
||||
std::function<void(const Progress &)> process_progress_callback)
|
||||
{
|
||||
/// Last query may not have been finished or cancelled due to exception on client side.
|
||||
if (state && !state->is_finished && !state->is_cancelled)
|
||||
sendCancel();
|
||||
|
||||
/// Suggestion comes without client_info.
|
||||
if (client_info)
|
||||
query_context = session.makeQueryContext(*client_info);
|
||||
@ -204,6 +220,10 @@ void LocalConnection::sendCancel()
|
||||
state->is_cancelled = true;
|
||||
if (state->executor)
|
||||
state->executor->cancel();
|
||||
if (state->pushing_executor)
|
||||
state->pushing_executor->cancel();
|
||||
if (state->pushing_async_executor)
|
||||
state->pushing_async_executor->cancel();
|
||||
}
|
||||
|
||||
bool LocalConnection::pullBlock(Block & block)
|
||||
|
@@ -582,7 +582,8 @@
    M(697, CANNOT_RESTORE_TO_NONENCRYPTED_DISK) \
    M(698, INVALID_REDIS_STORAGE_TYPE) \
    M(699, INVALID_REDIS_TABLE_STRUCTURE) \
    M(700, USER_SESSION_LIMIT_EXCEEDED) \
    M(700, USER_SESSION_LIMIT_EXCEEDED) \
    M(701, CLUSTER_DOESNT_EXIST) \
    \
    M(999, KEEPER_EXCEPTION) \
    M(1000, POCO_EXCEPTION) \
@ -33,6 +33,9 @@ void HTTPHeaderFilter::setValuesFromConfig(const Poco::Util::AbstractConfigurati
|
||||
{
|
||||
std::lock_guard guard(mutex);
|
||||
|
||||
forbidden_headers.clear();
|
||||
forbidden_headers_regexp.clear();
|
||||
|
||||
if (config.has("http_forbid_headers"))
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
@ -46,11 +49,6 @@ void HTTPHeaderFilter::setValuesFromConfig(const Poco::Util::AbstractConfigurati
|
||||
forbidden_headers.insert(config.getString("http_forbid_headers." + key));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
forbidden_headers.clear();
|
||||
forbidden_headers_regexp.clear();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/ASTCreateNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTAlterNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTDropNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
@ -225,24 +226,15 @@ public:
|
||||
|
||||
void remove(const std::string & collection_name)
|
||||
{
|
||||
if (!removeIfExists(collection_name))
|
||||
auto collection_path = getMetadataPath(collection_name);
|
||||
if (!fs::exists(collection_path))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
collection_name);
|
||||
}
|
||||
}
|
||||
|
||||
bool removeIfExists(const std::string & collection_name)
|
||||
{
|
||||
auto collection_path = getMetadataPath(collection_name);
|
||||
if (fs::exists(collection_path))
|
||||
{
|
||||
fs::remove(collection_path);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
fs::remove(collection_path);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -393,36 +385,64 @@ void loadIfNot()
|
||||
return loadIfNotUnlocked(lock);
|
||||
}
|
||||
|
||||
void removeFromSQL(const std::string & collection_name, ContextPtr context)
|
||||
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
LoadFromSQL(context).remove(collection_name);
|
||||
NamedCollectionFactory::instance().remove(collection_name);
|
||||
}
|
||||
|
||||
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
LoadFromSQL(context).removeIfExists(collection_name);
|
||||
NamedCollectionFactory::instance().removeIfExists(collection_name);
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
if (!instance.exists(query.collection_name))
|
||||
{
|
||||
if (!query.if_exists)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
query.collection_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
LoadFromSQL(context).remove(query.collection_name);
|
||||
instance.remove(query.collection_name);
|
||||
}
|
||||
|
||||
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
NamedCollectionFactory::instance().add(query.collection_name, LoadFromSQL(context).create(query));
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
if (instance.exists(query.collection_name))
|
||||
{
|
||||
if (!query.if_not_exists)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"A named collection `{}` already exists",
|
||||
query.collection_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
instance.add(query.collection_name, LoadFromSQL(context).create(query));
|
||||
}
|
||||
|
||||
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
if (!instance.exists(query.collection_name))
|
||||
{
|
||||
if (!query.if_exists)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
query.collection_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
LoadFromSQL(context).update(query);
|
||||
|
||||
auto collection = NamedCollectionFactory::instance().getMutable(query.collection_name);
|
||||
auto collection = instance.getMutable(query.collection_name);
|
||||
auto collection_lock = collection->lock();
|
||||
|
||||
for (const auto & [name, value] : query.changes)
|
||||
|
@@ -8,6 +8,7 @@ namespace DB

class ASTCreateNamedCollectionQuery;
class ASTAlterNamedCollectionQuery;
class ASTDropNamedCollectionQuery;

namespace NamedCollectionUtils
{
@@ -26,8 +27,7 @@ void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
void loadFromSQL(ContextPtr context);

/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
void removeFromSQL(const std::string & collection_name, ContextPtr context);
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context);
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);

/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);
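The declarations above replace the string-based removal helpers with AST-based ones, so the IF EXISTS / IF NOT EXISTS checks sit next to the metadata operations themselves; a minimal sketch of the statements involved (collection and key names are made up):

    CREATE NAMED COLLECTION IF NOT EXISTS my_collection AS key1 = 'value1', key2 = 'value2';
    DROP NAMED COLLECTION IF EXISTS my_collection;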
@ -101,6 +101,10 @@ void ProgressIndication::writeFinalProgress()
|
||||
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.)";
|
||||
else
|
||||
std::cout << ". ";
|
||||
|
||||
auto peak_memory_usage = getMemoryUsage().peak;
|
||||
if (peak_memory_usage >= 0)
|
||||
std::cout << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
|
||||
}
|
||||
|
||||
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
|
||||
|
@ -70,6 +70,8 @@ ThreadGroup::ThreadGroup()
|
||||
ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
|
||||
: thread_id{getThreadId()}, check_current_thread_on_destruction(check_current_thread_on_destruction_)
|
||||
{
|
||||
chassert(!current_thread);
|
||||
|
||||
last_rusage = std::make_unique<RUsageCounters>();
|
||||
|
||||
memory_tracker.setDescription("(for thread)");
|
||||
@ -123,6 +125,7 @@ ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
|
||||
|
||||
ThreadGroupPtr ThreadStatus::getThreadGroup() const
|
||||
{
|
||||
chassert(current_thread == this);
|
||||
return thread_group;
|
||||
}
|
||||
|
||||
|
@ -218,7 +218,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path);
|
||||
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
});
|
||||
|
@ -78,7 +78,7 @@ class IColumn;
|
||||
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
|
||||
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
|
||||
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
|
||||
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
|
||||
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
|
||||
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
|
||||
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
|
||||
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
|
||||
@ -838,6 +838,9 @@ class IColumn;
|
||||
MAKE_OBSOLETE(M, Seconds, drain_timeout, 3) \
|
||||
MAKE_OBSOLETE(M, UInt64, backup_threads, 16) \
|
||||
MAKE_OBSOLETE(M, UInt64, restore_threads, 16) \
|
||||
MAKE_OBSOLETE(M, Bool, input_format_arrow_import_nested, false) \
|
||||
MAKE_OBSOLETE(M, Bool, input_format_parquet_import_nested, false) \
|
||||
MAKE_OBSOLETE(M, Bool, input_format_orc_import_nested, false) \
|
||||
MAKE_OBSOLETE(M, Bool, optimize_duplicate_order_by_and_distinct, false) \
|
||||
|
||||
/** The section above is for obsolete settings. Do not add anything there. */
|
||||
@ -859,12 +862,9 @@ class IColumn;
|
||||
M(Bool, input_format_tsv_empty_as_default, false, "Treat empty fields in TSV input as default values.", 0) \
|
||||
M(Bool, input_format_tsv_enum_as_number, false, "Treat inserted enum values in TSV formats as enum indices.", 0) \
|
||||
M(Bool, input_format_null_as_default, true, "Initialize null fields with default values if the data type of this field is not nullable and it is supported by the input format", 0) \
|
||||
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
|
||||
M(Bool, input_format_arrow_case_insensitive_column_matching, false, "Ignore case when matching Arrow columns with CH columns.", 0) \
|
||||
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
|
||||
M(Int64, input_format_orc_row_batch_size, 100'000, "Batch size when reading ORC stripes.", 0) \
|
||||
M(Bool, input_format_orc_case_insensitive_column_matching, false, "Ignore case when matching ORC columns with CH columns.", 0) \
|
||||
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
|
||||
M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
|
||||
M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
|
||||
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
|
||||
|
@ -107,9 +107,6 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
|
||||
{
|
||||
cckMetadataPathForOrdinary(create, metadata_path);
|
||||
|
||||
/// Creates store/xxx/ for Atomic
|
||||
fs::create_directories(fs::path(metadata_path).parent_path());
|
||||
|
||||
DatabasePtr impl = getImpl(create, metadata_path, context);
|
||||
|
||||
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)
|
||||
|
@ -77,6 +77,8 @@ DatabaseMySQL::DatabaseMySQL(
|
||||
throw;
|
||||
}
|
||||
|
||||
fs::create_directories(metadata_path);
|
||||
|
||||
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
|
||||
}
|
||||
|
||||
@ -144,6 +146,7 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context
|
||||
auto table_storage_define = database_engine_define->clone();
|
||||
{
|
||||
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
|
||||
ast_storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
|
||||
ASTs storage_children = ast_storage->children;
|
||||
auto storage_engine_arguments = ast_storage->engine->arguments;
|
||||
|
||||
|
@ -54,6 +54,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
|
||||
, cache_tables(cache_tables_)
|
||||
, log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")"))
|
||||
{
|
||||
fs::create_directories(metadata_path);
|
||||
cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
|
||||
cleaner_task->deactivate();
|
||||
}
|
||||
@ -390,6 +391,7 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
|
||||
|
||||
auto create_table_query = std::make_shared<ASTCreateQuery>();
|
||||
auto table_storage_define = database_engine_define->clone();
|
||||
table_storage_define->as<ASTStorage>()->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
|
||||
create_table_query->set(create_table_query->storage, table_storage_define);
|
||||
|
||||
auto columns_declare_list = std::make_shared<ASTColumns>();
|
||||
|
@ -187,6 +187,7 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex
|
||||
}
|
||||
auto table_storage_define = database_engine_define->clone();
|
||||
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
|
||||
ast_storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
|
||||
auto storage_engine_arguments = ast_storage->engine->arguments;
|
||||
auto table_id = storage->getStorageID();
|
||||
/// Add table_name to engine arguments
|
||||
|
@ -120,7 +120,6 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.parquet.row_group_rows = settings.output_format_parquet_row_group_size;
|
||||
format_settings.parquet.row_group_bytes = settings.output_format_parquet_row_group_size_bytes;
|
||||
format_settings.parquet.output_version = settings.output_format_parquet_version;
|
||||
format_settings.parquet.import_nested = settings.input_format_parquet_import_nested;
|
||||
format_settings.parquet.case_insensitive_column_matching = settings.input_format_parquet_case_insensitive_column_matching;
|
||||
format_settings.parquet.preserve_order = settings.input_format_parquet_preserve_order;
|
||||
format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
|
||||
@ -170,7 +169,6 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.with_types_use_header = settings.input_format_with_types_use_header;
|
||||
format_settings.write_statistics = settings.output_format_write_statistics;
|
||||
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
|
||||
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
|
||||
format_settings.arrow.allow_missing_columns = settings.input_format_arrow_allow_missing_columns;
|
||||
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference;
|
||||
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference;
|
||||
@ -178,11 +176,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
|
||||
format_settings.arrow.output_string_as_string = settings.output_format_arrow_string_as_string;
|
||||
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings.output_format_arrow_fixed_string_as_fixed_byte_array;
|
||||
format_settings.arrow.output_compression_method = settings.output_format_arrow_compression_method;
|
||||
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
|
||||
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
|
||||
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
|
||||
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
|
||||
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
|
||||
format_settings.orc.allow_missing_columns = settings.input_format_orc_allow_missing_columns;
|
||||
format_settings.orc.row_batch_size = settings.input_format_orc_row_batch_size;
|
||||
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_orc_skip_columns_with_unsupported_types_in_schema_inference;
|
||||
@ -687,14 +683,6 @@ void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
|
||||
target = true;
|
||||
}
|
||||
|
||||
void FormatFactory::markFormatSupportsSubcolumns(const String & name)
|
||||
{
|
||||
auto & target = dict[name].supports_subcolumns;
|
||||
if (target)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subcolumns", name);
|
||||
target = true;
|
||||
}
|
||||
|
||||
void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
|
||||
{
|
||||
auto & target = dict[name].prefers_large_blocks;
|
||||
@ -703,12 +691,6 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
|
||||
target = true;
|
||||
}
|
||||
|
||||
bool FormatFactory::checkIfFormatSupportsSubcolumns(const String & name) const
|
||||
{
|
||||
const auto & target = getCreators(name);
|
||||
return target.supports_subcolumns;
|
||||
}
|
||||
|
||||
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
|
||||
{
|
||||
const auto & target = getCreators(name);
|
||||
|
@ -228,10 +228,8 @@ public:
|
||||
|
||||
void markOutputFormatSupportsParallelFormatting(const String & name);
|
||||
void markOutputFormatPrefersLargeBlocks(const String & name);
|
||||
void markFormatSupportsSubcolumns(const String & name);
|
||||
void markFormatSupportsSubsetOfColumns(const String & name);
|
||||
|
||||
bool checkIfFormatSupportsSubcolumns(const String & name) const;
|
||||
bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
|
||||
|
||||
bool checkIfFormatHasSchemaReader(const String & name) const;
|
||||
|
@ -113,7 +113,6 @@ struct FormatSettings
|
||||
{
|
||||
UInt64 row_group_size = 1000000;
|
||||
bool low_cardinality_as_dictionary = false;
|
||||
bool import_nested = false;
|
||||
bool allow_missing_columns = false;
|
||||
bool skip_columns_with_unsupported_types_in_schema_inference = false;
|
||||
bool case_insensitive_column_matching = false;
|
||||
@ -227,7 +226,6 @@ struct FormatSettings
|
||||
{
|
||||
UInt64 row_group_rows = 1000000;
|
||||
UInt64 row_group_bytes = 512 * 1024 * 1024;
|
||||
bool import_nested = false;
|
||||
bool allow_missing_columns = false;
|
||||
bool skip_columns_with_unsupported_types_in_schema_inference = false;
|
||||
bool case_insensitive_column_matching = false;
|
||||
@ -338,7 +336,6 @@ struct FormatSettings
|
||||
|
||||
struct
|
||||
{
|
||||
bool import_nested = false;
|
||||
bool allow_missing_columns = false;
|
||||
int64_t row_batch_size = 100'000;
|
||||
bool skip_columns_with_unsupported_types_in_schema_inference = false;
|
||||
|
@ -166,7 +166,6 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int BAD_GET;
|
||||
extern const int UNKNOWN_DATABASE;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
@ -181,6 +180,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_FUNCTION;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
|
||||
extern const int CLUSTER_DOESNT_EXIST;
|
||||
}
|
||||
|
||||
#define SHUTDOWN(log, desc, ptr, method) do \
|
||||
@ -1089,52 +1089,32 @@ ConfigurationPtr Context::getUsersConfig()
|
||||
return shared->users_config;
|
||||
}
|
||||
|
||||
void Context::setUser(const UUID & user_id_, bool set_current_profiles_, bool set_current_roles_, bool set_current_database_)
|
||||
void Context::setUser(const UUID & user_id_, const std::optional<const std::vector<UUID>> & current_roles_)
|
||||
{
|
||||
/// Prepare lists of user's profiles, constraints, settings, roles.
|
||||
/// NOTE: AccessControl::read<User>() and other AccessControl's functions may require some IO work,
|
||||
/// so Context::getLock() must be unlocked while we're doing this.
|
||||
|
||||
std::shared_ptr<const User> user;
|
||||
std::shared_ptr<const ContextAccess> temp_access;
|
||||
if (set_current_profiles_ || set_current_roles_ || set_current_database_)
|
||||
{
|
||||
std::optional<ContextAccessParams> params;
|
||||
{
|
||||
auto lock = getLock();
|
||||
params.emplace(ContextAccessParams{user_id_, /* full_access= */ false, /* use_default_roles = */ true, {}, settings, current_database, client_info });
|
||||
}
|
||||
/// `temp_access` is used here only to extract information about the user, not to actually check access.
|
||||
/// NOTE: AccessControl::getContextAccess() may require some IO work, so Context::getLock() must be unlocked while we're doing this.
|
||||
temp_access = getAccessControl().getContextAccess(*params);
|
||||
user = temp_access->getUser();
|
||||
}
|
||||
auto user = getAccessControl().read<User>(user_id_);
|
||||
|
||||
std::shared_ptr<const SettingsProfilesInfo> profiles;
|
||||
if (set_current_profiles_)
|
||||
profiles = temp_access->getDefaultProfileInfo();
|
||||
|
||||
std::optional<std::vector<UUID>> roles;
|
||||
if (set_current_roles_)
|
||||
roles = user->granted_roles.findGranted(user->default_roles);
|
||||
|
||||
String database;
|
||||
if (set_current_database_)
|
||||
database = user->default_database;
|
||||
auto new_current_roles = current_roles_ ? user->granted_roles.findGranted(*current_roles_) : user->granted_roles.findGranted(user->default_roles);
|
||||
auto enabled_roles = getAccessControl().getEnabledRolesInfo(new_current_roles, {});
|
||||
auto enabled_profiles = getAccessControl().getEnabledSettingsInfo(user_id_, user->settings, enabled_roles->enabled_roles, enabled_roles->settings_from_enabled_roles);
|
||||
const auto & database = user->default_database;
|
||||
|
||||
/// Apply user's profiles, constraints, settings, roles.
|
||||
|
||||
auto lock = getLock();
|
||||
|
||||
setUserID(user_id_);
|
||||
|
||||
if (profiles)
|
||||
{
|
||||
/// A profile can specify a value and a readonly constraint for same setting at the same time,
|
||||
/// so we shouldn't check constraints here.
|
||||
setCurrentProfiles(*profiles, /* check_constraints= */ false);
|
||||
}
|
||||
/// A profile can specify a value and a readonly constraint for same setting at the same time,
|
||||
/// so we shouldn't check constraints here.
|
||||
setCurrentProfiles(*enabled_profiles, /* check_constraints= */ false);
|
||||
|
||||
if (roles)
|
||||
setCurrentRoles(*roles);
|
||||
setCurrentRoles(new_current_roles);
|
||||
|
||||
/// It's optional to specify the DEFAULT DATABASE in the user's definition.
|
||||
if (!database.empty())
|
||||
setCurrentDatabase(database);
|
||||
}
|
||||
@ -3073,7 +3053,7 @@ UInt16 Context::getServerPort(const String & port_name) const
|
||||
{
|
||||
auto it = shared->server_ports.find(port_name);
|
||||
if (it == shared->server_ports.end())
|
||||
throw Exception(ErrorCodes::BAD_GET, "There is no port named {}", port_name);
|
||||
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "There is no port named {}", port_name);
|
||||
else
|
||||
return it->second;
|
||||
}
|
||||
@ -3082,7 +3062,7 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
|
||||
{
|
||||
if (auto res = tryGetCluster(cluster_name))
|
||||
return res;
|
||||
throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", cluster_name);
|
||||
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
|
||||
}
|
||||
|
||||
|
||||
@ -4550,14 +4530,6 @@ ReadSettings Context::getReadSettings() const
|
||||
return res;
|
||||
}
|
||||
|
||||
ReadSettings Context::getBackupReadSettings() const
|
||||
{
|
||||
ReadSettings read_settings = getReadSettings();
|
||||
read_settings.remote_throttler = getBackupsThrottler();
|
||||
read_settings.local_throttler = getBackupsThrottler();
|
||||
return read_settings;
|
||||
}
|
||||
|
||||
WriteSettings Context::getWriteSettings() const
|
||||
{
|
||||
WriteSettings res;
|
||||
|
@ -534,12 +534,10 @@ public:
|
||||
|
||||
/// Sets the current user assuming that he/she is already authenticated.
|
||||
/// WARNING: This function doesn't check password!
|
||||
void setUser(const UUID & user_id_, bool set_current_profiles_ = true, bool set_current_roles_ = true, bool set_current_database_ = true);
|
||||
void setUser(const UUID & user_id_, const std::optional<const std::vector<UUID>> & current_roles_ = {});
|
||||
UserPtr getUser() const;
|
||||
|
||||
void setUserID(const UUID & user_id_);
|
||||
std::optional<UUID> getUserID() const;
|
||||
|
||||
String getUserName() const;
|
||||
|
||||
void setCurrentRoles(const std::vector<UUID> & current_roles_);
|
||||
@ -1168,9 +1166,6 @@ public:
|
||||
/** Get settings for reading from filesystem. */
|
||||
ReadSettings getReadSettings() const;
|
||||
|
||||
/** Get settings for reading from filesystem for BACKUPs. */
|
||||
ReadSettings getBackupReadSettings() const;
|
||||
|
||||
/** Get settings for writing to filesystem. */
|
||||
WriteSettings getWriteSettings() const;
|
||||
|
||||
@ -1195,6 +1190,8 @@ private:
|
||||
|
||||
void initGlobal();
|
||||
|
||||
void setUserID(const UUID & user_id_);
|
||||
|
||||
template <typename... Args>
|
||||
void checkAccessImpl(const Args &... args) const;
|
||||
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <Interpreters/InterpreterCreateNamedCollectionQuery.h>
|
||||
|
||||
#include <Parsers/ASTCreateNamedCollectionQuery.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
@ -22,11 +22,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
}
|
||||
|
||||
if (query.if_exists)
|
||||
NamedCollectionUtils::removeIfExistsFromSQL(query.collection_name, current_context);
|
||||
else
|
||||
NamedCollectionUtils::removeFromSQL(query.collection_name, current_context);
|
||||
|
||||
NamedCollectionUtils::removeFromSQL(query, current_context);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -250,6 +250,9 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
|
||||
{
|
||||
String path = context->getPath() + "metadata/" + database_name;
|
||||
String metadata_file = path + ".sql";
|
||||
if (fs::exists(metadata_file + ".tmp"))
|
||||
fs::remove(metadata_file + ".tmp");
|
||||
|
||||
if (fs::exists(fs::path(metadata_file)))
|
||||
{
|
||||
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.
|
||||
|
@ -15,6 +15,8 @@ ASTPtr ASTAlterNamedCollectionQuery::clone() const
|
||||
void ASTAlterNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << "Alter NAMED COLLECTION ";
|
||||
if (if_exists)
|
||||
settings.ostr << "IF EXISTS ";
|
||||
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
|
||||
formatOnCluster(settings);
|
||||
if (!changes.empty())
|
||||
|
@ -18,6 +18,8 @@ ASTPtr ASTCreateNamedCollectionQuery::clone() const
|
||||
void ASTCreateNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE NAMED COLLECTION ";
|
||||
if (if_not_exists)
|
||||
settings.ostr << "IF NOT EXISTS ";
|
||||
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
|
||||
|
||||
formatOnCluster(settings);
|
||||
|
@ -13,6 +13,7 @@ class ASTCreateNamedCollectionQuery : public IAST, public ASTQueryWithOnCluster
|
||||
public:
|
||||
std::string collection_name;
|
||||
SettingsChanges changes;
|
||||
bool if_not_exists = false;
|
||||
|
||||
String getID(char) const override { return "CreateNamedCollectionQuery"; }
|
||||
|
||||
|
@ -13,6 +13,8 @@ ASTPtr ASTDropNamedCollectionQuery::clone() const
|
||||
void ASTDropNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << "DROP NAMED COLLECTION ";
|
||||
if (if_exists)
|
||||
settings.ostr << "IF EXISTS ";
|
||||
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
|
||||
formatOnCluster(settings);
|
||||
}
|
||||
|
@@ -243,6 +243,38 @@ bool ParserIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}


bool ParserTableAsStringLiteralIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    if (pos->type != TokenType::StringLiteral)
        return false;

    ReadBufferFromMemory in(pos->begin, pos->size());
    String s;

    if (!tryReadQuotedStringInto(s, in))
    {
        expected.add(pos, "string literal");
        return false;
    }

    if (in.count() != pos->size())
    {
        expected.add(pos, "string literal");
        return false;
    }

    if (s.empty())
    {
        expected.add(pos, "non-empty string literal");
        return false;
    }

    node = std::make_shared<ASTTableIdentifier>(s);
    ++pos;
    return true;
}


bool ParserCompoundIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ASTPtr id_list;
@@ -34,6 +34,19 @@ protected:
};


/** An identifier for tables written as string literal, for example, 'mytable.avro'
  */
class ParserTableAsStringLiteralIdentifier : public IParserBase
{
public:
    explicit ParserTableAsStringLiteralIdentifier() {}

protected:
    const char * getName() const override { return "string literal table identifier"; }
    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};


/** An identifier, possibly containing a dot, for example, x_yz123 or `something special` or Hits.EventTime,
  * possibly with UUID clause like `db name`.`table name` UUID 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
  */
@ -13,8 +13,9 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
|
||||
{
|
||||
ParserKeyword s_alter("ALTER");
|
||||
ParserKeyword s_collection("NAMED COLLECTION");
|
||||
ParserKeyword s_if_exists("IF EXISTS");
|
||||
ParserKeyword s_on("ON");
|
||||
ParserKeyword s_delete("DELETE");
|
||||
|
||||
ParserIdentifier name_p;
|
||||
ParserSetQuery set_p;
|
||||
ParserToken s_comma(TokenType::Comma);
|
||||
@ -32,10 +33,13 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
|
||||
if (!s_collection.ignore(pos, expected))
|
||||
return false;
|
||||
|
||||
if (s_if_exists.ignore(pos, expected))
|
||||
if_exists = true;
|
||||
|
||||
if (!name_p.parse(pos, collection_name, expected))
|
||||
return false;
|
||||
|
||||
if (ParserKeyword{"ON"}.ignore(pos, expected))
|
||||
if (s_on.ignore(pos, expected))
|
||||
{
|
||||
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
|
||||
return false;
|
||||
|
@ -1421,15 +1421,17 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
ParserKeyword s_create("CREATE");
|
||||
ParserKeyword s_attach("ATTACH");
|
||||
ParserKeyword s_named_collection("NAMED COLLECTION");
|
||||
ParserKeyword s_if_not_exists("IF NOT EXISTS");
|
||||
ParserKeyword s_on("ON");
|
||||
ParserKeyword s_as("AS");
|
||||
|
||||
ParserToken s_comma(TokenType::Comma);
|
||||
ParserIdentifier name_p;
|
||||
ParserToken s_comma(TokenType::Comma);
|
||||
|
||||
String cluster_str;
|
||||
bool if_not_exists = false;
|
||||
|
||||
ASTPtr collection_name;
|
||||
String cluster_str;
|
||||
|
||||
if (!s_create.ignore(pos, expected))
|
||||
return false;
|
||||
@ -1437,10 +1439,13 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
|
||||
if (!s_named_collection.ignore(pos, expected))
|
||||
return false;
|
||||
|
||||
if (s_if_not_exists.ignore(pos, expected))
|
||||
if_not_exists = true;
|
||||
|
||||
if (!name_p.parse(pos, collection_name, expected))
|
||||
return false;
|
||||
|
||||
if (ParserKeyword{"ON"}.ignore(pos, expected))
|
||||
if (s_on.ignore(pos, expected))
|
||||
{
|
||||
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
|
||||
return false;
|
||||
@ -1465,7 +1470,9 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
|
||||
auto query = std::make_shared<ASTCreateNamedCollectionQuery>();
|
||||
|
||||
tryGetIdentifierNameInto(collection_name, query->collection_name);
|
||||
query->if_not_exists = if_not_exists;
|
||||
query->changes = changes;
|
||||
query->cluster = std::move(cluster_str);
|
||||
|
||||
node = query;
|
||||
return true;
|
||||
|
@ -548,6 +548,7 @@ protected:
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||
};
|
||||
|
||||
/// CREATE NAMED COLLECTION name [ON CLUSTER cluster]
|
||||
class ParserCreateNamedCollectionQuery : public IParserBase
|
||||
{
|
||||
protected:
|
||||
|
@ -12,6 +12,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
|
||||
ParserKeyword s_drop("DROP");
|
||||
ParserKeyword s_collection("NAMED COLLECTION");
|
||||
ParserKeyword s_if_exists("IF EXISTS");
|
||||
ParserKeyword s_on("ON");
|
||||
ParserIdentifier name_p;
|
||||
|
||||
String cluster_str;
|
||||
@ -31,7 +32,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
|
||||
if (!name_p.parse(pos, collection_name, expected))
|
||||
return false;
|
||||
|
||||
if (ParserKeyword{"ON"}.ignore(pos, expected))
|
||||
if (s_on.ignore(pos, expected))
|
||||
{
|
||||
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
|
||||
return false;
|
||||
|
@@ -24,6 +24,8 @@ bool ParserTableExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
    if (!ParserWithOptionalAlias(std::make_unique<ParserSubquery>(), allow_alias_without_as_keyword).parse(pos, res->subquery, expected)
        && !ParserWithOptionalAlias(std::make_unique<ParserFunction>(false, true), allow_alias_without_as_keyword).parse(pos, res->table_function, expected)
        && !ParserWithOptionalAlias(std::make_unique<ParserCompoundIdentifier>(true, true), allow_alias_without_as_keyword)
            .parse(pos, res->database_and_table_name, expected)
        && !ParserWithOptionalAlias(std::make_unique<ParserTableAsStringLiteralIdentifier>(), allow_alias_without_as_keyword)
            .parse(pos, res->database_and_table_name, expected))
        return false;
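ParserTableAsStringLiteralIdentifier is tried as the last alternative in the chain above, so a bare string literal can now stand where a table name is expected; a rough example of the syntax this enables (the file name reuses the example from the parser's own comment):

    SELECT * FROM 'mytable.avro';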
@ -97,7 +97,6 @@ Chunk IRowInputFormat::generate()
|
||||
|
||||
size_t num_rows = 0;
|
||||
size_t chunk_start_offset = getDataOffsetMaybeCompressed(getReadBuffer());
|
||||
|
||||
try
|
||||
{
|
||||
RowReadExtension info;
|
||||
|
@ -143,7 +143,6 @@ void ArrowBlockInputFormat::prepareReader()
|
||||
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
|
||||
getPort().getHeader(),
|
||||
"Arrow",
|
||||
format_settings.arrow.import_nested,
|
||||
format_settings.arrow.allow_missing_columns,
|
||||
format_settings.null_as_default,
|
||||
format_settings.arrow.case_insensitive_column_matching);
|
||||
@ -190,7 +189,6 @@ void registerInputFormatArrow(FormatFactory & factory)
|
||||
{
|
||||
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false, format_settings);
|
||||
});
|
||||
factory.markFormatSupportsSubcolumns("Arrow");
|
||||
factory.markFormatSupportsSubsetOfColumns("Arrow");
|
||||
factory.registerInputFormat(
|
||||
"ArrowStream",
|
||||
|
@ -1032,13 +1032,11 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
|
||||
ArrowColumnToCHColumn::ArrowColumnToCHColumn(
|
||||
const Block & header_,
|
||||
const std::string & format_name_,
|
||||
bool import_nested_,
|
||||
bool allow_missing_columns_,
|
||||
bool null_as_default_,
|
||||
bool case_insensitive_matching_)
|
||||
: header(header_)
|
||||
, format_name(format_name_)
|
||||
, import_nested(import_nested_)
|
||||
, allow_missing_columns(allow_missing_columns_)
|
||||
, null_as_default(null_as_default_)
|
||||
, case_insensitive_matching(case_insensitive_matching_)
|
||||
@ -1080,42 +1078,40 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
if (!name_to_column_ptr.contains(search_column_name))
{
bool read_from_nested = false;
/// Check if it's a column from nested table.
if (import_nested)
/// Check if it's a subcolumn from some struct.
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
{
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
if (!nested_tables.contains(search_nested_table_name))
{
if (!nested_tables.contains(search_nested_table_name))
NamesAndTypesList nested_columns;
for (const auto & name_and_type : header.getNamesAndTypesList())
{
NamesAndTypesList nested_columns;
for (const auto & name_and_type : header.getNamesAndTypesList())
{
if (name_and_type.name.starts_with(nested_table_name + "."))
nested_columns.push_back(name_and_type);
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
if (name_and_type.name.starts_with(nested_table_name + "."))
nested_columns.push_back(name_and_type);
}
auto nested_table_type = Nested::collect(nested_columns).front().type;

std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false, skipped, nested_table_type)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false, skipped, nested_table_type)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
}

if (!read_from_nested)
{
if (!allow_missing_columns)
@ -24,7 +24,6 @@ public:
ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
bool import_nested_,
bool allow_missing_columns_,
bool null_as_default_,
bool case_insensitive_matching_ = false);
@ -53,7 +52,6 @@ public:
private:
const Block & header;
const std::string format_name;
bool import_nested;
/// If false, throw exception if some columns in header not exists in arrow table.
bool allow_missing_columns;
bool null_as_default;
@ -1258,6 +1258,8 @@ void registerInputFormatAvro(FormatFactory & factory)
{
return std::make_shared<AvroConfluentRowInputFormat>(sample, buf, params, settings);
});

factory.markFormatSupportsSubsetOfColumns("AvroConfluent");
}

void registerAvroSchemaReader(FormatFactory & factory)
@ -125,16 +125,12 @@ void ORCBlockInputFormat::prepareReader()
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"ORC",
format_settings.orc.import_nested,
format_settings.orc.allow_missing_columns,
format_settings.null_as_default,
format_settings.orc.case_insensitive_column_matching);

const bool ignore_case = format_settings.orc.case_insensitive_column_matching;
std::unordered_set<String> nested_table_names;
if (format_settings.orc.import_nested)
nested_table_names = Nested::getAllTableNames(getPort().getHeader(), ignore_case);

std::unordered_set<String> nested_table_names = Nested::getAllTableNames(getPort().getHeader(), ignore_case);
for (int i = 0; i < schema->num_fields(); ++i)
{
const auto & name = schema->field(i)->name();
@ -171,7 +167,6 @@ void registerInputFormatORC(FormatFactory & factory)
{
return std::make_shared<ORCBlockInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubcolumns("ORC");
factory.markFormatSupportsSubsetOfColumns("ORC");
}
@ -148,7 +148,6 @@ void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx)
row_group.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.parquet.case_insensitive_column_matching);
@ -420,7 +419,6 @@ void registerInputFormatParquet(FormatFactory & factory)
max_parsing_threads,
min_bytes_for_seek);
});
factory.markFormatSupportsSubcolumns("Parquet");
factory.markFormatSupportsSubsetOfColumns("Parquet");
}
@ -504,7 +504,6 @@ void registerInputFormatParquetMetadata(FormatFactory & factory)
{
return std::make_shared<ParquetMetadataInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubcolumns("ParquetMetadata");
factory.markFormatSupportsSubsetOfColumns("ParquetMetadata");
}
src/Processors/Transforms/ExtractColumnsTransform.cpp (new file, 35 lines)
@ -0,0 +1,35 @@
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Interpreters/getColumnFromBlock.h>

namespace DB
{

ExtractColumnsTransform::ExtractColumnsTransform(const Block & header_, const NamesAndTypesList & requested_columns_)
: ISimpleTransform(header_, transformHeader(header_, requested_columns_), false), requested_columns(requested_columns_)
{

}

Block ExtractColumnsTransform::transformHeader(Block header, const NamesAndTypesList & requested_columns_)
{
ColumnsWithTypeAndName columns;
columns.reserve(requested_columns_.size());
for (const auto & required_column : requested_columns_)
columns.emplace_back(getColumnFromBlock(header, required_column), required_column.type, required_column.name);

return Block(std::move(columns));
}

void ExtractColumnsTransform::transform(Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
Columns columns;
columns.reserve(requested_columns.size());
for (const auto & required_column : requested_columns)
columns.emplace_back(getColumnFromBlock(block, required_column));

chunk.setColumns(std::move(columns), num_rows);
}

}
src/Processors/Transforms/ExtractColumnsTransform.h (new file, 26 lines)
@ -0,0 +1,26 @@
#pragma once
#include <Processors/ISimpleTransform.h>

namespace DB
{

/// Extracts required columns and subcolumns from the block.
class ExtractColumnsTransform final : public ISimpleTransform
{
public:
ExtractColumnsTransform(
const Block & header_,
const NamesAndTypesList & requested_columns_);

String getName() const override { return "ExtractColumnsTransform"; }

static Block transformHeader(Block header, const NamesAndTypesList & requested_columns_);

protected:
void transform(Chunk & chunk) override;

private:
const NamesAndTypesList requested_columns;
};

}
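Editor's aside on the two new files above: they add a transform that projects each chunk coming out of an input format down to the columns and subcolumns the query actually requested. As orientation only, here is a minimal standalone sketch of that projection step; `Block` as a plain map of named columns and the function name `extractColumns` are simplified stand-ins, not ClickHouse's real `Block`/`Chunk`/`NamesAndTypesList` classes.

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Simplified stand-ins for a block of named columns; illustrative only.
using Column = std::vector<int>;
using Block = std::map<std::string, Column>;

// Keep only the requested columns, mirroring what a per-chunk
// "extract columns" transform does: the output contains exactly the
// requested names and nothing else.
Block extractColumns(const Block & block, const std::vector<std::string> & requested)
{
    Block result;
    for (const auto & name : requested)
    {
        auto it = block.find(name);
        if (it != block.end())
            result.emplace(name, it->second);
    }
    return result;
}

int main()
{
    Block block = {{"a", {1, 2}}, {"b", {3, 4}}, {"c", {5, 6}}};
    Block projected = extractColumns(block, {"c", "a"});
    for (const auto & [name, column] : projected)
        std::cout << name << ": " << column.size() << " rows\n";
}
```

The real transform applies the same idea once per chunk, with `transformHeader` computing the output header up front and `getColumnFromBlock` doing the per-column lookup (including subcolumns, per the class comment).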
@ -281,7 +281,7 @@ Chain buildPushingToViewsChain(
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });

current_thread = nullptr;
std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>(/*check_current_thread_on_destruction=*/ false);
/// Copy of a ThreadStatus should be internal.
view_thread_status_ptr->setInternalThread();
@ -281,7 +281,6 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl

ArrowColumnToCHColumn column_reader(
header, "Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
/* null_as_default */true,
/* case_insensitive_column_matching */false);
@ -42,8 +42,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S

off_t file_offset = 0;
off_t read_until_position = 0;

std::optional<size_t> file_size;
off_t file_size;

explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
@ -59,7 +58,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@ -68,6 +66,22 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));

if (file_size_.has_value())
{
file_size = file_size_.value();
}
else
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
{
hdfsCloseFile(fs.get(), fin);
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
}
file_size = static_cast<size_t>(file_info->mSize);
hdfsFreeFileInfo(file_info, 1);
}
}

~ReadBufferFromHDFSImpl() override
@ -75,16 +89,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}

size_t getFileSize()
size_t getFileSize() const
{
if (file_size)
return *file_size;

auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
return file_size;
}

bool nextImpl() override
@ -104,6 +111,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}

ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;
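Editor's note on the ReadBufferFromHDFS hunks above: the file size is now fixed at construction time, either taken from the caller-supplied value or looked up once via `hdfsGetPathInfo`, so `getFileSize()` becomes a trivial `const` getter and `nextImpl()` can stop early once `file_offset` reaches `file_size`. A minimal sketch of that pattern, with `probeSize()` standing in for the HDFS metadata call and no real HDFS involved:

```cpp
#include <cstdio>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

// Illustrative only: resolve the size once at construction (from a caller
// hint if available, otherwise by probing the source), so later reads can
// use a cheap const EOF check. probeSize() is a stand-in for a metadata
// call such as hdfsGetPathInfo(); this class is not the real read buffer.
class SizedReader
{
public:
    SizedReader(std::string path, std::optional<size_t> known_size)
        : path_(std::move(path))
        , file_size_(known_size ? *known_size : probeSize(path_))
    {
    }

    size_t getFileSize() const { return file_size_; } // no lazy lookup, no mutable state

    bool next(size_t bytes) // pretend to read `bytes` bytes
    {
        if (file_size_ != 0 && offset_ >= file_size_)
            return false; // cheap EOF check before issuing a read
        offset_ += bytes;
        return true;
    }

private:
    static size_t probeSize(const std::string & path)
    {
        std::FILE * f = std::fopen(path.c_str(), "rb");
        if (!f)
            throw std::runtime_error("Cannot find out file size for: " + path);
        std::fseek(f, 0, SEEK_END);
        long size = std::ftell(f);
        std::fclose(f);
        return size < 0 ? 0 : static_cast<size_t>(size);
    }

    std::string path_;
    size_t file_size_ = 0;
    size_t offset_ = 0;
};

int main()
{
    // The size is known up front here, so probeSize() is never called.
    SizedReader reader("example.bin", /*known_size=*/ std::optional<size_t>{1024});
    std::cout << "size=" << reader.getFileSize() << ", first read ok=" << reader.next(512) << "\n";
}
```

The trade-off mirrors the diff: at most one extra metadata round-trip in the constructor when the size is unknown, in exchange for a simpler getter and an early exit in the read loop.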
@ -13,6 +13,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>

#include <IO/WriteHelpers.h>
#include <IO/CompressionMethod.h>
@ -114,9 +115,9 @@ namespace
{
if (next_slash_after_glob_pos == std::string::npos)
{
result.emplace_back(
result.emplace_back(StorageHDFS::PathWithInfo{
String(ls.file_info[i].mName),
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)});
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)}});
}
else
{
@ -461,30 +462,21 @@ StorageHDFS::PathWithInfo HDFSSource::URISIterator::next()
return pimpl->next();
}

Block HDFSSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});

return sample_block;
}

HDFSSource::HDFSSource(
const ReadFromFormatInfo & info,
StorageHDFSPtr storage_,
const Block & block_for_format_,
const std::vector<NameAndTypePair> & requested_virtual_columns_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
ColumnsDescription columns_description_)
: ISource(getHeader(block_for_format_, requested_virtual_columns_), false)
std::shared_ptr<IteratorWrapper> file_iterator_)
: ISource(info.source_header, false)
, WithContext(context_)
, storage(std::move(storage_))
, block_for_format(block_for_format_)
, requested_virtual_columns(requested_virtual_columns_)
, block_for_format(info.format_header)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, max_block_size(max_block_size_)
, file_iterator(file_iterator_)
, columns_description(std::move(columns_description_))
, columns_description(info.columns_description)
{
initialize();
}
@ -533,6 +525,14 @@ bool HDFSSource::initialize()
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *input_format, getContext());
});
}

/// Add ExtractColumnsTransform to extract requested columns/subcolumns
/// from chunk read by IInputFormat.
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});

pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return true;
@ -721,7 +721,7 @@ private:

bool StorageHDFS::supportsSubsetOfColumns() const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
}

Pipe StorageHDFS::read(
@ -759,50 +759,17 @@ Pipe StorageHDFS::read(
});
}

std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;

for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}

ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{ return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); });

if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);

columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<HDFSSource>(
read_from_format_info,
this_ptr,
block_for_format,
requested_virtual_columns,
context_,
max_block_size,
iterator_wrapper,
columns_description));
iterator_wrapper));
}
return Pipe::unitePipes(std::move(pipes));
}
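Editor's note on the StorageHDFS::read simplification above: the hand-rolled bookkeeping (splitting requested names into real and virtual columns, computing `columns_description` and `block_for_format` per storage) is replaced by a single `prepareReadingFromFormat(...)` call whose result is handed to every `HDFSSource`. As a rough sketch of that shape only — the names `ReadInfo`, `prepareReadInfo` and `Source` below are invented stand-ins, not the real `ReadFromFormatInfo` API:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Illustrative only: one helper builds a single "read info" struct that
// every source consumes, instead of each storage redoing the column
// bookkeeping by hand.
struct ReadInfo
{
    std::vector<std::string> requested_columns;
    std::vector<std::string> requested_virtual_columns;
    std::vector<std::string> source_header; // requested + virtual columns
};

ReadInfo prepareReadInfo(const std::vector<std::string> & column_names,
                         const std::vector<std::string> & virtuals)
{
    ReadInfo info;
    for (const auto & name : column_names)
    {
        bool is_virtual = false;
        for (const auto & v : virtuals)
            is_virtual = is_virtual || (v == name);
        (is_virtual ? info.requested_virtual_columns : info.requested_columns).push_back(name);
        info.source_header.push_back(name);
    }
    return info;
}

struct Source
{
    explicit Source(const ReadInfo & info) : header(info.source_header) {}
    std::vector<std::string> header;
};

int main()
{
    auto info = prepareReadInfo({"data", "_path", "_file"}, {"_path", "_file"});
    Source s1(info), s2(info); // every stream reuses the same precomputed info
    std::cout << s1.header.size() << " columns in the source header\n";
}
```

Precomputing the struct once and reusing it for all `num_streams` sources keeps the per-stream constructor small, which is what the slimmer `HDFSSource` signature above reflects.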
@ -7,6 +7,7 @@
#include <Processors/ISource.h>
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Poco/URI.h>

namespace DB
@ -76,6 +77,8 @@ public:
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;

bool supportsSubcolumns() const override { return true; }

static ColumnsDescription getTableStructureFromData(
const String & format,
const String & uri,
@ -142,16 +145,12 @@ public:
using IteratorWrapper = std::function<StorageHDFS::PathWithInfo()>;
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;

static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);

HDFSSource(
const ReadFromFormatInfo & info,
StorageHDFSPtr storage_,
const Block & block_for_format_,
const std::vector<NameAndTypePair> & requested_virtual_columns_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
ColumnsDescription columns_description_);
std::shared_ptr<IteratorWrapper> file_iterator_);

String getName() const override;
@ -160,7 +159,8 @@ public:
private:
StorageHDFSPtr storage;
Block block_for_format;
std::vector<NameAndTypePair> requested_virtual_columns;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
UInt64 max_block_size;
std::shared_ptr<IteratorWrapper> file_iterator;
ColumnsDescription columns_description;
@ -37,6 +37,8 @@ public:

RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;

bool supportsSubcolumns() const override { return true; }

private:
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;
@ -331,6 +331,7 @@ void DataPartStorageOnDiskBase::backup(
const NameSet & files_without_checksums,
const String & path_in_backup,
const BackupSettings & backup_settings,
const ReadSettings & read_settings,
bool make_temporary_hard_links,
BackupEntries & backup_entries,
TemporaryFilesOnDisks * temp_dirs) const
@ -382,7 +383,7 @@ void DataPartStorageOnDiskBase::backup(

if (files_without_checksums.contains(filepath))
{
backup_entries.emplace_back(filepath_in_backup, std::make_unique<BackupEntryFromSmallFile>(disk, filepath_on_disk, copy_encrypted));
backup_entries.emplace_back(filepath_in_backup, std::make_unique<BackupEntryFromSmallFile>(disk, filepath_on_disk, read_settings, copy_encrypted));
continue;
}
@ -55,6 +55,7 @@ public:
const NameSet & files_without_checksums,
const String & path_in_backup,
const BackupSettings & backup_settings,
const ReadSettings & read_settings,
bool make_temporary_hard_links,
BackupEntries & backup_entries,
TemporaryFilesOnDisks * temp_dirs) const override;
Some files were not shown because too many files have changed in this diff.