Merge branch 'master' into fix-arrayfold

Alexey Milovidov 2023-12-18 04:57:07 +01:00 committed by GitHub
commit 33cf8842bc
248 changed files with 2522 additions and 1556 deletions

.gitmodules vendored

@ -354,6 +354,9 @@
[submodule "contrib/aklomp-base64"]
path = contrib/aklomp-base64
url = https://github.com/aklomp/base64.git
[submodule "contrib/pocketfft"]
path = contrib/pocketfft
url = https://github.com/mreineck/pocketfft.git
[submodule "contrib/sqids-cpp"]
path = contrib/sqids-cpp
url = https://github.com/sqids/sqids-cpp.git


@ -105,7 +105,6 @@
* Rewrite equality with `is null` check in JOIN ON section. Experimental *Analyzer only*. [#56538](https://github.com/ClickHouse/ClickHouse/pull/56538) ([vdimir](https://github.com/vdimir)).
* Function `concat` now supports arbitrary argument types (instead of only String and FixedString arguments). This makes it behave more similarly to MySQL's `concat` implementation. For example, `SELECT concat('ab', 42)` now returns `ab42` (see the sketch after this list). [#56540](https://github.com/ClickHouse/ClickHouse/pull/56540) ([Serge Klochkov](https://github.com/slvrtrn)).
* Allow getting cache configuration from 'named_collection' section in config or from SQL created named collections. [#56541](https://github.com/ClickHouse/ClickHouse/pull/56541) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Update `query_masking_rules` when reloading the config ([#56449](https://github.com/ClickHouse/ClickHouse/issues/56449)). [#56573](https://github.com/ClickHouse/ClickHouse/pull/56573) ([Mikhail Koviazin](https://github.com/mkmkme)).
* PostgreSQL database engine: Make the removal of outdated tables less aggressive with unsuccessful postgres connection. [#56609](https://github.com/ClickHouse/ClickHouse/pull/56609) ([jsc0218](https://github.com/jsc0218)).
* It took too much time to connect to PostgreSQL when the URL was not right, so the relevant query got stuck there and was cancelled. [#56648](https://github.com/ClickHouse/ClickHouse/pull/56648) ([jsc0218](https://github.com/jsc0218)).
* Keeper improvement: disable compressed logs by default in Keeper. [#56763](https://github.com/ClickHouse/ClickHouse/pull/56763) ([Antonio Andelic](https://github.com/antonio2368)).
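A minimal sketch of the extended `concat` behaviour from the entry above (the second statement's argument mix is illustrative):

```sql
-- Non-string arguments are converted to their text representation:
SELECT concat('ab', 42);            -- 'ab42'
SELECT concat('ab', 42, '-', 3.14); -- 'ab42-3.14'
```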


@ -44,6 +44,7 @@ else ()
endif ()
add_contrib (miniselect-cmake miniselect)
add_contrib (pdqsort-cmake pdqsort)
add_contrib (pocketfft-cmake pocketfft)
add_contrib (crc32-vpmsum-cmake crc32-vpmsum)
add_contrib (sparsehash-c11-cmake sparsehash-c11)
add_contrib (abseil-cpp-cmake abseil-cpp)

contrib/pocketfft vendored Submodule

@ -0,0 +1 @@
Subproject commit 9efd4da52cf8d28d14531d14e43ad9d913807546


@ -0,0 +1,10 @@
option (ENABLE_POCKETFFT "Enable pocketfft" ${ENABLE_LIBRARIES})
if (NOT ENABLE_POCKETFFT)
message(STATUS "Not using pocketfft")
return()
endif()
add_library(_pocketfft INTERFACE)
target_include_directories(_pocketfft INTERFACE ${ClickHouse_SOURCE_DIR}/contrib/pocketfft)
add_library(ch_contrib::pocketfft ALIAS _pocketfft)
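This contrib presumably backs the FFT-based time-series analysis added in this merge (see the `seriesPeriodDetectFFT` docs further down); a minimal usage sketch:

```sql
-- Doc example: detect the dominant period of a repeating series (returns 3):
SELECT seriesPeriodDetectFFT([1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6]);
```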


@ -49,6 +49,7 @@ RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
chmod 777 -R /rust && \
rustup toolchain install nightly-2023-07-04 && \
rustup default nightly-2023-07-04 && \
rustup toolchain remove stable && \
rustup component add rust-src && \
rustup target add x86_64-unknown-linux-gnu && \
rustup target add aarch64-unknown-linux-gnu && \


@ -74,7 +74,7 @@ RUN python3 -m pip install --no-cache-dir \
delta-spark==2.3.0 \
dict2xml \
dicttoxml \
docker \
docker==6.1.3 \
docker-compose==1.29.2 \
grpcio \
grpcio-tools \


@ -300,9 +300,6 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
rg -Fa "<Fatal>" /var/log/clickhouse-server/clickhouse-server2.log ||:
zstd --threads=0 < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.zst ||:
zstd --threads=0 < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.zst ||:
# FIXME: remove once only github actions will be left
rm /var/log/clickhouse-server/clickhouse-server1.log
rm /var/log/clickhouse-server/clickhouse-server2.log
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:


@ -77,6 +77,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@ -115,6 +116,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml


@ -520,7 +520,7 @@ Indexes of type `set` can be utilized by all functions. The other index types ar
| [empty](/docs/en/sql-reference/functions/array-functions#function-empty) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [notEmpty](/docs/en/sql-reference/functions/array-functions#function-notempty) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [has](/docs/en/sql-reference/functions/array-functions#function-has) | ✗ | ✗ | ✔ | ✔ | ✔ | ✔ |
| [hasAny](/docs/en/sql-reference/functions/array-functions#function-hasAny) | ✗ | ✗ | ✗ | ✗ | ✔ | ✗ |
| [hasAny](/docs/en/sql-reference/functions/array-functions#function-hasAny) | ✗ | ✗ | ✔ | ✔ | ✔ | ✗ |
| [hasAll](/docs/en/sql-reference/functions/array-functions#function-hasAll) | ✗ | ✗ | ✗ | ✗ | ✔ | ✗ |
| hasToken | ✗ | ✗ | ✗ | ✔ | ✗ | ✔ |
| hasTokenOrNull | ✗ | ✗ | ✗ | ✔ | ✗ | ✔ |
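A hedged sketch exercising the corrected `hasAny` row above with a `bloom_filter` skip index (table and index names are hypothetical):

```sql
-- Hypothetical table with a bloom_filter skip index over an array column:
CREATE TABLE tab
(
    id UInt64,
    arr Array(String),
    INDEX arr_idx arr TYPE bloom_filter GRANULARITY 3
)
ENGINE = MergeTree
ORDER BY id;

-- Per the updated row, hasAny can be served by the ngrambf_v1, tokenbf_v1
-- and bloom_filter index types:
SELECT count() FROM tab WHERE hasAny(arr, ['foo', 'bar']);
```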


@ -25,7 +25,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, clean_deleted_rows=value, ...]
[SETTINGS name=value, ...]
```
For a description of request parameters, see [statement description](../../../sql-reference/statements/create/table.md).
@ -88,53 +88,6 @@ SELECT * FROM mySecondReplacingMT FINAL;
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of a column used during a merge to determine whether the data in this row represents the state or is to be deleted; `1` is a "deleted" row, `0` is a "state" row.
Column data type — `UInt8`.
:::note
`is_deleted` can only be enabled when `ver` is used.
The row is deleted when `OPTIMIZE ... FINAL CLEANUP` or `OPTIMIZE ... FINAL` is used, or if the engine setting `clean_deleted_rows` has been set to `Always`.
No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted row is the one kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 1);
select * from myThirdReplacingMT final;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 00:00:00', 0);
select * from myThirdReplacingMT final;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Query clauses
When creating a `ReplacingMergeTree` table, the same [clauses](../../../engines/table-engines/mergetree-family/mergetree.md) are required as when creating a `MergeTree` table.


@ -406,7 +406,7 @@ RESTORE TABLE data AS data_restored FROM Disk('s3_plain', 'cloud_backup');
:::note
But keep in mind that:
- This disk should not be used for `MergeTree` itself, only for `BACKUP`/`RESTORE`
- If your tables are backed by S3 storage, it doesn't use `CopyObject` calls to copy parts to the destination bucket, instead, it downloads and uploads them, which is very inefficient. Prefer to use `BACKUP ... TO S3(<endpoint>)` syntax for this use-case.
- If your tables are backed by S3 storage and the types of the disks are different, it doesn't use `CopyObject` calls to copy parts to the destination bucket; instead, it downloads and uploads them, which is very inefficient. Prefer the `BACKUP ... TO S3(<endpoint>)` syntax for this use-case (see the sketch after this note).
:::
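A hedged illustration of the preferred syntax from the note above (endpoint and credentials are placeholders):

```sql
-- Placeholder endpoint and credentials; backing up directly to S3 avoids
-- the download-and-upload round trip described in the note:
BACKUP TABLE data TO S3('https://s3.us-east-1.amazonaws.com/bucket/backup_name', '<access_key_id>', '<secret_access_key>');
```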
## Alternatives


@ -1683,7 +1683,7 @@ Default value: `0.5`.
Asynchronous loading of databases and tables.
If `true` all non-system databases with `Ordinary`, `Atomic` and `Replicated` engine will be loaded asynchronously after the ClickHouse server start up. See `system.async_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table, that is not yet loaded, will wait for exactly this table to be started up. If load job fails, query will rethrow an error (instead of shutting down the whole server in case of `async_load_databases = false`). The table that is waited for by at least one query will be loaded with higher priority. DDL queries on a database will wait for exactly that database to be started up.
If `true`, all non-system databases with the `Ordinary`, `Atomic` and `Replicated` engines will be loaded asynchronously after the ClickHouse server starts up. See the `system.asynchronous_loader` table and the `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table that is not yet loaded will wait for exactly this table to be started up. If a load job fails, the query rethrows the error (instead of shutting down the whole server, as happens when `async_load_databases = false`). A table that is waited for by at least one query is loaded with higher priority. DDL queries on a database wait for exactly that database to be started up; see the query sketch below.
If `false`, all databases are loaded when the server starts.
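A small sketch for inspecting loader state (the `pool` and `status` columns are assumed from the `system.asynchronous_loader` docs):

```sql
-- pool/status column names assumed from the system-table documentation:
SELECT pool, status, count() AS jobs
FROM system.asynchronous_loader
GROUP BY pool, status;
```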


@ -852,16 +852,6 @@ If the file name for column is too long (more than `max_file_name_length` bytes)
The maximal length of the file name to keep it as is without hashing. Takes effect only if setting `replace_long_file_name_to_hash` is enabled. The value of this setting does not include the length of file extension. So, it is recommended to set it below the maximum filename length (usually 255 bytes) with some gap to avoid filesystem errors. Default value: 127.
## clean_deleted_rows
Enables/disables automatic deletion of rows flagged as `is_deleted` when performing `OPTIMIZE ... FINAL` on a table that uses the ReplacingMergeTree engine. When disabled, the `CLEANUP` keyword has to be added to `OPTIMIZE ... FINAL` to get the same behaviour (see the sketch below).
Possible values:
- `Always` or `Never`.
Default value: `Never`
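With this setting removed, cleanup has to be requested per statement; a minimal sketch using the `myThirdReplacingMT` table from the engine docs earlier in this diff:

```sql
-- CLEANUP explicitly purges rows flagged by is_deleted during the merge:
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
```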
## allow_experimental_block_number_column
Persists virtual column `_block_number` on merges.


@ -1,7 +1,7 @@
---
slug: /en/operations/system-tables/async_loader
slug: /en/operations/system-tables/asynchronous_loader
---
# async_loader
# asynchronous_loader
Contains information and status for recent asynchronous jobs (e.g. for tables loading). The table contains a row for every job. There is a tool for visualizing information from this table `utils/async_loader_graph`.
@ -9,7 +9,7 @@ Example:
``` sql
SELECT *
FROM system.async_loader
FROM system.asynchronous_loader
FORMAT Vertical
LIMIT 1
```


@ -53,7 +53,6 @@ clickhouse-benchmark [keys] < queries_file;
- `--confidence=N` — Level of confidence for T-test. Possible values: 0 (80%), 1 (90%), 2 (95%), 3 (98%), 4 (99%), 5 (99.5%). Default value: 5. In the [comparison mode](#clickhouse-benchmark-comparison-mode) `clickhouse-benchmark` performs the [Independent two-sample Student's t-test](https://en.wikipedia.org/wiki/Student%27s_t-test#Independent_two-sample_t-test) to determine whether the two distributions aren't different with the selected level of confidence.
- `--cumulative` — Printing cumulative data instead of data per interval.
- `--database=DATABASE_NAME` — ClickHouse database name. Default value: `default`.
- `--json=FILEPATH` — `JSON` output. When the key is set, `clickhouse-benchmark` outputs a report to the specified JSON file.
- `--user=USERNAME` — ClickHouse user name. Default value: `default`.
- `--password=PSWD` — ClickHouse user password. Default value: empty string.
- `--stacktrace` — Stack traces output. When the key is set, `clickhouse-benchmark` outputs stack traces of exceptions.


@ -216,6 +216,7 @@ Arguments:
- `--logger.level` — Log level.
- `--ignore-error` — do not stop processing if a query failed.
- `-c`, `--config-file` — path to a configuration file in the same format as for the ClickHouse server; by default the configuration is empty.
- `--no-system-tables` — do not attach system tables.
- `--help` — arguments references for `clickhouse-local`.
- `-V`, `--version` — print version information and exit.


@ -0,0 +1,59 @@
---
slug: /en/sql-reference/functions/time-series-functions
sidebar_position: 172
sidebar_label: Time Series
---
# Time Series Functions
The functions below are used for time series analysis.
## seriesPeriodDetectFFT
Finds the period of the given time series data using FFT (FFT: [Fast Fourier transform](https://en.wikipedia.org/wiki/Fast_Fourier_transform)).
**Syntax**
``` sql
seriesPeriodDetectFFT(series);
```
**Arguments**
- `series` - An array of numeric values
**Returned value**
- A real value equal to the period of the time series.
- Returns NaN when the number of data points is less than four.
Type: [Float64](../../sql-reference/data-types/float.md).
**Examples**
Query:
``` sql
SELECT seriesPeriodDetectFFT([1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6]) AS print_0;
```
Result:
``` text
┌─print_0─┐
│       3 │
└─────────┘
```
``` sql
SELECT seriesPeriodDetectFFT(arrayMap(x -> abs((x % 6) - 3), range(1000))) AS print_0;
```
Result:
``` text
┌─print_0─┐
│ 6 │
└─────────┘
```


@ -128,17 +128,17 @@ Reading data from `table.csv`, located in `archive1.zip` or/and `archive2.zip`:
SELECT * FROM file('user_files/archives/archive{1..2}.zip :: table.csv');
```
## Globbing {#globs_in_path}
## Globs in path {#globs_in_path}
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*` — Represents arbitrarily many characters except `/` but including the empty string.
- `?` — Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}` — Represents any of the alternative strings `'some_string', 'another_string', 'yet_another_one'`. The strings may contain `/`.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}` — Represents any number `>= N` and `<= M`.
- `**` - Represents all files inside a folder recursively.
Constructions with `{}` are similar to the [remote](remote.md) table function.
Constructions with `{}` are similar to the [remote](remote.md) and [hdfs](hdfs.md) table functions.
**Example**


@ -41,14 +41,14 @@ LIMIT 2
## Globs in path {#globs_in_path}
Multiple path components can have globs. For being processed file should exists and matches to the whole path pattern (not only suffix or prefix).
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*` — Substitutes any number of any characters except `/` including empty string.
- `?` — Substitutes any single character.
- `*` — Represents arbitrarily many characters except `/` but including the empty string.
- `?` — Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}` — Substitutes any number in range from N to M including both borders.
- `{N..M}` — Represents any number `>= N` and `<= M`.
Constructions with `{}` are similar to the [remote](../../sql-reference/table-functions/remote.md) table function.
Constructions with `{}` are similar to the [remote](remote.md) and [file](file.md) table functions.
**Example**


@ -369,6 +369,9 @@ INDEX b (u64 * length(str), i32 + f64 * 100, date, str) TYPE set(100) GRANULARIT
| [greaterOrEquals (\>=)](../../../sql-reference/functions/comparison-functions.md#greaterorequals) | ✔ | ✔ | ✗ | ✗ | ✗ |
| [empty](../../../sql-reference/functions/array-functions.md#function-empty) | ✔ | ✔ | ✗ | ✗ | ✗ |
| [notEmpty](../../../sql-reference/functions/array-functions.md#function-notempty) | ✔ | ✔ | ✗ | ✗ | ✗ |
| [has](../../../sql-reference/functions/array-functions.md#function-has) | ✗ | ✗ | ✔ | ✔ | ✔ | ✔ |
| [hasAny](../../../sql-reference/functions/array-functions.md#function-hasAny) | ✗ | ✗ | ✔ | ✔ | ✔ | ✗ |
| [hasAll](../../../sql-reference/functions/array-functions.md#function-hasAll) | ✗ | ✗ | ✗ | ✗ | ✔ | ✗ |
| hasToken | ✗ | ✗ | ✗ | ✔ | ✗ |
Functions with a constant argument that is smaller than the ngram size cannot use the `ngrambf_v1` index for query optimization.


@ -86,59 +86,6 @@ SELECT * FROM mySecondReplacingMT FINAL;
│ 1 │ first │ 2020-01-01 01:01:01 │
└─────┴─────────┴─────────────────────┘
```
### is_deleted
`is_deleted` — Name of the column used during a merge to indicate whether this row should be displayed or is subject to deletion; `1` means delete the row, `0` means display the row.
Column data type — `UInt8`.
:::note
`is_deleted` can be used only when `ver` is used.
The row is deleted in the following cases:
- the `OPTIMIZE ... FINAL CLEANUP` statement is used
- the `OPTIMIZE ... FINAL` statement is used
- the engine setting `clean_deleted_rows` is set to `Always` (default: `Never`)
- there are newer versions of the row
It is not recommended to perform `FINAL CLEANUP` or to use the engine setting `clean_deleted_rows` with the value `Always`; this can lead to unexpected results, for example deleted rows may reappear.
Regardless of the changes made to the data, the version must increase. If two rows have the same version, only the last inserted row is kept.
:::
Example:
```sql
-- with ver and is_deleted
CREATE OR REPLACE TABLE myThirdReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime,
`is_deleted` UInt8
)
ENGINE = ReplacingMergeTree(eventTime, is_deleted)
ORDER BY key;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 0);
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 01:01:01', 1);
select * from myThirdReplacingMT final;
0 rows in set. Elapsed: 0.003 sec.
-- delete rows with is_deleted
OPTIMIZE TABLE myThirdReplacingMT FINAL CLEANUP;
INSERT INTO myThirdReplacingMT Values (1, 'first', '2020-01-01 00:00:00', 0);
select * from myThirdReplacingMT final;
┌─key─┬─someCol─┬───────────eventTime─┬─is_deleted─┐
│ 1 │ first │ 2020-01-01 00:00:00 │ 0 │
└─────┴─────────┴─────────────────────┴────────────┘
```
## Query clauses


@ -45,6 +45,7 @@ $ clickhouse-local --structure "table_structure" --input-format "format_of_incom
- `--logger.level` — logging level.
- `--ignore-error` — do not stop processing if a query fails.
- `-c`, `--config-file` — path to the configuration file. By default, `clickhouse-local` starts with an empty configuration. The configuration file has the same format as for the ClickHouse server, and all server configuration parameters can be used in it. Usually no configuration is needed; if a single parameter needs to be set, it can be passed as a key named after the parameter.
- `--no-system-tables` — start without system tables.
- `--help` — print reference information about `clickhouse-local`.
- `-V`, `--version` — print the current version and exit.


@ -76,14 +76,16 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U
## Glob patterns in path components {#globs-in-path}
Glob patterns may be used when describing the file path. Only files whose path and name fully match the pattern are processed (not only the prefix or suffix).
The file path may contain glob patterns in read-only mode.
Patterns may appear in different parts of the path.
Only those files that exist in the file system and match the whole path pattern will be processed.
- `*` — substitutes any number of any characters except `/`, including the empty string.
- `?` — substitutes exactly one arbitrary character.
- `{some_string,another_string,yet_another_one}` — substitutes any of the strings `'some_string', 'another_string', 'yet_another_one'`. These strings may also contain the `/` symbol.
- `{N..M}` — substitutes any number in the range from `N` to `M`, inclusive (may contain leading zeros).
The `{}` construction is similar to the [remote](remote.md) table function.
The `{}` construction is similar to the [remote](remote.md) and [hdfs](hdfs.md) table functions.
**Example**


@ -14,7 +14,7 @@ hdfs(URI, format, structure)
**Input parameters**
- `URI` — the URI of the file in HDFS. In read-only mode, the file path supports the following glob patterns: `*`, `?`, `{abc,def}` and `{N..M}`, where `N`, `M` are numbers and `'abc', 'def'` are strings.
- `URI` — the URI of the file in HDFS.
- `format` — the [format](../../interfaces/formats.md#formats) of the file.
- `structure` — the table structure. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
@ -41,19 +41,22 @@ LIMIT 2
## Glob patterns in path components {#globs-in-path}
- `*` — substitutes any number of any characters except `/`, including the empty string.
The file path may contain glob patterns in read-only mode.
Patterns may appear in different parts of the path.
Only those files that exist in the file system and match the whole path pattern will be processed.
- `*` — substitutes any number of any characters (except `/`), including the empty string.
- `?` — substitutes exactly one arbitrary character.
- `{some_string,another_string,yet_another_one}` — substitutes any of the strings `'some_string', 'another_string', 'yet_another_one'`. These strings may also contain the `/` symbol.
- `{N..M}` — substitutes any number in the range from `N` to `M`, inclusive (may contain leading zeros).
The `{}` construction is similar to the [remote](remote.md) table function.
The `{}` construction is similar to the [remote](remote.md) and [file](file.md) table functions.
:::danger Warning
If your list of files contains number ranges with leading zeros, use a construction with curly braces for each digit separately, or use `?`.
If your list of files contains number ranges with leading zeros, use a separate construction with curly braces for each digit, or use `?`.
:::
Patterns may appear in different parts of the path. Exactly those files that both match the whole path pattern and exist in the file system will be processed.
## Virtual columns {#virtualnye-stolbtsy}
- `_path` — the path to the file.


@ -364,6 +364,9 @@ Conditions in the WHERE clause may contain function expressions that operate on columns of the data
| [greaterOrEquals (\>=)](../../../sql-reference/functions/comparison-functions.md#greaterorequals) | ✔ | ✔ | ✗ | ✗ | ✗ |
| [empty](../../../sql-reference/functions/array-functions.md#function-empty) | ✔ | ✔ | ✗ | ✗ | ✗ |
| [notEmpty](../../../sql-reference/functions/array-functions.md#function-notempty) | ✔ | ✔ | ✗ | ✗ | ✗ |
| [has](../../../sql-reference/functions/array-functions.md#function-has) | ✗ | ✗ | ✔ | ✔ | ✔ | ✔ |
| [hasAny](../../../sql-reference/functions/array-functions.md#function-hasAny) | ✗ | ✗ | ✔ | ✔ | ✔ | ✗ |
| [hasAll](../../../sql-reference/functions/array-functions.md#function-hasAll) | ✗ | ✗ | ✗ | ✗ | ✔ | ✗ |
| hasToken | ✗ | ✗ | ✗ | ✔ | ✗ |
Functions with a constant argument smaller than the ngram size cannot use `ngrambf_v1` for query optimization.


@ -45,6 +45,7 @@ clickhouse-local --structure "table_structure" --input-format "format_of_incomin
- `--logger.level` — log level.
- `--ignore-error` — do not stop processing when a query fails.
- `-c`, `--config-file` — path to a configuration file in the same format as for the ClickHouse server; by default the configuration is empty.
- `--no-system-tables` — do not attach system tables.
- `--help` — usage help for `clickhouse-local`.
- `-V`, `--version` — print version information and exit.


@ -115,6 +115,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/LocalDirectorySyncGuard.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/TemporaryFileOnDisk.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/loadLocalDiskConfig.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/DiskType.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/IObjectStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp


@ -744,7 +744,7 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loading metadata from {}", path);
auto startup_system_tasks = loadMetadataSystem(global_context);
attachSystemTablesLocal</* lazy= */ true>(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
@ -761,9 +761,9 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loaded metadata.");
}
else
else if (!config().has("no-system-tables"))
{
attachSystemTablesLocal</* lazy= */ true>(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}
@ -842,6 +842,7 @@ void LocalServer::addOptions(OptionsDescription & options_description)
("logger.log", po::value<std::string>(), "Log file name")
("logger.level", po::value<std::string>(), "Log level")
("no-system-tables", "do not attach system tables (better startup time)")
("path", po::value<std::string>(), "Storage path")
("only-system-tables", "attach only system tables from specified path")
("top_level_domains_path", po::value<std::string>(), "Path to lists with custom TLDs")
@ -870,6 +871,8 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
config().setString("table-file", options["file"].as<std::string>());
if (options.count("structure"))
config().setString("table-structure", options["structure"].as<std::string>());
if (options.count("no-system-tables"))
config().setBool("no-system-tables", true);
if (options.count("only-system-tables"))
config().setBool("only-system-tables", true);
if (options.count("database"))


@ -45,6 +45,7 @@
#include <Common/makeSocketAddress.h>
#include <Common/FailPoint.h>
#include <Server/waitServersToFinish.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Core/ServerUUID.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -1450,8 +1451,6 @@ try
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
global_context->reloadQueryMaskingRulesIfChanged(config);
std::lock_guard lock(servers_lock);
updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables);
}
@ -1472,6 +1471,8 @@ try
#endif
NamedCollectionUtils::reloadFromConfig(*config);
FileCacheFactory::instance().updateSettingsFromConfig(*config);
ProfileEvents::increment(ProfileEvents::MainConfigLoads);
/// Must be the last.


@ -0,0 +1 @@
../../../tests/config/config.d/graphite_alternative.xml


@ -455,6 +455,7 @@
<div id="username-password">
<input spellcheck="false" id="user" type="text" value="" placeholder="user" />
<input spellcheck="false" id="password" type="password" placeholder="password" />
<input id="hidden-submit" type="submit" hidden="true"/>
</div>
</div>
<div id="button-options">
@ -720,7 +721,7 @@ function insertChart(i) {
query_editor_confirm.addEventListener('click', editConfirm);
/// Ctrl+Enter (or Cmd+Enter on Mac) will also confirm editing.
query_editor.addEventListener('keydown', e => {
query_editor.addEventListener('keydown', event => {
if ((event.metaKey || event.ctrlKey) && (event.keyCode == 13 || event.keyCode == 10)) {
editConfirm();
}
@ -895,7 +896,7 @@ document.getElementById('add').addEventListener('click', e => {
});
document.getElementById('reload').addEventListener('click', e => {
reloadAll(false);
reloadAll(queries.length == 0);
});
document.getElementById('search').addEventListener('click', e => {
@ -1291,6 +1292,7 @@ async function drawAll() {
document.getElementById('add').style.display = 'inline-block';
document.getElementById('edit').style.display = 'inline-block';
document.getElementById('search-span').style.display = '';
hideError();
}
else {
const charts = document.getElementById('charts')
@ -1317,9 +1319,11 @@ function disableButtons() {
reloadButton.classList.add('disabled');
const runButton = document.getElementById('run');
runButton.value = 'Reloading…';
runButton.disabled = true;
runButton.classList.add('disabled');
if (runButton) {
runButton.value = 'Reloading…';
runButton.disabled = true;
runButton.classList.add('disabled');
}
const searchButton = document.getElementById('search');
searchButton.value = '…';
@ -1334,9 +1338,11 @@ function enableButtons() {
reloadButton.classList.remove('disabled');
const runButton = document.getElementById('run');
runButton.value = 'Ok';
runButton.disabled = false;
runButton.classList.remove('disabled');
if (runButton) {
runButton.value = 'Ok';
runButton.disabled = false;
runButton.classList.remove('disabled');
}
const searchButton = document.getElementById('search');
searchButton.value = '🔎';
@ -1359,14 +1365,17 @@ async function reloadAll(do_search) {
}
await drawAll();
} catch (e) {
showError(e.toString());
showError(e.message);
}
enableButtons();
}
document.getElementById('params').onsubmit = function(event) {
let do_search = document.activeElement === document.getElementById('search-query');
reloadAll(do_search);
if (document.activeElement === document.getElementById('search-query')) {
reloadAll(true);
} else {
reloadAll(queries.length == 0);
}
event.preventDefault();
}
@ -1405,13 +1414,15 @@ function refreshCustomized(value) {
document.getElementById('search-span').style.opacity = customized ? 0.5 : 1.0;
}
function regenerate() {
function updateFromState() {
document.getElementById('url').value = host;
document.getElementById('user').value = user;
document.getElementById('password').value = password;
document.getElementById('search-query').value = search_query;
refreshCustomized();
}
function regenerate() {
findParamsInQueries();
buildParams();
@ -1430,7 +1441,7 @@ function regenerate() {
window.onpopstate = function(event) {
if (!event.state) { return; }
({host, user, queries, params, search_query, customized} = event.state);
updateFromState();
regenerate();
drawAll();
};
@ -1447,6 +1458,7 @@ if (window.location.hash) {
async function start() {
try {
updateFromState();
if (queries.length == 0) {
await searchQueries();
} else {
@ -1460,7 +1472,7 @@ async function start() {
drawAll();
}
} catch (e) {
showError(e.toString());
showError(e.message);
}
}


@ -1,24 +1,25 @@
extern crate blake3;
extern crate libc;
use std::ffi::{CStr, CString};
use std::ffi::{CString};
use std::slice;
use std::os::raw::c_char;
#[no_mangle]
pub unsafe extern "C" fn blake3_apply_shim(
begin: *const c_char,
_size: u32,
size: u32,
out_char_data: *mut u8,
) -> *mut c_char {
if begin.is_null() {
let err_str = CString::new("input was a null pointer").unwrap();
return err_str.into_raw();
}
let input_res = slice::from_raw_parts(begin as *const u8, size as usize);
let mut hasher = blake3::Hasher::new();
let input_bytes = CStr::from_ptr(begin);
let input_res = input_bytes.to_bytes();
hasher.update(input_res);
let mut reader = hasher.finalize_xof();
reader.fill(std::slice::from_raw_parts_mut(out_char_data, blake3::OUT_LEN));
std::ptr::null_mut()
}


@ -183,6 +183,7 @@ enum class AccessType
M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
M(SYSTEM_REPLICA_READINESS, "SYSTEM REPLICA READY, SYSTEM REPLICA UNREADY", GLOBAL, SYSTEM) \
M(SYSTEM_RESTART_REPLICA, "RESTART REPLICA", TABLE, SYSTEM) \
M(SYSTEM_RESTORE_REPLICA, "RESTORE REPLICA", TABLE, SYSTEM) \
M(SYSTEM_WAIT_LOADING_PARTS, "WAIT LOADING PARTS", TABLE, SYSTEM) \


@ -436,6 +436,10 @@ dbms_target_link_libraries(PRIVATE ch_contrib::zstd)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::zstd)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::xz)
if (TARGET ch_contrib::pocketfft)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::pocketfft)
endif ()
if (TARGET ch_contrib::icu)
dbms_target_link_libraries (PRIVATE ch_contrib::icu)
endif ()


@ -362,7 +362,7 @@ public:
bool is_executing = false;
};
// For introspection and debug only, see `system.async_loader` table.
// For introspection and debug only, see `system.asynchronous_loader` table.
std::vector<JobState> getJobStates() const;
// For deadlock resolution. Should not be used directly.


@ -519,8 +519,9 @@ void ConfigProcessor::doIncludesRecursive(
if (attr_nodes["from_zk"]) /// we have zookeeper subst
{
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_zk substitution");
/// only allow substitution for nodes with no value and without "replace"
if (node->hasChildNodes() && !replace)
throw Poco::Exception("Element <" + node->nodeName() + "> has value and does not have 'replace' attribute, can't process from_zk substitution");
contributing_zk_paths.insert(attr_nodes["from_zk"]->getNodeValue());
@ -544,8 +545,9 @@ void ConfigProcessor::doIncludesRecursive(
if (attr_nodes["from_env"]) /// we have env subst
{
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_env substitution");
/// only allow substitution for nodes with no value and without "replace"
if (node->hasChildNodes() && !replace)
throw Poco::Exception("Element <" + node->nodeName() + "> has value and does not have 'replace' attribute, can't process from_env substitution");
XMLDocumentPtr env_document;
auto get_env_node = [&](const std::string & name) -> const Node *


@ -260,6 +260,7 @@
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M)
#endif
namespace CurrentMetrics
{
#define M(NAME, DOCUMENTATION) extern const Metric NAME = Metric(__COUNTER__);


@ -451,6 +451,7 @@ The server successfully detected this situation and will download merged part fr
M(ThreadpoolReaderSubmitReadSynchronously, "How many times we haven't scheduled a task on the thread pool and read synchronously instead") \
M(ThreadpoolReaderSubmitReadSynchronouslyBytes, "How many bytes were read synchronously") \
M(ThreadpoolReaderSubmitReadSynchronouslyMicroseconds, "How much time we spent reading synchronously") \
M(ThreadpoolReaderSubmitLookupInCacheMicroseconds, "How much time we spent checking if content is cached") \
M(AsynchronousReaderIgnoredBytes, "Number of bytes ignored during asynchronous reading") \
\
M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spend waiting for internal read buffer (includes cache waiting)") \


@ -1,6 +1,5 @@
#include "SensitiveDataMasker.h"
#include <mutex>
#include <set>
#include <string>
#include <atomic>
@ -95,28 +94,20 @@ public:
SensitiveDataMasker::~SensitiveDataMasker() = default;
std::unique_ptr<SensitiveDataMasker> SensitiveDataMasker::sensitive_data_masker = nullptr;
std::mutex SensitiveDataMasker::instance_mutex;
void SensitiveDataMasker::setInstance(std::unique_ptr<SensitiveDataMasker> sensitive_data_masker_)
{
if (!sensitive_data_masker_)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: the 'sensitive_data_masker' is not set");
std::lock_guard lock(instance_mutex);
if (sensitive_data_masker_->rulesCount() > 0)
{
sensitive_data_masker = std::move(sensitive_data_masker_);
}
else
{
sensitive_data_masker.reset();
}
}
SensitiveDataMasker * SensitiveDataMasker::getInstance()
{
std::lock_guard lock(instance_mutex);
return sensitive_data_masker.get();
}


@ -1,7 +1,6 @@
#pragma once
#include <memory>
#include <mutex>
#include <vector>
#include <cstdint>
@ -46,7 +45,6 @@ class SensitiveDataMasker
private:
class MaskingRule;
std::vector<std::unique_ptr<MaskingRule>> all_masking_rules;
static std::mutex instance_mutex;
static std::unique_ptr<SensitiveDataMasker> sensitive_data_masker;
public:


@ -99,6 +99,7 @@ struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
{
TestKeeperGetRequest() = default;
explicit TestKeeperGetRequest(const GetRequest & base) : GetRequest(base) {}
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
@ -118,6 +119,8 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
struct TestKeeperListRequest : ListRequest, TestKeeperRequest
{
TestKeeperListRequest() = default;
explicit TestKeeperListRequest(const ListRequest & base) : ListRequest(base) {}
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
@ -176,6 +179,14 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
{
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
}
else if (const auto * concrete_request_get = dynamic_cast<const GetRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperGetRequest>(*concrete_request_get));
}
else if (const auto * concrete_request_list = dynamic_cast<const ListRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperListRequest>(*concrete_request_list));
}
else
throw Exception::fromMessage(Error::ZBADARGUMENTS, "Illegal command as part of multi ZooKeeper request");
}


@ -497,6 +497,17 @@ bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, cons
return existsWatch(path, stat, callbackForEvent(watch));
}
bool ZooKeeper::anyExists(const std::vector<std::string> & paths)
{
auto exists_multi_response = exists(paths);
for (size_t i = 0; i < exists_multi_response.size(); ++i)
{
if (exists_multi_response[i].error == Coordination::Error::ZOK)
return true;
}
return false;
}
bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
Coordination::Error code = existsImpl(path, stat, watch_callback);


@ -286,6 +286,8 @@ public:
return exists(paths.begin(), paths.end());
}
bool anyExists(const std::vector<std::string> & paths);
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
@ -422,8 +424,9 @@ public:
/// Performs several operations in a transaction.
/// Throws on every error.
Coordination::Responses multi(const Coordination::Requests & requests);
/// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw.
/// Throws only if some operation has returned an "unexpected" error - an error that would cause
/// the corresponding try- method to throw.
/// On exception, `responses` may or may not be populated.
Coordination::Error tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses);
/// Throws nothing (even session expired errors)
Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);
@ -567,8 +570,11 @@ public:
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
uint64_t getSessionTimeoutMS() const { return args.session_timeout_ms; }
void setServerCompletelyStarted();
Int8 getConnectedHostIdx() const;


@ -62,6 +62,7 @@
#cmakedefine01 FIU_ENABLE
#cmakedefine01 USE_BCRYPT
#cmakedefine01 USE_LIBARCHIVE
#cmakedefine01 USE_POCKETFFT
/// This is needed for .incbin in assembly. For some reason, include paths don't work there in presence of LTO.
/// That's why we use absolute paths.


@ -123,7 +123,7 @@ namespace Format
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot switch from automatic field numbering to manual field specification");
is_plain_numbering = true;
if (index_if_plain >= argument_number)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument is too big for formatting");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not enough arguments to fill the placeholders in the format string");
index_positions.back() = index_if_plain++;
}
else
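A hedged SQL-level sketch of when the reworded error above can surface (assuming the `format` SQL function routes through this formatter):

```sql
SELECT format('{}-{}', 'a', 'b');    -- returns 'a-b'
SELECT format('{}-{}-{}', 'a', 'b'); -- expected to throw the new message:
                                     -- "Not enough arguments to fill the placeholders in the format string"
```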


@ -902,66 +902,6 @@ TEST(AsyncLoader, SetMaxThreads)
t.loader.wait();
}
TEST(AsyncLoader, DynamicPools)
{
const size_t max_threads[] { 2, 10 };
const int jobs_in_chain = 16;
AsyncLoaderTest t({
{.max_threads = max_threads[0], .priority{0}},
{.max_threads = max_threads[1], .priority{-1}},
});
t.loader.start();
std::atomic<size_t> executing[2] { 0, 0 }; // Number of currently executing jobs per pool
for (int concurrency = 1; concurrency <= 12; concurrency++)
{
std::atomic<bool> boosted{false}; // Visible concurrency was increased
std::atomic<int> left{concurrency * jobs_in_chain / 2}; // Number of jobs to start before `prioritize()` call
std::shared_mutex prioritization_mutex; // To slow down job execution during prioritization to avoid race condition
LoadJobSet jobs_to_prioritize;
auto job_func = [&] (AsyncLoader & loader, const LoadJobPtr & self)
{
auto pool_id = self->executionPool();
executing[pool_id]++;
if (executing[pool_id] > max_threads[0])
boosted = true;
ASSERT_LE(executing[pool_id], max_threads[pool_id]);
// Dynamic prioritization
if (--left == 0)
{
std::unique_lock lock{prioritization_mutex};
for (const auto & job : jobs_to_prioritize)
loader.prioritize(job, 1);
}
std::shared_lock lock{prioritization_mutex};
t.randomSleepUs(100, 200, 100);
ASSERT_LE(executing[pool_id], max_threads[pool_id]);
executing[pool_id]--;
};
std::vector<LoadTaskPtr> tasks;
tasks.reserve(concurrency);
for (int i = 0; i < concurrency; i++)
tasks.push_back(makeLoadTask(t.loader, t.chainJobSet(jobs_in_chain, job_func, fmt::format("c{}-j", i))));
jobs_to_prioritize = getGoals(tasks); // All jobs
scheduleLoad(tasks);
waitLoad(tasks);
ASSERT_EQ(executing[0], 0);
ASSERT_EQ(executing[1], 0);
ASSERT_EQ(boosted, concurrency > 2);
boosted = false;
}
}
TEST(AsyncLoader, SubJobs)
{
AsyncLoaderTest t(1);
@ -1000,7 +940,7 @@ TEST(AsyncLoader, SubJobs)
std::atomic<int> jobs_left;
// It is a good practice to keep load task inside the component:
// 1) to make sure it outlives its load jobs;
// 2) to avoid removing load jobs from `system.async_loader` while we use the component
// 2) to avoid removing load jobs from `system.asynchronous_loader` while we use the component
LoadTaskPtr load_task;
};
@ -1070,7 +1010,7 @@ TEST(AsyncLoader, RecursiveJob)
std::atomic<int> jobs_left;
// It is a good practice to keep load task inside the component:
// 1) to make sure it outlives its load jobs;
// 2) to avoid removing load jobs from `system.async_loader` while we use the component
// 2) to avoid removing load jobs from `system.asynchronous_loader` while we use the component
LoadTaskPtr load_task;
};


@ -208,6 +208,9 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
return;
}
/// To avoid reference to binding
const auto & snapshot_path_ref = snapshot_path;
SCOPE_EXIT(
{
LOG_INFO(log, "Removing lock file");
@ -223,7 +226,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path_ref);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});


@ -35,6 +35,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_METHOD;
}
struct ContextSharedPart : boost::noncopyable
@ -376,4 +377,9 @@ void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::Abstr
shared->keeper_dispatcher->updateConfiguration(getConfigRef(), getMacros());
}
std::shared_ptr<zkutil::ZooKeeper> Context::getZooKeeper() const
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot connect to ZooKeeper from Keeper");
}
}


@ -21,6 +21,11 @@
#include <memory>
#include "config.h"
namespace zkutil
{
class ZooKeeper;
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
namespace DB
{
@ -153,6 +158,8 @@ public:
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
zkutil::ZooKeeperPtr getZooKeeper() const;
};
}


@ -90,7 +90,8 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
for (const auto & setting : settings.all())
{
const auto & name = setting.getName();
if (config.has(name) && !setting.isObsolete())
bool should_skip_check = name == "max_table_size_to_drop" || name == "max_partition_size_to_drop";
if (config.has(name) && !setting.isObsolete() && !should_skip_check)
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "A setting '{}' appeared at top level in config {}."
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."


@ -186,7 +186,7 @@ class IColumn;
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \
@ -527,6 +527,8 @@ class IColumn;
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited.", 0) \
M(Bool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
M(UInt64, max_table_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
M(UInt64, max_partition_size_to_drop, 0, "Only available in ClickHouse Cloud", 0) \
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
@ -620,6 +622,7 @@ class IColumn;
M(Bool, describe_include_subcolumns, false, "If true, subcolumns of all table columns will be included into result of DESCRIBE query", 0) \
M(Bool, describe_include_virtual_columns, false, "If true, virtual columns of table will be included into result of DESCRIBE query", 0) \
M(Bool, describe_compact_output, false, "If true, include only column names and types into result of DESCRIBE query", 0) \
M(Bool, apply_mutations_on_fly, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
@ -671,6 +674,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(Bool, database_replicated_allow_only_replicated_engine, false, "Allow to create only Replicated tables in database with engine Replicated", 0) \
M(Bool, database_replicated_allow_replicated_engine_arguments, true, "Allow to create only Replicated tables in database with engine Replicated with explicit arguments", 0) \
M(Bool, cloud_mode, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cloud_mode_engine, 1, "Only available in ClickHouse Cloud", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result, one of: 'none', 'throw', 'null_status_on_timeout', 'never_throw'", 0) \
M(UInt64, distributed_ddl_entry_format_version, 5, "Compatibility version of distributed DDL (ON CLUSTER) queries", 0) \
\
@ -724,6 +729,7 @@ class IColumn;
M(UInt64, merge_tree_min_bytes_per_task_for_remote_reading, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes to read per task.", 0) \
M(Bool, merge_tree_use_const_size_tasks_for_remote_reading, true, "Whether to use constant size tasks for reading from a remote table.", 0) \
M(Bool, merge_tree_determine_task_size_by_prewhere_columns, true, "Whether to use only prewhere columns size to determine reading task size.", 0) \
M(UInt64, merge_tree_compact_parts_min_granules_to_multibuffer_read, 16, "Only available in ClickHouse Cloud", 0) \
\
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
@ -835,6 +841,10 @@ class IColumn;
M(Bool, print_pretty_type_names, false, "Print pretty type names in DESCRIBE query and toTypeName() function", 0) \
M(Bool, create_table_empty_primary_key_by_default, false, "Allow to create *MergeTree tables with empty primary key when ORDER BY and PRIMARY KEY not specified", 0) \
M(Bool, allow_named_collection_override_by_default, true, "Allow named collections' fields override by default.", 0)\
M(Bool, allow_experimental_shared_merge_tree, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.


@ -98,8 +98,6 @@ IMPLEMENT_SETTING_AUTO_ENUM(DefaultDatabaseEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(DefaultTableEngine, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_AUTO_ENUM(CleanDeletedRows, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64},


@ -133,19 +133,13 @@ enum class DefaultTableEngine
ReplacingMergeTree,
ReplicatedMergeTree,
ReplicatedReplacingMergeTree,
SharedMergeTree,
SharedReplacingMergeTree,
Memory,
};
DECLARE_SETTING_ENUM(DefaultTableEngine)
enum class CleanDeletedRows
{
Never = 0, /// Disable.
Always,
};
DECLARE_SETTING_ENUM(CleanDeletedRows)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable


@ -89,14 +89,15 @@ void DatabaseAtomic::drop(ContextPtr)
fs::remove_all(getMetadataPath());
}
void DatabaseAtomic::attachTableUnlocked(ContextPtr local_context, const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseOrdinary::attachTableUnlocked(local_context, name, table, relative_table_path);
DatabaseOrdinary::attachTableUnlocked(name, table);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
@ -324,7 +325,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
DatabaseWithOwnTablesBase::attachTableUnlocked(query_context, query.getTable(), table, /*relative_table_path=*/ {}); /// Should never throw
attachTableUnlocked(query.getTable(), table); /// Should never throw
table_name_to_path.emplace(query.getTable(), table_data_path);
}
catch (...)


@ -38,6 +38,7 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void dropTableImpl(ContextPtr context, const String & table_name, bool sync);
void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & name) override;
String getTableDataPath(const String & table_name) const override;
@ -65,8 +66,6 @@ public:
void setDetachedTableNotInUseForce(const UUID & uuid) override;
protected:
void attachTableUnlocked(ContextPtr local_context, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & statement, ContextPtr query_context) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) override;

View File

@ -168,9 +168,10 @@ bool DatabaseLazy::empty() const
return tables_cache.empty();
}
void DatabaseLazy::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
void DatabaseLazy::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
LOG_DEBUG(log, "Attach table {}.", backQuote(table_name));
std::lock_guard lock(mutex);
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto [it, inserted] = tables_cache.emplace(std::piecewise_construct,

View File

@ -64,15 +64,14 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void shutdown() override;
~DatabaseLazy() override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
private:
struct CacheExpirationQueueElement
{

View File

@ -33,13 +33,13 @@ DatabaseMemory::DatabaseMemory(const String & name_, ContextPtr context_)
}
void DatabaseMemory::createTable(
ContextPtr local_context,
ContextPtr /*context*/,
const String & table_name,
const StoragePtr & table,
const ASTPtr & query)
{
std::lock_guard lock{mutex};
attachTableUnlocked(local_context, table_name, table, /*relative_table_path=*/ {});
attachTableUnlocked(table_name, table);
/// Clean the query from temporary flags.
ASTPtr query_to_store = query;
@ -56,7 +56,7 @@ void DatabaseMemory::createTable(
}
void DatabaseMemory::dropTable(
ContextPtr local_context,
ContextPtr /*context*/,
const String & table_name,
bool /*sync*/)
{
@ -83,7 +83,7 @@ void DatabaseMemory::dropTable(
catch (...)
{
std::lock_guard lock{mutex};
attachTableUnlocked(local_context, table_name, table, /*relative_table_path=*/ {});
attachTableUnlocked(table_name, table);
throw;
}

View File

@ -7,7 +7,6 @@
#include <Parsers/formatAST.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageFactory.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
@ -200,7 +199,7 @@ DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const
bool DatabaseWithOwnTablesBase::isTableExist(const String & table_name, ContextPtr) const
{
std::lock_guard lock(mutex);
return tables.find(table_name) != tables.end() || lazy_tables.find(table_name) != lazy_tables.end();
return tables.find(table_name) != tables.end();
}
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, ContextPtr) const
@ -212,9 +211,6 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, Con
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
{
std::lock_guard lock(mutex);
loadLazyTables();
if (!filter_by_table_name)
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
@ -261,7 +257,13 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
return res;
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & name, const StoragePtr & table, const String &)
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table)
{
auto table_id = table->getStorageID();
if (table_id.database_name != database_name)
@ -274,7 +276,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & n
DatabaseCatalog::instance().addUUIDMapping(table_id.uuid, shared_from_this(), table);
}
if (!tables.emplace(name, table).second)
if (!tables.emplace(table_name, table).second)
{
if (table_id.hasUUID())
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
@ -287,12 +289,6 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & n
CurrentMetrics::add(CurrentMetrics::AttachedTable, 1);
}
void DatabaseWithOwnTablesBase::registerLazyTableUnlocked(const String & table_name, LazyTableCreator table_creator, const String & relative_table_path)
{
if (!lazy_tables.emplace(table_name, std::make_pair(relative_table_path, std::move(table_creator))).second)
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already registered.", table_name);
}
void DatabaseWithOwnTablesBase::shutdown()
{
/// You can not hold a lock during shutdown.
@ -393,45 +389,10 @@ void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & cre
StoragePtr DatabaseWithOwnTablesBase::tryGetTableNoWait(const String & table_name) const
{
std::lock_guard lock(mutex);
auto it = tables.find(table_name);
if (it != tables.end())
return it->second;
const auto lazy_it = lazy_tables.find(table_name);
if (lazy_it != lazy_tables.end())
{
LOG_DEBUG(log, "Attaching lazy table {}", backQuoteIfNeed(table_name));
auto relative_table_path = lazy_it->second.first;
auto storage = lazy_it->second.second();
lazy_tables.erase(lazy_it);
(const_cast<DatabaseWithOwnTablesBase *>(this))->attachTableUnlocked(Context::getGlobalContextInstance(), table_name, storage, relative_table_path);
it = tables.find(table_name);
if (it != tables.end())
return it->second;
}
return {};
}
void DatabaseWithOwnTablesBase::loadLazyTables() const
{
if (lazy_tables.empty())
return;
ContextPtr global_context = Context::getGlobalContextInstance();
while (!lazy_tables.empty())
{
auto lazy_it = lazy_tables.begin();
const auto table_name = lazy_it->first;
LOG_DEBUG(log, "Attaching lazy table {}", backQuoteIfNeed(table_name));
auto relative_table_path = lazy_it->second.first;
auto storage = lazy_it->second.second();
lazy_tables.erase(lazy_it);
(const_cast<DatabaseWithOwnTablesBase *>(this))->attachTableUnlocked(global_context, table_name, storage, relative_table_path);
}
}
}

View File

@ -30,6 +30,8 @@ public:
bool empty() const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
@ -43,19 +45,14 @@ public:
protected:
Tables tables TSA_GUARDED_BY(mutex);
/// Tables that are attached lazily
mutable LazyTables lazy_tables TSA_GUARDED_BY(mutex);
Poco::Logger * log;
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
void attachTableUnlocked(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void registerLazyTableUnlocked(const String & table_name, LazyTableCreator table_creator, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
StoragePtr tryGetTableNoWait(const String & table_name) const;
void loadLazyTables() const TSA_REQUIRES(mutex);
};
}

View File

@ -27,12 +27,21 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
{
if (auto storage = tryGetTable(name, context))
return storage;
TableNameHints hints(this->shared_from_this(), context);
std::vector<String> names = hints.getHints(name);
if (names.empty())
/// The hint is a pair holding a single database_name and table_name suggestion for the given table name.
auto hint = hints.getHintForTable(name);
if (hint.first.empty())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
else
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist. Maybe you meant {}?", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name), backQuoteIfNeed(names[0]));
throw Exception(
ErrorCodes::UNKNOWN_TABLE,
"Table {}.{} does not exist. Maybe you meant {}.{}?",
backQuoteIfNeed(getDatabaseName()),
backQuoteIfNeed(name),
backQuoteIfNeed(hint.first),
backQuoteIfNeed(hint.second));
}
IDatabase::IDatabase(String database_name_) : database_name(std::move(database_name_))
@ -62,20 +71,4 @@ void IDatabase::createTableRestoredFromBackup(const ASTPtr & create_table_query,
backQuoteIfNeed(create_table_query->as<const ASTCreateQuery &>().getTable()));
}
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// Note: ATTACH TABLE statement actually uses createTable method.
void IDatabase::attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) /// NOLINT
{
std::lock_guard lock(mutex);
attachTableUnlocked(context, name, table, relative_table_path);
}
void IDatabase::registerLazyTable(ContextPtr, const String & table_name, LazyTableCreator table_creator, const String & relative_table_path) /// NOLINT
{
std::lock_guard lock(mutex);
registerLazyTableUnlocked(table_name, std::move(table_creator), relative_table_path);
}
}

View File

@ -125,6 +125,7 @@ public:
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
/** Database engine.
* It is responsible for:
* - initialization of set of known tables and dictionaries;
@ -137,10 +138,6 @@ using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
class IDatabase : public std::enable_shared_from_this<IDatabase>
{
public:
using LazyTableCreator = std::function<StoragePtr()>;
/// Map{table_name, Pair{relative_table_path, LazyTableCreator}}
using LazyTables = std::map<String, std::pair<String, LazyTableCreator>>;
IDatabase() = delete;
explicit IDatabase(String database_name_);
@ -272,17 +269,11 @@ public:
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// @param relative_table_path - only for Atomic engine
///
/// Note:
/// - ATTACH TABLE statement actually uses createTable method.
/// - Instead of overriding this method you should override attachTableUnlocked()
/// (This method is only for DatabasesOverlay to override)
virtual void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path = {}); /// NOLINT
/// Register tables lazily (attach will be done only when the table will be used) instead of attaching it.
/// This is needed to improve startup time of clickhouse-local.
virtual void registerLazyTable(ContextPtr context, const String & table_name, LazyTableCreator table_creator, const String & relative_table_path = {});
/// Note: ATTACH TABLE statement actually uses createTable method.
virtual void attachTable(ContextPtr /* context */, const String & /*name*/, const StoragePtr & /*table*/, [[maybe_unused]] const String & relative_table_path = {}) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no ATTACH TABLE query for Database{}", getEngineName());
}
/// Forget about the table without deleting it, and return it. The database may not support this method.
virtual StoragePtr detachTable(ContextPtr /* context */, const String & /*name*/)
@ -439,16 +430,6 @@ protected:
return nullptr;
}
virtual void attachTableUnlocked(ContextPtr /*context*/, const String & /*name*/, const StoragePtr & /*table*/, const String & /*relative_table_path*/ = {}) TSA_REQUIRES(mutex) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no ATTACH TABLE query for Database{}", getEngineName());
}
virtual void registerLazyTableUnlocked(const String & /* table_name */, LazyTableCreator /* table_creator */, const String & /* relative_table_path */) TSA_REQUIRES(mutex) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There lazy table initialization support for Database{}", getEngineName());
}
mutable std::mutex mutex;
String database_name TSA_GUARDED_BY(mutex);
String comment TSA_GUARDED_BY(mutex);

View File

@ -101,10 +101,10 @@ void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & na
DatabaseAtomic::dropTable(context_, name, sync);
}
void DatabaseMaterializedMySQL::attachTableUnlocked(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
{
checkIsInternalQuery(context_, "ATTACH TABLE");
DatabaseAtomic::attachTableUnlocked(context_, name, table, relative_table_path);
DatabaseAtomic::attachTable(context_, name, table, relative_table_path);
}
StoragePtr DatabaseMaterializedMySQL::detachTable(ContextPtr context_, const String & name)

View File

@ -48,8 +48,6 @@ protected:
LoadTaskPtr startup_mysql_database_task;
void attachTableUnlocked(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
public:
String getEngineName() const override { return "MaterializedMySQL"; }
@ -60,6 +58,8 @@ public:
void dropTable(ContextPtr context_, const String & name, bool sync) override;
void attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context_, const String & name) override;
void renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) override;

View File

@ -361,8 +361,10 @@ void DatabaseMySQL::cleanOutdatedTables()
}
}
void DatabaseMySQL::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
void DatabaseMySQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard lock{mutex};
if (!local_tables_cache.contains(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot attach table {}.{} because it does not exist.",
backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));

View File

@ -84,9 +84,9 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
protected:
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
private:

View File

@ -216,8 +216,10 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
}
void DatabasePostgreSQL::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
void DatabasePostgreSQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE,
"Cannot attach PostgreSQL table {} because it does not exist in PostgreSQL (database: {})",

View File

@ -54,14 +54,13 @@ public:
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void drop(ContextPtr /*context*/) override;
void shutdown() override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) TSA_REQUIRES(mutex) override;
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private:

View File

@ -69,12 +69,6 @@ DictionaryPtr DictionaryFactory::create(
layout_type);
}
DictionaryPtr DictionaryFactory::create(const std::string & name, const ASTCreateQuery & ast, ContextPtr global_context) const
{
auto configuration = getDictionaryConfigurationFromAST(ast, global_context);
return DictionaryFactory::create(name, *configuration, "dictionary", global_context, true);
}
bool DictionaryFactory::isComplex(const std::string & layout_type) const
{
auto it = registered_layouts.find(layout_type);

View File

@ -39,11 +39,6 @@ public:
ContextPtr global_context,
bool created_from_ddl) const;
/// Create dictionary from DDL-query
DictionaryPtr create(const std::string & name,
const ASTCreateQuery & ast,
ContextPtr global_context) const;
using LayoutCreateFunction = std::function<DictionaryPtr(
const std::string & name,
const DictionaryStructure & dict_struct,

View File

@ -177,22 +177,13 @@ CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segm
}
ReadSettings local_read_settings{settings};
local_read_settings.local_fs_prefetch = false;
if (local_read_settings.local_fs_method != LocalFSReadMethod::pread_threadpool)
local_read_settings.local_fs_method = LocalFSReadMethod::pread;
local_read_settings.local_fs_method = LocalFSReadMethod::pread;
if (use_external_buffer)
local_read_settings.local_fs_buffer_size = 0;
cache_file_reader = createReadBufferFromFileBase(
path,
local_read_settings,
std::nullopt,
std::nullopt,
file_segment.getFlagsForLocalRead(),
/*existing_memory=*/nullptr,
/*alignment=*/0,
/*use_external_buffer=*/true);
cache_file_reader
= createReadBufferFromFileBase(path, local_read_settings, std::nullopt, std::nullopt, file_segment.getFlagsForLocalRead());
if (getFileSizeFromReadBuffer(*cache_file_reader) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {}", path);

View File

@ -26,6 +26,7 @@ namespace ProfileEvents
extern const Event ThreadpoolReaderSubmitReadSynchronously;
extern const Event ThreadpoolReaderSubmitReadSynchronouslyBytes;
extern const Event ThreadpoolReaderSubmitReadSynchronouslyMicroseconds;
extern const Event ThreadpoolReaderSubmitLookupInCacheMicroseconds;
extern const Event AsynchronousReaderIgnoredBytes;
}
@ -83,7 +84,13 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
reader.seek(request.offset, SEEK_SET);
}
if (reader.isContentCached(request.offset, request.size))
bool is_content_cached = false;
{
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderSubmitLookupInCacheMicroseconds);
is_content_cached = reader.isContentCached(request.offset, request.size);
}
if (is_content_cached)
{
std::promise<Result> promise;
std::future<Result> future = promise.get_future();

View File

@ -98,7 +98,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name)->getSettings().cache_on_write_operations
&& fs::path(object.remote_path).extension() != ".tmp";
/// Need to remove even if cache_on_write == false.

View File

@ -64,7 +64,7 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
}
}
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings, predefined_configuration ? "" : config_prefix);
auto disk = disk_it->second;
if (!dynamic_cast<const DiskObjectStorage *>(disk.get()))
throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -46,6 +46,17 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}
DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk)
{
return std::make_shared<MultipleDisksObjectStorageTransaction>(
*object_storage,
*metadata_storage,
*to_disk.getObjectStorage(),
*to_disk.getMetadataStorage(),
send_metadata ? metadata_helper.get() : nullptr);
}
DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_key_prefix_,
@ -179,12 +190,13 @@ void DiskObjectStorage::copyFile( /// NOLINT
const std::function<void()> & cancellation_hook
)
{
if (this == &to_disk)
if (getDataSourceDescription() == to_disk.getDataSourceDescription())
{
/// It may use s3-server-side copy
auto transaction = createObjectStorageTransaction();
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
/// It may use s3-server-side copy
auto & to_disk_object_storage = dynamic_cast<DiskObjectStorage &>(to_disk);
auto transaction = createObjectStorageTransactionToAnotherDisk(to_disk_object_storage);
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
}
else
{

View File

@ -222,6 +222,7 @@ private:
/// Create actual disk object storage transaction for operations
/// execution.
DiskTransactionPtr createObjectStorageTransaction();
DiskTransactionPtr createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk);
String getReadResourceName() const;
String getWriteResourceName() const;

View File

@ -38,6 +38,29 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction(
, metadata_helper(metadata_helper_)
{}
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_transaction_)
, metadata_helper(metadata_helper_)
{}
MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage_,
IMetadataStorage& destination_metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
: DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction())
, destination_object_storage(destination_object_storage_)
, destination_metadata_storage(destination_metadata_storage_)
{}
namespace
{
/// Operation which affects only metadata. Simplest way to
@ -485,10 +508,12 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
std::string to_path;
StoredObjects created_objects;
IObjectStorage& destination_object_storage;
CopyFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage & destination_object_storage_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const std::string & from_path_,
@ -498,6 +523,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
, write_settings(write_settings_)
, from_path(from_path_)
, to_path(to_path_)
, destination_object_storage(destination_object_storage_)
{}
std::string getInfoForLog() const override
@ -515,7 +541,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
auto object_key = object_storage.generateObjectKeyForPath(to_path);
auto object_to = StoredObject(object_key.serialize());
object_storage.copyObject(object_from, object_to, read_settings, write_settings);
object_storage.copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, destination_object_storage);
tx->addBlobToMetadata(to_path, object_key, object_from.bytes_size);
@ -526,7 +552,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
void undo() override
{
for (const auto & object : created_objects)
object_storage.removeObject(object);
destination_object_storage.removeObject(object);
}
void finalize() override
@ -859,7 +885,13 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path));
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path));
}
void MultipleDisksObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path));
}
void DiskObjectStorageTransaction::commit()

View File

@ -50,9 +50,9 @@ using DiskObjectStorageOperations = std::vector<DiskObjectStorageOperation>;
///
/// If something goes wrong on step 1 or 2, all applied operations are reverted.
/// If finalize fails, nothing is reverted and garbage is left in blob storage.
struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
{
private:
protected:
IObjectStorage & object_storage;
IMetadataStorage & metadata_storage;
@ -63,6 +63,12 @@ private:
DiskObjectStorageOperations operations_to_execute;
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_);
public:
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
@ -118,6 +124,21 @@ public:
void createHardLink(const std::string & src_path, const std::string & dst_path) override;
};
struct MultipleDisksObjectStorageTransaction final : public DiskObjectStorageTransaction, std::enable_shared_from_this<MultipleDisksObjectStorageTransaction>
{
IObjectStorage& destination_object_storage;
IMetadataStorage& destination_metadata_storage;
MultipleDisksObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage,
IMetadataStorage& destination_metadata_storage,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
};
using DiskObjectStorageTransactionPtr = std::shared_ptr<DiskObjectStorageTransaction>;
}
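
As an aside on the contract above, here is a compilable sketch of the execute/undo/finalize shape these transactions follow; `Operation`, `Transaction`, and `PrintOp` are deliberately simplified illustrations, not the actual IDiskObjectStorageOperation interface.

``` cpp
#include <cstdio>
#include <memory>
#include <vector>

// Illustrative interface, deliberately simplified.
struct Operation
{
    virtual ~Operation() = default;
    virtual void execute() = 0;  // step 1: do the work (may throw)
    virtual void undo() = 0;     // revert this operation's effects
    virtual void finalize() = 0; // step 3: cleanup; never reverted
};

struct Transaction
{
    std::vector<std::unique_ptr<Operation>> ops;

    void commit()
    {
        size_t done = 0;
        try
        {
            for (; done < ops.size(); ++done)
                ops[done]->execute();
        }
        catch (...)
        {
            // Failure on step 1 or 2: revert already-applied operations, newest first.
            while (done > 0)
                ops[--done]->undo();
            throw;
        }
        // If finalize throws, nothing is reverted and garbage may remain
        // in blob storage -- exactly the contract documented above.
        for (auto & op : ops)
            op->finalize();
    }
};

struct PrintOp : Operation
{
    const char * name;
    explicit PrintOp(const char * name_) : name(name_) {}
    void execute() override { std::printf("execute %s\n", name); }
    void undo() override { std::printf("undo %s\n", name); }
    void finalize() override { std::printf("finalize %s\n", name); }
};

int main()
{
    Transaction tx;
    tx.ops.push_back(std::make_unique<PrintOp>("copy file"));
    tx.ops.push_back(std::make_unique<PrintOp>("add blob to metadata"));
    tx.commit();
}
```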

View File

@ -458,27 +458,39 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto client_ = client.get();
auto client_ = dest_s3->client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(
client.get(),
bucket,
object_from.remote_path,
0,
size,
dest_s3->bucket,
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
try
{
copyS3File(
client_,
bucket,
object_from.remote_path,
0,
size,
dest_s3->bucket,
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
return;
}
catch (S3Exception & exc)
{
/// If an authentication/permissions error occurs, fall through to the buffered copy.
if (exc.getS3ErrorCode() != Aws::S3::S3Errors::ACCESS_DENIED)
throw;
LOG_WARNING(&Poco::Logger::get("S3ObjectStorage"),
"S3-server-side copy object from the disk {} to the disk {} can not be performed: {}\n",
getName(), dest_s3->getName(), exc.what());
}
}
else
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
}
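
The control flow above boils down to: attempt the cheap server-side copy, and only on a permissions error fall back to the generic buffered path. A hedged sketch of that shape, where `copyNative`, `copyViaBuffer`, and `AccessDenied` are hypothetical stand-ins for `copyS3File`, the base-class fallback, and the ACCESS_DENIED check:

``` cpp
#include <cstdio>
#include <stdexcept>

// Hypothetical stand-ins, not the real S3 client API.
struct AccessDenied : std::runtime_error { using std::runtime_error::runtime_error; };

void copyNative() { throw AccessDenied("server-side CopyObject forbidden"); }
void copyViaBuffer() { std::puts("copied via read/write buffers"); }

void copyObject()
{
    try
    {
        copyNative();
        return; // fast path succeeded
    }
    catch (const AccessDenied & e)
    {
        // Permissions error only: log and fall through to the slow path.
        // Any other error would propagate to the caller.
        std::printf("native copy failed: %s\n", e.what());
    }
    copyViaBuffer();
}

int main() { copyObject(); }
```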
void S3ObjectStorage::copyObject( // NOLINT

View File

@ -10,6 +10,13 @@
namespace DB
{
/// It's a bug in clang with the three-way comparison operator
/// https://github.com/llvm/llvm-project/issues/55919
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
/** Mark is the position in the compressed file. The compressed file consists of adjacent compressed blocks.
* Mark is a tuple - the offset in the file to the start of the compressed block, the offset in the decompressed block to the start of the data.
*/
@ -18,12 +25,7 @@ struct MarkInCompressedFile
size_t offset_in_compressed_file;
size_t offset_in_decompressed_block;
bool operator==(const MarkInCompressedFile & rhs) const
{
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
}
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
auto operator<=>(const MarkInCompressedFile &) const = default;
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
@ -39,6 +41,10 @@ struct MarkInCompressedFile
}
};
#ifdef __clang__
#pragma clang diagnostic pop
#endif
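
For context on the pragmas, a minimal sketch reproducing the pattern: clang (per the LLVM issue above) can emit -Wzero-as-null-pointer-constant for comparisons synthesized from a defaulted `operator<=>`, so the declaration is fenced with push/pop; `Point` is an illustrative struct, not ClickHouse code.

``` cpp
#include <compare>

// Without the pragmas, affected clang versions warn that the synthesized
// comparison compares the <=> result against a literal 0.
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif

struct Point
{
    int x;
    int y;
    auto operator<=>(const Point &) const = default;
};

#ifdef __clang__
#pragma clang diagnostic pop
#endif

// The comparison works either way; the pragmas only silence the warning.
static_assert(Point{1, 2} < Point{1, 3});

int main() { return Point{0, 0} == Point{0, 0} ? 0 : 1; }
```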
/**
* In-memory representation of an array of marks.
*

View File

@ -99,6 +99,10 @@ if (TARGET ch_contrib::rapidjson)
list (APPEND PRIVATE_LIBS ch_contrib::rapidjson)
endif()
if (TARGET ch_contrib::pocketfft)
list (APPEND PRIVATE_LIBS ch_contrib::pocketfft)
endif()
if (TARGET ch_contrib::crc32-vpmsum)
list (APPEND PUBLIC_LIBS ch_contrib::crc32-vpmsum)
endif()

View File

@ -0,0 +1,227 @@
#include "config.h"
#if USE_POCKETFFT
# ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wshadow"
# pragma clang diagnostic ignored "-Wextra-semi-stmt"
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
# endif
# include <pocketfft_hdronly.h>
# ifdef __clang__
# pragma clang diagnostic pop
# endif
# include <cmath>
# include <Columns/ColumnArray.h>
# include <Columns/ColumnsNumber.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypesNumber.h>
# include <Functions/FunctionFactory.h>
# include <Functions/FunctionHelpers.h>
# include <Functions/IFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
/* Detect the period in time series data using FFT.
* FFT - Fast Fourier transform (https://en.wikipedia.org/wiki/Fast_Fourier_transform)
* 1. Convert the time series data to the frequency domain using FFT.
* 2. Remove the 0th (the DC component) and the n/2th (the Nyquist frequency) components.
* 3. Find the peak (highest magnitude), i.e. the dominant frequency component.
* 4. The inverse of the dominant frequency is the period.
*/
class FunctionSeriesPeriodDetectFFT : public IFunction
{
public:
static constexpr auto name = "seriesPeriodDetectFFT";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSeriesPeriodDetectFFT>(); }
std::string getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
FunctionArgumentDescriptors args{{"time_series", &isArray<IDataType>, nullptr, "Array"}};
validateFunctionArgumentTypes(*this, arguments, args);
return std::make_shared<DataTypeFloat64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
ColumnPtr array_ptr = arguments[0].column;
const ColumnArray * array = checkAndGetColumn<ColumnArray>(array_ptr.get());
const IColumn & src_data = array->getData();
const ColumnArray::Offsets & offsets = array->getOffsets();
auto res = ColumnFloat64::create(input_rows_count);
auto & res_data = res->getData();
ColumnArray::Offset prev_src_offset = 0;
Float64 period;
for (size_t i = 0; i < input_rows_count; ++i)
{
ColumnArray::Offset curr_offset = offsets[i];
if (executeNumbers<UInt8>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<UInt16>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<UInt32>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<UInt64>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int8>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int16>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int32>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Int64>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Float32>(src_data, period, prev_src_offset, curr_offset)
|| executeNumbers<Float64>(src_data, period, prev_src_offset, curr_offset))
{
res_data[i] = period;
prev_src_offset = curr_offset;
}
else
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}",
arguments[0].column->getName(),
getName());
}
return res;
}
template <typename T>
bool executeNumbers(const IColumn & src_data, Float64 & period, ColumnArray::Offset & start, ColumnArray::Offset & end) const
{
const ColumnVector<T> * src_data_concrete = checkAndGetColumn<ColumnVector<T>>(&src_data);
if (!src_data_concrete)
return false;
const PaddedPODArray<T> & src_vec = src_data_concrete->getData();
chassert(start <= end);
size_t len = end - start;
if (len < 4)
{
period = NAN; // At least four data points are required to detect a period
return true;
}
std::vector<Float64> src((src_vec.begin() + start), (src_vec.begin() + end));
std::vector<std::complex<double>> out((len / 2) + 1);
pocketfft::shape_t shape{len};
pocketfft::shape_t axes;
axes.reserve(shape.size());
for (size_t i = 0; i < shape.size(); ++i)
axes.push_back(i);
pocketfft::stride_t stride_src{sizeof(double)};
pocketfft::stride_t stride_out{sizeof(std::complex<double>)};
pocketfft::r2c(shape, stride_src, stride_out, axes, pocketfft::FORWARD, src.data(), out.data(), static_cast<double>(1));
size_t spec_len = (len - 1) / 2; // removes the Nyquist element when len is even
double max_mag = 0;
size_t idx = 1;
for (size_t i = 1; i < spec_len; ++i)
{
double magnitude = sqrt(out[i].real() * out[i].real() + out[i].imag() * out[i].imag());
if (magnitude > max_mag)
{
max_mag = magnitude;
idx = i;
}
}
// If all FFT values are zero, the input signal is flat.
// This implies the period of the series should be 0.
if (max_mag == 0)
{
period = 0;
return true;
}
std::vector<double> xfreq(spec_len);
double step = 0.5 / (spec_len - 1);
for (size_t i = 0; i < spec_len; ++i)
xfreq[i] = i * step;
auto freq = xfreq[idx];
period = std::round(1 / freq);
return true;
}
};
REGISTER_FUNCTION(SeriesPeriodDetectFFT)
{
factory.registerFunction<FunctionSeriesPeriodDetectFFT>(FunctionDocumentation{
.description = R"(
Finds the period of the given time series data using FFT
FFT - Fast Fourier transform (https://en.wikipedia.org/wiki/Fast_Fourier_transform)
**Syntax**
``` sql
seriesPeriodDetectFFT(series);
```
**Arguments**
- `series` - An array of numeric values
**Returned value**
- A real value equal to the period of the time series
- Returns NAN when the number of data points is less than four.
Type: [Float64](../../sql-reference/data-types/float.md).
**Examples**
Query:
``` sql
SELECT seriesPeriodDetectFFT([1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6]) AS print_0;
```
Result:
``` text
print_0
3
```
``` sql
SELECT seriesPeriodDetectFFT(arrayMap(x -> abs((x % 6) - 3), range(1000))) AS print_0;
```
Result:
``` text
print_0
6
```
)",
.categories{"Time series analysis"}});
}
}
#endif
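
As context for the pocketfft calls above, here is a minimal standalone sketch of the same period-detection recipe; `detectPeriod`, `main`, and the sample series are illustrative additions, not part of this commit, and the sketch assumes only that `pocketfft_hdronly.h` is on the include path.

``` cpp
#include <pocketfft_hdronly.h>

#include <cmath>
#include <complex>
#include <cstdio>
#include <vector>

// Illustrative helper mirroring the steps above; not part of this commit.
static double detectPeriod(const std::vector<double> & src)
{
    const size_t len = src.size();
    if (len < 4)
        return NAN; // too few points to detect a period

    std::vector<std::complex<double>> out(len / 2 + 1);
    pocketfft::shape_t shape{len};
    pocketfft::shape_t axes{0}; // transform along the only axis
    pocketfft::stride_t stride_src{sizeof(double)};
    pocketfft::stride_t stride_out{sizeof(std::complex<double>)};
    pocketfft::r2c(shape, stride_src, stride_out, axes, pocketfft::FORWARD, src.data(), out.data(), 1.0);

    const size_t spec_len = (len - 1) / 2; // drops the Nyquist element when len is even
    double max_mag = 0;
    size_t idx = 1;
    for (size_t i = 1; i < spec_len; ++i) // bin 0 (the DC component) is skipped
    {
        const double magnitude = std::abs(out[i]);
        if (magnitude > max_mag)
        {
            max_mag = magnitude;
            idx = i;
        }
    }
    if (max_mag == 0)
        return 0; // flat signal, no dominant frequency

    const double freq = idx * (0.5 / (spec_len - 1)); // same frequency grid as above
    return std::round(1 / freq);
}

int main()
{
    std::vector<double> series;
    for (int i = 0; i < 21; ++i)
        series.push_back((i % 3 == 0) ? 1 : ((i % 3 == 1) ? 4 : 6)); // 1, 4, 6 repeating
    std::printf("period = %g\n", detectPeriod(series)); // expected output: period = 3
    return 0;
}
```

Built against the vendored header (e.g. `g++ -std=c++17 -I contrib/pocketfft fft_sketch.cpp`), this should print `period = 3`, matching the first documentation example.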

View File

@ -515,7 +515,9 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
// We do not know in advance how many bytes we are going to consume, so to avoid blocking we estimate the cost from below
constexpr ResourceCost estimated_cost = 1;
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
rlock.unlock();
if (outcome.IsSuccess())

View File

@ -13,9 +13,9 @@
namespace DB::S3
{
std::shared_ptr<Aws::Http::HttpClient>
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const
{
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(clientConfiguration));
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(client_configuration));
}
std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest(

View File

@ -15,7 +15,7 @@ class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory
public:
~PocoHTTPClientFactory() override = default;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpClient>
CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override;
CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>

View File

@ -655,6 +655,7 @@ namespace
void performCopy()
{
LOG_TEST(log, "Copy object {} to {} using native copy", src_key, dest_key);
if (!supports_multipart_copy || size <= upload_settings.max_single_operation_copy_size)
performSingleOperationCopy();
else

View File

@ -16,6 +16,7 @@
#include <Common/Throttler_fwd.h>
#include <IO/S3/URI.h>
#include <IO/S3/Credentials.h>
#include <aws/core/Aws.h>
#include <aws/s3/S3Errors.h>

View File

@ -76,10 +76,9 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
: max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment)
, background_download_threads(settings.background_download_threads)
, metadata_download_threads(settings.load_metadata_threads)
, load_metadata_threads(settings.load_metadata_threads)
, log(&Poco::Logger::get("FileCache(" + cache_name + ")"))
, metadata(settings.base_path, settings.background_download_queue_size_limit)
, metadata(settings.base_path, settings.background_download_queue_size_limit, settings.background_download_threads)
{
if (settings.cache_policy == "LRU")
main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements);
@ -159,12 +158,8 @@ void FileCache::initialize()
throw;
}
metadata.startup();
is_initialized = true;
for (size_t i = 0; i < background_download_threads; ++i)
download_threads.emplace_back([this] { metadata.downloadThreadFunc(); });
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ metadata.cleanupThreadFunc(); }});
}
CacheGuard::Lock FileCache::lockCache() const
@ -299,7 +294,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & create_settings)
{
assert(size > 0);
@ -316,7 +311,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
remaining_size -= current_file_segment_size;
auto file_segment_metadata_it = addFileSegment(
locked_key, current_pos, current_file_segment_size, state, settings, nullptr);
locked_key, current_pos, current_file_segment_size, state, create_settings, nullptr);
file_segments.push_back(file_segment_metadata_it->second->file_segment);
current_pos += current_file_segment_size;
@ -331,7 +326,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
const FileSegment::Range & range,
size_t file_segments_limit,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & create_settings)
{
/// There are segments [segment1, ..., segmentN]
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
@ -388,7 +383,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings);
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(it, file_segment);
++processed_count;
@ -399,7 +394,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
@ -444,7 +439,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings);
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(file_segments.end(), file_segment);
}
@ -454,7 +449,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
@ -473,7 +468,7 @@ FileSegmentsHolderPtr FileCache::set(
const Key & key,
size_t offset,
size_t size,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & create_settings)
{
assertInitialized();
@ -484,17 +479,17 @@ FileSegmentsHolderPtr FileCache::set(
if (!file_segments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache");
if (settings.unbounded)
if (create_settings.unbounded)
{
/// If the file is unbounded, we can create a single file_segment_metadata for it.
auto file_segment_metadata_it = addFileSegment(
*locked_key, offset, size, FileSegment::State::EMPTY, settings, nullptr);
*locked_key, offset, size, FileSegment::State::EMPTY, create_settings, nullptr);
file_segments = {file_segment_metadata_it->second->file_segment};
}
else
{
file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings);
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, create_settings);
}
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
@ -506,7 +501,7 @@ FileCache::getOrSet(
size_t offset,
size_t size,
size_t file_size,
const CreateFileSegmentSettings & settings,
const CreateFileSegmentSettings & create_settings,
size_t file_segments_limit)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds);
@ -612,7 +607,7 @@ FileCache::getOrSet(
if (file_segments.empty())
{
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings);
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, create_settings);
}
else
{
@ -620,7 +615,7 @@ FileCache::getOrSet(
chassert(file_segments.back()->range().left <= range.right);
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, settings);
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, create_settings);
if (!file_segments.front()->range().contains(offset))
{
@ -675,7 +670,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
size_t offset,
size_t size,
FileSegment::State state,
const CreateFileSegmentSettings & settings,
const CreateFileSegmentSettings & create_settings,
const CacheGuard::Lock * lock)
{
/// Create a file_segment_metadata and put it in `files` map by [key][offset].
@ -729,7 +724,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
result_state = state;
}
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, settings, background_download_threads > 0, this, locked_key.getKeyMetadata());
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, create_settings, metadata.isBackgroundDownloadEnabled(), this, locked_key.getKeyMetadata());
auto file_segment_metadata = std::make_shared<FileSegmentMetadata>(std::move(file_segment));
auto [file_segment_metadata_it, inserted] = locked_key.emplace(offset, file_segment_metadata);
@ -839,6 +834,15 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
return true;
}
void FileCache::iterate(IterateFunc && func)
{
return metadata.iterate([&](const LockedKey & locked_key)
{
for (const auto & file_segment_metadata : locked_key)
func(FileSegment::getInfo(file_segment_metadata.second->file_segment));
});
}
void FileCache::removeKey(const Key & key)
{
assertInitialized();
@ -933,9 +937,9 @@ void FileCache::loadMetadataImpl()
std::mutex set_exception_mutex;
std::atomic<bool> stop_loading = false;
LOG_INFO(log, "Loading filesystem cache with {} threads", metadata_download_threads);
LOG_INFO(log, "Loading filesystem cache with {} threads", load_metadata_threads);
for (size_t i = 0; i < metadata_download_threads; ++i)
for (size_t i = 0; i < load_metadata_threads; ++i)
{
try
{
@ -1137,15 +1141,8 @@ FileCache::~FileCache()
void FileCache::deactivateBackgroundOperations()
{
metadata.cancelDownload();
metadata.cancelCleanup();
for (auto & thread : download_threads)
if (thread.joinable())
thread.join();
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
shutdown.store(true);
metadata.shutdown();
}
std::vector<FileSegment::Info> FileCache::getFileSegmentInfos()
@ -1159,7 +1156,7 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos()
metadata.iterate([&](const LockedKey & locked_key)
{
for (const auto & [_, file_segment_metadata] : locked_key)
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment, *this));
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment));
});
return file_segments;
}
@ -1169,14 +1166,14 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos(const Key & key)
std::vector<FileSegment::Info> file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
for (const auto & [_, file_segment_metadata] : *locked_key)
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment, *this));
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment));
return file_segments;
}
std::vector<FileSegment::Info> FileCache::dumpQueue()
{
assertInitialized();
return main_priority->dump(*this, lockCache());
return main_priority->dump(lockCache());
}
std::vector<String> FileCache::tryGetCachePaths(const Key & key)
@ -1213,13 +1210,48 @@ void FileCache::assertCacheCorrectness()
{
for (const auto & [_, file_segment_metadata] : locked_key)
{
const auto & file_segment = *file_segment_metadata->file_segment;
UNUSED(file_segment);
chassert(file_segment.assertCorrectness());
chassert(file_segment_metadata->file_segment->assertCorrectness());
}
});
}
void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings)
{
if (!is_initialized || shutdown || new_settings == actual_settings)
return;
std::lock_guard lock(apply_settings_mutex);
if (metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
{
LOG_INFO(log, "Changed background_download_queue_size from {} to {}",
actual_settings.background_download_queue_size_limit,
new_settings.background_download_queue_size_limit);
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
}
bool updated;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
}
if (updated)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);
actual_settings.background_download_threads = new_settings.background_download_threads;
}
}
FileCache::QueryContextHolder::QueryContextHolder(
const String & query_id_,
FileCache * cache_,
@ -1242,13 +1274,13 @@ FileCache::QueryContextHolder::~QueryContextHolder()
}
FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
const String & query_id, const ReadSettings & settings)
const String & query_id, const ReadSettings & read_settings)
{
if (!query_limit || settings.filesystem_cache_max_download_size == 0)
if (!query_limit || read_settings.filesystem_cache_max_download_size == 0)
return {};
auto lock = lockCache();
auto context = query_limit->getOrSetQueryContext(query_id, settings, lock);
auto context = query_limit->getOrSetQueryContext(query_id, read_settings, lock);
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
}
@ -1257,7 +1289,7 @@ std::vector<FileSegment::Info> FileCache::sync()
std::vector<FileSegment::Info> file_segments;
metadata.iterate([&](LockedKey & locked_key)
{
auto broken = locked_key.sync(*this);
auto broken = locked_key.sync();
file_segments.insert(file_segments.end(), broken.begin(), broken.end());
});
return file_segments;

View File

@ -16,6 +16,7 @@
#include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/QueryLimit.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <filesystem>
@ -150,14 +151,18 @@ public:
std::vector<FileSegment::Info> sync();
using IterateFunc = std::function<void(const FileSegmentInfo &)>;
void iterate(IterateFunc && func);
void applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings);
private:
using KeyAndOffset = FileCacheKeyAndOffset;
const size_t max_file_segment_size;
const size_t bypass_cache_threshold = 0;
const size_t bypass_cache_threshold;
const size_t boundary_alignment;
const size_t background_download_threads; /// 0 means background download is disabled.
const size_t metadata_download_threads;
size_t load_metadata_threads;
Poco::Logger * log;
@ -165,6 +170,9 @@ private:
std::atomic<bool> is_initialized = false;
mutable std::mutex init_mutex;
std::unique_ptr<StatusFile> status_file;
std::atomic<bool> shutdown = false;
std::mutex apply_settings_mutex;
CacheMetadata metadata;
@ -195,12 +203,6 @@ private:
* then allowed loaded cache size is std::min(n - k, max_query_cache_size).
*/
FileCacheQueryLimitPtr query_limit;
/**
* A background cleanup task.
* Clears removed cache entries from metadata.
*/
std::vector<ThreadFromGlobalPool> download_threads;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
void assertInitialized() const;
void assertCacheCorrectness();

View File

@ -9,6 +9,22 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
FileCacheFactory::FileCacheData::FileCacheData(
FileCachePtr cache_,
const FileCacheSettings & settings_,
const std::string & config_path_)
: cache(cache_)
, config_path(config_path_)
, settings(settings_)
{
}
FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const
{
std::lock_guard lock(settings_mutex);
return settings;
}
FileCacheFactory & FileCacheFactory::instance()
{
static FileCacheFactory ret;
@ -22,7 +38,9 @@ FileCacheFactory::CacheByName FileCacheFactory::getAll()
}
FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_name, const FileCacheSettings & file_cache_settings)
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path)
{
std::lock_guard lock(mutex);
@ -31,13 +49,16 @@ FileCachePtr FileCacheFactory::getOrCreate(
{
auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings);
it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first;
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings, config_path)).first;
}
return it->second->cache;
}
FileCachePtr FileCacheFactory::create(const std::string & cache_name, const FileCacheSettings & file_cache_settings)
FileCachePtr FileCacheFactory::create(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path)
{
std::lock_guard lock(mutex);
@ -47,12 +68,12 @@ FileCachePtr FileCacheFactory::create(const std::string & cache_name, const File
auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings);
it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first;
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings, config_path)).first;
return it->second->cache;
}
FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & cache_name)
FileCacheFactory::FileCacheDataPtr FileCacheFactory::getByName(const std::string & cache_name)
{
std::lock_guard lock(mutex);
@ -60,7 +81,41 @@ FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string &
if (it == caches_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name);
return *it->second;
return it->second;
}
void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config)
{
CacheByName caches_by_name_copy;
{
std::lock_guard lock(mutex);
caches_by_name_copy = caches_by_name;
}
for (const auto & [_, cache_info] : caches_by_name_copy)
{
if (cache_info->config_path.empty())
continue;
FileCacheSettings new_settings;
new_settings.loadFromConfig(config, cache_info->config_path);
FileCacheSettings old_settings;
{
std::lock_guard lock(cache_info->settings_mutex);
if (new_settings == cache_info->settings)
continue;
old_settings = cache_info->settings;
}
cache_info->cache->applySettingsIfPossible(new_settings, old_settings);
{
std::lock_guard lock(cache_info->settings_mutex);
cache_info->settings = old_settings;
}
}
}
}
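
Note the locking order in updateSettingsFromConfig: the registry is snapshotted under the factory mutex, and each per-cache `settings_mutex` is taken only after the global lock is released. A hedged sketch of that pattern, where `Registry` and `Entry` are illustrative types, not the factory's:

``` cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Illustrative types, not the FileCacheFactory ones.
struct Entry
{
    std::mutex settings_mutex;
    int settings = 0;

    void apply(int new_settings)
    {
        std::lock_guard lock(settings_mutex); // per-entry lock, taken without the global one
        settings = new_settings;
    }
};

struct Registry
{
    std::mutex mutex;
    std::map<std::string, std::shared_ptr<Entry>> entries;

    void updateAll(int new_settings)
    {
        std::map<std::string, std::shared_ptr<Entry>> copy;
        {
            std::lock_guard lock(mutex); // global lock held only long enough to copy
            copy = entries;
        }
        // A slow apply() on one entry can no longer block registration of another,
        // and the global and per-entry locks are never held at the same time.
        for (auto & [name, entry] : copy)
            entry->apply(new_settings);
    }
};

int main()
{
    Registry registry;
    registry.entries.emplace("s3_cache", std::make_shared<Entry>());
    registry.updateAll(42);
}
```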

View File

@ -6,7 +6,6 @@
#include <boost/noncopyable.hpp>
#include <unordered_map>
#include <mutex>
#include <list>
namespace DB
{
@ -17,26 +16,42 @@ namespace DB
class FileCacheFactory final : private boost::noncopyable
{
public:
struct FileCacheData
class FileCacheData
{
FileCachePtr cache;
FileCacheSettings settings;
friend class FileCacheFactory;
public:
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_);
FileCacheData() = default;
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_) : cache(cache_), settings(settings_) {}
FileCacheSettings getSettings() const;
const FileCachePtr cache;
const std::string config_path;
private:
FileCacheSettings settings;
mutable std::mutex settings_mutex;
};
using FileCacheDataPtr = std::shared_ptr<FileCacheData>;
using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>;
static FileCacheFactory & instance();
FileCachePtr getOrCreate(const std::string & cache_name, const FileCacheSettings & file_cache_settings);
FileCachePtr getOrCreate(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path);
FileCachePtr create(const std::string & cache_name, const FileCacheSettings & file_cache_settings);
FileCachePtr create(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path);
CacheByName getAll();
FileCacheData getByName(const std::string & cache_name);
FileCacheDataPtr getByName(const std::string & cache_name);
void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config);
private:
std::mutex mutex;
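One consequence of this header change: getByName now hands out a shared_ptr, so callers keep the entry alive and read a consistent snapshot of the settings through a locked getter, instead of copying the whole struct under the factory mutex. A minimal sketch of that access pattern, with illustrative names:

#include <memory>
#include <mutex>

struct Settings { size_t max_size = 0; };

class Data
{
public:
    Settings getSettings() const
    {
        std::lock_guard lock(mutex); // settings may be replaced concurrently by a config reload
        return settings;             // copy out a consistent snapshot
    }
private:
    Settings settings;
    mutable std::mutex mutex; // mutable so the const getter can lock
};

void caller(const std::shared_ptr<Data> & data)
{
    Settings snapshot = data->getSettings(); // safe even while settings are being updated
    (void)snapshot.max_size;
}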

View File

@@ -38,6 +38,8 @@ struct FileCacheSettings
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
void loadFromCollection(const NamedCollection & collection);
bool operator ==(const FileCacheSettings &) const = default;
private:
using FuncHas = std::function<bool(std::string_view)>;
using FuncGetUInt = std::function<size_t(std::string_view)>;
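The newly exposed operator ==(...) = default is a C++20 defaulted comparison: memberwise equality for free, which is what the settings-reload path relies on to skip unchanged caches. A small self-contained illustration (Opts is a stand-in type):

#include <cassert>
#include <cstddef>
#include <string>

struct Opts
{
    std::string path;
    size_t max_size = 0;
    bool operator==(const Opts &) const = default; // compares members in declaration order
};

int main()
{
    Opts a{"/cache", 100}, b{"/cache", 100};
    assert(a == b);
    b.max_size = 200;
    assert(!(a == b)); // any differing member breaks equality
}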

View File

@@ -120,6 +120,14 @@ String FileSegment::getPathInLocalCache() const
return getKeyMetadata()->getFileSegmentPath(*this);
}
String FileSegment::tryGetPathInLocalCache() const
{
auto metadata = tryGetKeyMetadata();
if (!metadata)
return "";
return metadata->getFileSegmentPath(*this);
}
FileSegmentGuard::Lock FileSegment::lockFileSegment() const
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentLockMicroseconds);
@@ -833,13 +841,13 @@ void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock)
}
}
FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment, FileCache & cache)
FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment)
{
auto lock = file_segment->lockFileSegment();
return Info{
.key = file_segment->key(),
.offset = file_segment->offset(),
.path = cache.getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->segment_kind),
.path = file_segment->tryGetPathInLocalCache(),
.range_left = file_segment->range().left,
.range_right = file_segment->range().right,
.kind = file_segment->segment_kind,
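tryGetPathInLocalCache is the non-throwing counterpart of getPathInLocalCache: it degrades to an empty path when the key metadata is no longer reachable, which is what allows getInfo to drop its FileCache & parameter. A generic sketch of the try-getter pattern, assuming stand-in types:

#include <memory>
#include <string>

struct Metadata { std::string path() const { return "/cache/key/0"; } };

struct Segment
{
    std::weak_ptr<Metadata> metadata;

    std::string tryGetPath() const
    {
        auto m = metadata.lock();             // may be expired if the segment was detached
        return m ? m->path() : std::string{}; // empty string instead of an exception
    }
};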

View File

@@ -157,7 +157,7 @@ public:
/// exception.
void detach(const FileSegmentGuard::Lock &, const LockedKey &);
static FileSegmentInfo getInfo(const FileSegmentPtr & file_segment, FileCache & cache);
static FileSegmentInfo getInfo(const FileSegmentPtr & file_segment);
bool isDetached() const;
@@ -243,6 +243,8 @@ private:
LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const;
FileSegmentGuard::Lock lockFileSegment() const;
String tryGetPathInLocalCache() const;
Key file_key;
Range segment_range;
const FileSegmentKind segment_kind;

View File

@@ -75,7 +75,7 @@ public:
virtual void shuffle(const CacheGuard::Lock &) = 0;
virtual std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) = 0;
virtual std::vector<FileSegmentInfo> dump(const CacheGuard::Lock &) = 0;
using FinalizeEvictionFunc = std::function<void(const CacheGuard::Lock & lk)>;
virtual bool collectCandidatesForEviction(

View File

@@ -277,12 +277,12 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
return LRUIterator(this, it.iterator);
}
std::vector<FileSegmentInfo> LRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
std::vector<FileSegmentInfo> LRUFileCachePriority::dump(const CacheGuard::Lock & lock)
{
std::vector<FileSegmentInfo> res;
iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)
{
res.emplace_back(FileSegment::getInfo(segment_metadata->file_segment, cache));
res.emplace_back(FileSegment::getInfo(segment_metadata->file_segment));
return IterationResult::CONTINUE;
}, lock);
return res;

View File

@@ -44,7 +44,7 @@ public:
void shuffle(const CacheGuard::Lock &) override;
std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) override;
std::vector<FileSegmentInfo> dump(const CacheGuard::Lock &) override;
void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); }

View File

@@ -134,11 +134,12 @@ std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) co
/ CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind());
}
CacheMetadata::CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_)
CacheMetadata::CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_, size_t background_download_threads_)
: path(path_)
, cleanup_queue(std::make_shared<CleanupQueue>())
, download_queue(std::make_shared<DownloadQueue>(background_download_queue_size_limit_))
, log(&Poco::Logger::get("CacheMetadata"))
, download_threads_num(background_download_threads_)
{
}
@@ -458,11 +459,6 @@ void CacheMetadata::cleanupThreadFunc()
}
}
void CacheMetadata::cancelCleanup()
{
cleanup_queue->cancel();
}
class DownloadQueue
{
friend struct CacheMetadata;
@@ -473,7 +469,7 @@
{
{
std::lock_guard lock(mutex);
if (cancelled || (queue_size_limit && queue.size() == queue_size_limit))
if (cancelled || (queue_size_limit && queue.size() >= queue_size_limit))
return false;
queue.push(DownloadInfo{file_segment->key(), file_segment->offset(), file_segment});
}
@@ -483,6 +479,8 @@
return true;
}
bool setQueueLimit(size_t size) { return queue_size_limit.exchange(size) != size; }
private:
void cancel()
{
@@ -493,8 +491,8 @@ private:
cv.notify_all();
}
const size_t queue_size_limit;
std::mutex mutex;
std::atomic<size_t> queue_size_limit;
mutable std::mutex mutex;
std::condition_variable cv;
bool cancelled = false;
@@ -515,7 +513,7 @@ private:
std::queue<DownloadInfo> queue;
};
void CacheMetadata::downloadThreadFunc()
void CacheMetadata::downloadThreadFunc(const bool & stop_flag)
{
std::optional<Memory<>> memory;
while (true)
@@ -526,13 +524,13 @@
{
std::unique_lock lock(download_queue->mutex);
if (download_queue->cancelled)
if (download_queue->cancelled || stop_flag)
return;
if (download_queue->queue.empty())
{
download_queue->cv.wait(lock, [&](){ return download_queue->cancelled || !download_queue->queue.empty(); });
if (download_queue->cancelled)
download_queue->cv.wait(lock, [&](){ return download_queue->cancelled || !download_queue->queue.empty() || stop_flag; });
if (download_queue->cancelled || stop_flag)
return;
}
@@ -607,6 +605,11 @@
}
}
bool CacheMetadata::setBackgroundDownloadQueueSizeLimit(size_t size)
{
return download_queue->setQueueLimit(size);
}
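setQueueLimit above leans on std::atomic::exchange to set the new limit and report whether it changed in one atomic step: exchange stores the new value and returns the previous one. A minimal illustration:

#include <atomic>
#include <cassert>
#include <cstddef>

int main()
{
    std::atomic<size_t> limit{100};
    bool changed = limit.exchange(200) != 200; // previous value was 100, so the limit changed
    assert(changed);
    changed = limit.exchange(200) != 200;      // previous value was already 200, no change
    assert(!changed);
}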
void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory)
{
LOG_TEST(
@@ -670,9 +673,86 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog());
}
void CacheMetadata::cancelDownload()
void CacheMetadata::startup()
{
download_threads.reserve(download_threads_num);
for (size_t i = 0; i < download_threads_num; ++i)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>([this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ cleanupThreadFunc(); }});
}
void CacheMetadata::shutdown()
{
download_queue->cancel();
cleanup_queue->cancel();
for (auto & download_thread : download_threads)
{
if (download_thread->thread && download_thread->thread->joinable())
download_thread->thread->join();
}
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
}
bool CacheMetadata::isBackgroundDownloadEnabled()
{
return download_threads_num;
}
bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
{
if (threads_num == download_threads_num)
return false;
if (threads_num > download_threads_num)
{
SCOPE_EXIT({ download_threads_num = download_threads.size(); });
size_t add_threads = threads_num - download_threads_num;
for (size_t i = 0; i < add_threads; ++i)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
try
{
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>(
[this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
catch (...)
{
download_threads.pop_back();
throw;
}
}
}
else if (threads_num < download_threads_num)
{
size_t remove_threads = download_threads_num - threads_num;
{
std::lock_guard lock(download_queue->mutex);
for (size_t i = 0; i < remove_threads; ++i)
download_threads[download_threads.size() - 1 - i]->stop_flag = true;
}
download_queue->cv.notify_all();
SCOPE_EXIT({ download_threads_num = download_threads.size(); });
for (size_t i = 0; i < remove_threads; ++i)
{
chassert(download_threads.back()->stop_flag);
auto & thread = download_threads.back()->thread;
if (thread && thread->joinable())
thread->join();
download_threads.pop_back();
}
}
return true;
}
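The grow/shrink protocol above: new workers are spawned watching their own stop_flag; to shrink, the tail threads are flagged under the queue mutex, all waiters are woken, and the flagged threads are joined from the back. A condensed standalone sketch of that protocol, with no work queue and illustrative names (resize is assumed to be called from a single thread):

#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct Worker
{
    std::thread thread;
    bool stop_flag = false; // written and read only under `mutex`
};

std::mutex mutex;
std::condition_variable cv;
std::vector<std::shared_ptr<Worker>> workers;

void workerFunc(const bool & stop_flag)
{
    std::unique_lock lock(mutex);
    // A real worker would also wake on queued items; here we only wait for the stop signal.
    cv.wait(lock, [&] { return stop_flag; });
}

void resize(size_t n)
{
    while (workers.size() < n) // grow: spawn threads that watch their own flag
    {
        auto w = std::make_shared<Worker>();
        w->thread = std::thread([w] { workerFunc(w->stop_flag); });
        workers.push_back(w);
    }
    if (workers.size() > n) // shrink: flag the tail under the mutex, wake everyone, join from the back
    {
        {
            std::lock_guard lock(mutex);
            for (size_t i = n; i < workers.size(); ++i)
                workers[i]->stop_flag = true;
        }
        cv.notify_all();
        while (workers.size() > n)
        {
            workers.back()->thread.join();
            workers.pop_back();
        }
    }
}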
LockedKey::LockedKey(std::shared_ptr<KeyMetadata> key_metadata_)
@@ -928,7 +1008,7 @@ std::string LockedKey::toString() const
}
std::vector<FileSegment::Info> LockedKey::sync(FileCache & cache)
std::vector<FileSegment::Info> LockedKey::sync()
{
std::vector<FileSegment::Info> broken;
for (auto it = key_metadata->begin(); it != key_metadata->end();)
@@ -961,7 +1041,7 @@ std::vector<FileSegment::Info> LockedKey::sync(FileCache & cache)
"File segment has DOWNLOADED state, but file does not exist ({})",
file_segment->getInfoForLog());
broken.push_back(FileSegment::getInfo(file_segment, cache));
broken.push_back(FileSegment::getInfo(file_segment));
it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */true);
continue;
}
@@ -980,7 +1060,7 @@ std::vector<FileSegment::Info> LockedKey::sync(FileCache & cache)
"File segment has unexpected size. Having {}, expected {} ({})",
actual_size, expected_size, file_segment->getInfoForLog());
broken.push_back(FileSegment::getInfo(file_segment, cache));
broken.push_back(FileSegment::getInfo(file_segment));
it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */false);
}
return broken;
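sync reconciles metadata with the filesystem: a segment marked DOWNLOADED whose file is missing, or whose on-disk size disagrees with the expected one, is reported as broken and removed. A generic sketch of that consistency check, assuming stand-in types:

#include <filesystem>
#include <string>
#include <system_error>
#include <vector>

struct SegmentInfo { std::string path; size_t expected_size = 0; };

std::vector<SegmentInfo> findBroken(const std::vector<SegmentInfo> & segments)
{
    std::vector<SegmentInfo> broken;
    for (const auto & s : segments)
    {
        std::error_code ec;
        auto actual = std::filesystem::file_size(s.path, ec);
        if (ec)
            broken.push_back(s); // DOWNLOADED in metadata, but no file on disk
        else if (actual != s.expected_size)
            broken.push_back(s); // file exists but has an unexpected size
    }
    return broken;
}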

View File

@@ -5,6 +5,7 @@
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Common/ThreadPool.h>
#include <shared_mutex>
namespace DB
@@ -102,7 +103,9 @@ public:
using Key = FileCacheKey;
using IterateFunc = std::function<void(LockedKey &)>;
explicit CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_);
explicit CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_, size_t background_download_threads_);
void startup();
const String & getBaseDirectory() const { return path; }
@@ -115,6 +118,7 @@ public:
static String getFileNameForFileSegment(size_t offset, FileSegmentKind segment_kind);
void iterate(IterateFunc && func);
bool isEmpty() const;
enum class KeyNotFoundPolicy
@@ -138,21 +142,13 @@ public:
void removeKey(const Key & key, bool if_exists, bool if_releasable);
void removeAllKeys(bool if_releasable);
void cancelCleanup();
void shutdown();
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if they arose as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete();
/// it does not cover removal of cache files caused by FileCache::removeKey/removeAllKeys,
/// which is triggered by removal of source files from object storage.
/// Hence the number of elements submitted for background cleanup should remain low.
void cleanupThreadFunc();
bool setBackgroundDownloadThreads(size_t threads_num);
size_t getBackgroundDownloadThreads() const { return download_threads.size(); }
bool setBackgroundDownloadQueueSizeLimit(size_t size);
void downloadThreadFunc();
void cancelDownload();
bool isBackgroundDownloadEnabled();
private:
const std::string path; /// Cache base path
@@ -172,6 +168,16 @@ private:
static constexpr size_t buckets_num = 1024;
std::vector<MetadataBucket> metadata_buckets{buckets_num};
struct DownloadThread
{
std::unique_ptr<ThreadFromGlobalPool> thread;
bool stop_flag{false};
};
std::vector<std::shared_ptr<DownloadThread>> download_threads;
std::atomic<size_t> download_threads_num;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
MetadataBucket & getMetadataBucket(const Key & key);
void downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory);
MetadataBucket::iterator removeEmptyKey(
@@ -179,6 +185,18 @@ private:
MetadataBucket::iterator it,
LockedKey &,
const CacheMetadataGuard::Lock &);
void downloadThreadFunc(const bool & stop_flag);
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if they arose as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete();
/// it does not cover removal of cache files caused by FileCache::removeKey/removeAllKeys,
/// which is triggered by removal of source files from object storage.
/// Hence the number of elements submitted for background cleanup should remain low.
void cleanupThreadFunc();
};
@@ -243,7 +261,7 @@ struct LockedKey : private boost::noncopyable
void markAsRemoved();
std::vector<FileSegment::Info> sync(FileCache & cache);
std::vector<FileSegment::Info> sync();
std::string toString() const;

Some files were not shown because too many files have changed in this diff