Merge branch 'master' into async-reads

Alexey Milovidov 2021-07-27 23:58:32 +03:00
commit fd5c1561e2
244 changed files with 3495 additions and 1936 deletions

contrib/rocksdb vendored

@ -1 +1 @@
Subproject commit dac0e9a68080c837d6b6223921f3fc151abbfcdc
Subproject commit b6480c69bf3ab6e298e0d019a07fd4f69029b26a

View File

@ -61,4 +61,7 @@ ENV TSAN_OPTIONS='halt_on_error=1 history_size=7'
ENV UBSAN_OPTIONS='print_stacktrace=1'
ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'
ENV TZ=Europe/Moscow
RUN ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone
CMD sleep 1

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
rabbitmq1:
image: rabbitmq:3-management-alpine
image: rabbitmq:3.8-management-alpine
hostname: rabbitmq1
expose:
- ${RABBITMQ_PORT}

View File

@ -2,6 +2,11 @@
set -e -x
# Choose random timezone for this test run
TZ="$(grep -v '#' /usr/share/zoneinfo/zone.tab | awk '{print $3}' | shuf | head -n1)"
echo "Choosen random timezone $TZ"
ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone
dpkg -i package_folder/clickhouse-common-static_*.deb;
dpkg -i package_folder/clickhouse-common-static-dbg_*.deb
dpkg -i package_folder/clickhouse-server_*.deb

View File

@ -3,6 +3,11 @@
# fail on errors, verbose and export all env variables
set -e -x -a
# Choose random timezone for this test run.
TZ="$(grep -v '#' /usr/share/zoneinfo/zone.tab | awk '{print $3}' | shuf | head -n1)"
echo "Choosen random timezone $TZ"
ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone
dpkg -i package_folder/clickhouse-common-static_*.deb
dpkg -i package_folder/clickhouse-common-static-dbg_*.deb
dpkg -i package_folder/clickhouse-server_*.deb

View File

@ -77,9 +77,6 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& odbcinst -i -s -l -f /tmp/clickhouse-odbc-tmp/share/doc/clickhouse-odbc/config/odbc.ini.sample \
&& rm -rf /tmp/clickhouse-odbc-tmp
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]

View File

@ -1,8 +1,6 @@
# docker build -t yandex/clickhouse-unit-test .
FROM yandex/clickhouse-stateless-test
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get install gdb
COPY run.sh /

View File

@ -8,7 +8,7 @@ toc_title: Third-Party Libraries Used
The list of third-party libraries can be obtained by the following query:
``` sql
SELECT library_name, license_type, license_path FROM system.licenses ORDER BY library_name COLLATE 'en'
SELECT library_name, license_type, license_path FROM system.licenses ORDER BY library_name COLLATE 'en';
```
[Example](https://gh-api.clickhouse.tech/play?user=play#U0VMRUNUIGxpYnJhcnlfbmFtZSwgbGljZW5zZV90eXBlLCBsaWNlbnNlX3BhdGggRlJPTSBzeXN0ZW0ubGljZW5zZXMgT1JERVIgQlkgbGlicmFyeV9uYW1lIENPTExBVEUgJ2VuJw==)

View File

@ -749,7 +749,7 @@ If your code in the `master` branch is not buildable yet, exclude it from the bu
**1.** The C++20 standard library is used (experimental extensions are allowed), as well as `boost` and `Poco` frameworks.
**2.** It is not allowed to use libraries from OS packages. It is also not allowed to use pre-installed libraries. All libraries should be placed in form of source code in `contrib` directory and built with ClickHouse.
**2.** It is not allowed to use libraries from OS packages. It is also not allowed to use pre-installed libraries. All libraries should be placed in form of source code in `contrib` directory and built with ClickHouse. See [Guidelines for adding new third-party libraries](contrib.md#adding-third-party-libraries) for details.
**3.** Preference is always given to libraries that are already in use.

View File

@ -20,7 +20,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
Required parameters:
- `primary_key_name` can be any column name from the column list.
- `primary key` must be specified; it supports only one column in the primary key. The primary key will be serialized in binary as a `rocksdb key`.
- columns other than the primary key will be serialized in binary as the `rocksdb` value in the corresponding order.
- queries with key `equals` or `in` filtering will be optimized to a multi-key lookup from `rocksdb` (see the sketch below).
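As a rough illustration of the key-lookup optimization mentioned above, a query that filters on the primary key column can be served as a multi-key lookup in RocksDB instead of a full scan. The table name and key values below are hypothetical:
``` sql
-- Assumes a table `rocksdb_table` created with ENGINE = EmbeddedRocksDB PRIMARY KEY key.
-- Filtering with `=` or `IN` on the primary key turns into a multi-key RocksDB lookup.
SELECT *
FROM rocksdb_table
WHERE key IN ('2021-07-26', '2021-07-27');
```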
@ -39,4 +39,46 @@ ENGINE = EmbeddedRocksDB
PRIMARY KEY key
```
## Metrics
There is also a `system.rocksdb` table that exposes RocksDB statistics:
```sql
SELECT
name,
value
FROM system.rocksdb
┌─name──────────────────────┬─value─┐
│ no.file.opens │ 1 │
│ number.block.decompressed │ 1 │
└───────────────────────────┴───────┘
```
## Configuration
You can also change any [rocksdb options](https://github.com/facebook/rocksdb/wiki/Option-String-and-Option-Map) using the configuration file:
```xml
<rocksdb>
<options>
<max_background_jobs>8</max_background_jobs>
</options>
<column_family_options>
<num_levels>2</num_levels>
</column_family_options>
<tables>
<table>
<name>TABLE</name>
<options>
<max_background_jobs>8</max_background_jobs>
</options>
<column_family_options>
<num_levels>2</num_levels>
</column_family_options>
</table>
</tables>
</rocksdb>
```
[Original article](https://clickhouse.tech/docs/en/engines/table-engines/integrations/embedded-rocksdb/) <!--hide-->

View File

@ -14,6 +14,8 @@ Engines of the family:
- [Log](../../../engines/table-engines/log-family/log.md)
- [TinyLog](../../../engines/table-engines/log-family/tinylog.md)
`Log` family table engines can store data to [HDFS](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-hdfs) or [S3](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-s3) distributed file systems.
## Common Properties {#common-properties}
Engines:

View File

@ -5,10 +5,8 @@ toc_title: Log
# Log {#log}
Engine belongs to the family of log engines. See the common properties of log engines and their differences in the [Log Engine Family](../../../engines/table-engines/log-family/index.md) article.
The engine belongs to the family of `Log` engines. See the common properties of `Log` engines and their differences in the [Log Engine Family](../../../engines/table-engines/log-family/index.md) article.
Log differs from [TinyLog](../../../engines/table-engines/log-family/tinylog.md) in that a small file of “marks” resides with the column files. These marks are written on every data block and contain offsets that indicate where to start reading the file in order to skip the specified number of rows. This makes it possible to read table data in multiple threads.
`Log` differs from [TinyLog](../../../engines/table-engines/log-family/tinylog.md) in that a small file of "marks" resides with the column files. These marks are written on every data block and contain offsets that indicate where to start reading the file in order to skip the specified number of rows. This makes it possible to read table data in multiple threads.
For concurrent data access, the read operations can be performed simultaneously, while write operations block reads and each other.
The Log engine does not support indexes. Similarly, if writing to a table failed, the table is broken, and reading from it returns an error. The Log engine is appropriate for temporary data, write-once tables, and for testing or demonstration purposes.
[Original article](https://clickhouse.tech/docs/en/operations/table_engines/log/) <!--hide-->
The `Log` engine does not support indexes. Similarly, if writing to a table failed, the table is broken, and reading from it returns an error. The `Log` engine is appropriate for temporary data, write-once tables, and for testing or demonstration purposes.
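A minimal usage sketch for the `Log` engine described above; the table and column names are illustrative:
``` sql
-- Write-once style table; reads can run in multiple threads thanks to the marks file.
CREATE TABLE log_example (id UInt64, message String) ENGINE = Log;
INSERT INTO log_example VALUES (1, 'first'), (2, 'second');
SELECT * FROM log_example;
```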

View File

@ -728,7 +728,9 @@ During this time, they are not moved to other volumes or disks. Therefore, until
## Using S3 for Data Storage {#table_engine-mergetree-s3}
`MergeTree` family table engines is able to store data to [S3](https://aws.amazon.com/s3/) using a disk with type `s3`.
`MergeTree` family table engines can store data to [S3](https://aws.amazon.com/s3/) using a disk with type `s3`.
This feature is under development and not ready for production. There are known drawbacks such as very low performance.
Configuration markup:
``` xml
@ -762,11 +764,13 @@ Configuration markup:
```
Required parameters:
- `endpoint` — S3 endpoint url in `path` or `virtual hosted` [styles](https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html). Endpoint url should contain bucket and root path to store data.
- `endpoint` — S3 endpoint URL in `path` or `virtual hosted` [styles](https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html). Endpoint URL should contain a bucket and root path to store data.
- `access_key_id` — S3 access key id.
- `secret_access_key` — S3 secret access key.
Optional parameters:
- `region` — S3 region name.
- `use_environment_credentials` — Reads AWS credentials from the environment variables `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN` if they exist. Default value is `false`.
- `use_insecure_imds_request` — If set to `true`, the S3 client will use an insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Default value is `false`.
@ -782,7 +786,6 @@ Optional parameters:
- `skip_access_check` — If true, disk access checks will not be performed on disk start-up. Default value is `false`.
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set.
S3 disk can be configured as `main` or `cold` storage:
``` xml
<storage_configuration>
@ -821,4 +824,43 @@ S3 disk can be configured as `main` or `cold` storage:
If the `cold` option is used, data can be moved to S3 when the free space on the local disk becomes smaller than `move_factor * disk_size`, or by a TTL move rule.
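Assuming the storage configuration declares a policy that includes the S3 disk (the policy name `s3_cold` below is hypothetical), a table can be pointed at it like this:
``` sql
-- The policy name must match an entry under <policies> in the storage configuration.
CREATE TABLE s3_backed_table
(
    id UInt64,
    dt DateTime
)
ENGINE = MergeTree
ORDER BY id
SETTINGS storage_policy = 's3_cold';
```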
[Original article](https://clickhouse.tech/docs/ru/operations/table_engines/mergetree/) <!--hide-->
## Using HDFS for Data Storage {#table_engine-mergetree-hdfs}
[HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html) is a distributed file system for remote data storage.
`MergeTree` family table engines can store data to HDFS using a disk with type `HDFS`.
Configuration markup:
``` xml
<yandex>
<storage_configuration>
<disks>
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
</hdfs>
</disks>
<policies>
<hdfs>
<volumes>
<main>
<disk>hdfs</disk>
</main>
</volumes>
</hdfs>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>
```
Required parameters:
- `endpoint` — HDFS endpoint URL in `path` format. Endpoint URL should contain a root path to store data.
Optional parameters:
- `min_bytes_for_seek` — The minimal number of bytes to use a seek operation instead of sequential read. Default value: `1 MB`.
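A table can then use the `hdfs` policy declared in the configuration example above. This is only a sketch; it assumes the policy name matches the markup:
``` sql
-- Stores the table data on the HDFS-backed volume defined in the `hdfs` policy.
CREATE TABLE hdfs_backed_table
(
    id UInt64,
    value String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS storage_policy = 'hdfs';
```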

View File

@ -280,14 +280,13 @@ Default value: `0`.
## check_sample_column_is_correct {#check_sample_column_is_correct}
Enables to check column for sampling or sampling expression is correct at table creation.
Enables the check at table creation, that the data type of a column for sampling or sampling expression is correct. The data type must be one of unsigned [integer types](../../sql-reference/data-types/int-uint.md): `UInt8`, `UInt16`, `UInt32`, `UInt64`.
Possible values:
- true — Check column or sampling expression is correct at table creation.
- false — Do not check column or sampling expression is correct at table creation.
- true — The check is enabled.
- false — The check is disabled at table creation.
Default value: `true`.
By default, the ClickHouse server check column for sampling or sampling expression at table creation. If you already had tables with incorrect sampling expression, set value `false` to make ClickHouse server do not raise exception when ClickHouse server is starting.
[Original article](https://clickhouse.tech/docs/en/operations/settings/merge_tree_settings/) <!--hide-->
By default, the ClickHouse server checks at table creation the data type of a column for sampling or sampling expression. If you already have tables with incorrect sampling expression and do not want the server to raise an exception during startup, set `check_sample_column_is_correct` to `false`.
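For illustration, the table below uses a sampling column of an allowed unsigned integer type and therefore passes the check that this setting controls (the table definition is hypothetical). To keep legacy tables with an incorrect sampling expression loadable, `check_sample_column_is_correct` can instead be set to `false` in the `merge_tree` section of the server configuration.
``` sql
-- The sampling column has type UInt32, one of the allowed unsigned integer types.
CREATE TABLE sampled_events
(
    user_id UInt32,
    event_date Date
)
ENGINE = MergeTree
ORDER BY (event_date, user_id)
SAMPLE BY user_id;
```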

View File

@ -153,6 +153,26 @@ Possible values:
Default value: 1048576.
## table_function_remote_max_addresses {#table_function_remote_max_addresses}
Sets the maximum number of addresses generated from patterns for the [remote](../../sql-reference/table-functions/remote.md) function.
Possible values:
- Positive integer.
Default value: `1000`.
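For example, the pattern below expands to 20 addresses, well under the default limit; the host names are hypothetical, and the `SET` is only needed when a pattern would expand to more than the current limit:
``` sql
-- example{01..20}-1 expands to example01-1 ... example20-1 (20 addresses).
SET table_function_remote_max_addresses = 2000;
SELECT count() FROM remote('example{01..20}-1', system.one);
```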
## glob_expansion_max_elements {#glob_expansion_max_elements}
Sets the maximum number of addresses generated from patterns for external storages and table functions (like [url](../../sql-reference/table-functions/url.md)) except the `remote` function.
Possible values:
- Positive integer.
Default value: `1000`.
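A similar sketch for the `url` table function, assuming glob support as described in its documentation; the endpoint and table structure are hypothetical:
``` sql
-- The pattern expands to 10 URLs; glob_expansion_max_elements caps how many may be generated.
SET glob_expansion_max_elements = 500;
SELECT * FROM url('http://server{1..10}.example.com:8123/data.csv', 'CSV', 'id UInt32, value String');
```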
## send_progress_in_http_headers {#settings-send_progress_in_http_headers}
Enables or disables `X-ClickHouse-Progress` HTTP response headers in `clickhouse-server` responses.

View File

@ -13,13 +13,14 @@ toc_title: Strings
Returns 1 for an empty string or 0 for a non-empty string.
The result type is UInt8.
A string is considered non-empty if it contains at least one byte, even if this is a space or a null byte.
The function also works for arrays.
The function also works for arrays and UUIDs.
A UUID is considered empty if it is all zeros (the nil UUID).
## notEmpty {#notempty}
Returns 0 for an empty string or 1 for a non-empty string.
The result type is UInt8.
The function also works for arrays.
The function also works for arrays and UUIDs.
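A quick illustration of the behaviour described above, assuming the stated semantics (the nil UUID counts as empty, any other UUID does not):
``` sql
-- empty() returns 1 only for the all-zero (nil) UUID; notEmpty() is the opposite.
SELECT
    empty(toUUID('00000000-0000-0000-0000-000000000000'))    AS nil_is_empty,
    notEmpty(toUUID('61f0c404-5cb3-11e7-907b-a6006ad3dba0')) AS other_is_not_empty;
```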
## length {#length}

View File

@ -68,28 +68,6 @@ Multiple addresses can be comma-separated. In this case, ClickHouse will use dis
example01-01-1,example01-02-1
```
Part of the expression can be specified in curly brackets. The previous example can be written as follows:
``` text
example01-0{1,2}-1
```
Curly brackets can contain a range of numbers separated by two dots (non-negative integers). In this case, the range is expanded to a set of values that generate shard addresses. If the first number starts with zero, the values are formed with the same zero alignment. The previous example can be written as follows:
``` text
example01-{01..02}-1
```
If you have multiple pairs of curly brackets, it generates the direct product of the corresponding sets.
Addresses and parts of addresses in curly brackets can be separated by the pipe symbol (\|). In this case, the corresponding sets of addresses are interpreted as replicas, and the query will be sent to the first healthy replica. However, the replicas are iterated in the order currently set in the [load_balancing](../../operations/settings/settings.md#settings-load_balancing) setting. This example specifies two shards that each have two replicas:
``` text
example01-{01..02}-{1|2}
```
The number of addresses generated is limited by a constant. Right now this is 1000 addresses.
**Examples**
Selecting data from a remote server:
@ -106,4 +84,15 @@ INSERT INTO FUNCTION remote('127.0.0.1', currentDatabase(), 'remote_table') VALU
SELECT * FROM remote_table;
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/table-functions/remote/) <!--hide-->
## Globs in Addresses {#globs-in-addresses}
Patterns in curly brackets `{ }` are used to generate a set of shards and to specify replicas. If there are multiple pairs of curly brackets, then the direct product of the corresponding sets is generated.
The following pattern types are supported.
- {*a*,*b*} - Any number of variants separated by a comma. The pattern is replaced with *a* in the first shard address and it is replaced with *b* in the second shard address and so on. For instance, `example0{1,2}-1` generates addresses `example01-1` and `example02-1`.
- {*n*..*m*} - A range of numbers. This pattern generates shard addresses with incrementing indices from *n* to *m*. `example0{1..2}-1` generates `example01-1` and `example02-1`.
- {*0n*..*0m*} - A range of numbers with leading zeroes. This modification preserves leading zeroes in indices. The pattern `example{01..03}-1` generates `example01-1`, `example02-1` and `example03-1`.
- {*a*|*b*} - Any number of variants separated by a `|`. The pattern specifies replicas. For instance, `example01-{1|2}` generates replicas `example01-1` and `example01-2`.
The query will be sent to the first healthy replica. However, for `remote` the replicas are iterated in the order currently set in the [load_balancing](../../operations/settings/settings.md#settings-load_balancing) setting.
The number of generated addresses is limited by the [table_function_remote_max_addresses](../../operations/settings/settings.md#table_function_remote_max_addresses) setting.
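Putting the pattern types together, the two-shards-with-two-replicas layout mentioned earlier on this page can be queried as follows; the host names and the `default.hits` table are illustrative:
``` sql
-- {01..02} enumerates the shards, {1|2} lists the replicas tried for failover within each shard.
SELECT count()
FROM remote('example01-{01..02}-{1|2}', default.hits);
```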

View File

@ -41,4 +41,7 @@ INSERT INTO FUNCTION url('http://127.0.0.1:8123/?query=INSERT+INTO+test_table+FO
SELECT * FROM test_table;
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/table-functions/url/) <!--hide-->
## Globs in URL {#globs-in-url}
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. For supported pattern types and examples, see the description of the [remote](remote.md#globs-in-addresses) function.
The `|` character inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by the [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.
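A sketch of a failover pattern with the `url` table function; the addresses and the column structure are illustrative:
``` sql
-- The hosts in {primary|backup} are tried in the listed order until one responds.
SELECT * FROM url('http://{primary|backup}.example.com:8123/data.csv', 'CSV', 'id UInt32, value String');
```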

View File

@ -38,3 +38,15 @@ toc_title: "Используемые сторонние библиотеки"
| UnixODBC | [LGPL v2.1](https://github.com/ClickHouse-Extras/UnixODBC/tree/b0ad30f7f6289c12b76f04bfb9d466374bb32168) |
| zlib-ng | [Zlib License](https://github.com/ClickHouse-Extras/zlib-ng/blob/develop/LICENSE.md) |
| zstd | [BSD 3-Clause License](https://github.com/facebook/zstd/blob/dev/LICENSE) |
## Guidelines for adding new third-party libraries and maintaining custom changes in them {#adding-third-party-libraries}
1. All external third-party code should reside in dedicated directories under the `contrib` directory of the ClickHouse repository. Use Git submodules where possible.
2. Clone the official repository into [Clickhouse-extras](https://github.com/ClickHouse-Extras). Use official GitHub repositories where they are available.
3. Create a new branch based on the branch you want to integrate: for example, `master` -> `clickhouse/master` or `release/vX.Y.Z` -> `clickhouse/release/vX.Y.Z`.
4. All copies in [Clickhouse-extras](https://github.com/ClickHouse-Extras) can be automatically synchronized with the remote repositories. The `clickhouse/...` branches will remain unaffected, since it is unlikely that anybody will use this naming pattern in their repositories.
5. Add the submodules to the `contrib` directory of the ClickHouse repository, referencing the cloned repositories. Configure the submodules to track changes in the corresponding `clickhouse/...` branches.
6. Every time the library code needs to be changed, create a dedicated branch, for example `clickhouse/my-fix`. This branch should then be merged into the branch tracked by the submodule, for example `clickhouse/master` or `clickhouse/release/vX.Y.Z`.
7. Do not add code to the clones of the [Clickhouse-extras](https://github.com/ClickHouse-Extras) repositories if the branch name does not follow the `clickhouse/...` pattern.
8. Always make changes with the official repository in mind. After a PR is merged from the (development/fix) branch of your personal fork into [Clickhouse-extras](https://github.com/ClickHouse-Extras) and the submodule is added to the ClickHouse repository, it is recommended to open another PR from the (development/fix) branch of the [Clickhouse-extras](https://github.com/ClickHouse-Extras) repository to the official repository of the library. This way: 1) the published code can be reused and has higher value; 2) other users can also use it for their own purposes; 3) the code is maintained not only by ClickHouse developers.
9. For a submodule to start using new code from the original branch (for example, `master`), first carefully merge it (`master` -> `clickhouse/master`), and only after that can the change be picked up by the main ClickHouse repository. This is because the tracked branch (for example, `clickhouse/master`) may contain custom changes and can therefore differ from its upstream (`master`).

View File

@ -242,6 +242,8 @@ sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
Code style: https://clickhouse.tech/docs/ru/development/style/
Guidelines for adding new third-party libraries and maintaining custom changes in them: https://clickhouse.tech/docs/ru/development/contrib/#adding-third-party-libraries
Writing tests: https://clickhouse.tech/docs/ru/development/tests/
List of tasks: https://github.com/ClickHouse/ClickHouse/issues?q=is%3Aopen+is%3Aissue+label%3A%22easy+task%22

View File

@ -820,11 +820,11 @@ The dictionary is configured incorrectly.
**10.** Unnecessary code is removed from the sources.
## Libraries {#biblioteki}
## Libraries {#libraries}
**1.** The C++20 standard library is used (experimental extensions are allowed) as well as the `boost` and `Poco` frameworks.
**1.** The C++20 standard libraries are used (experimental extensions are allowed), as well as the `boost` and `Poco` frameworks.
**2.** Libraries must be placed as source code in the `contrib` directory and built together with ClickHouse. It is not allowed to use libraries available in OS packages or any other way of installing libraries into the system.
**2.** Libraries must be placed as source code in the `contrib` directory and built together with ClickHouse. It is not allowed to use libraries available in OS packages, or any other way of installing libraries into the system. For details, see [Guidelines for adding new third-party libraries and maintaining custom changes in them](contrib.md#adding-third-party-libraries).
**3.** Preference is given to libraries that are already in use.
@ -902,4 +902,3 @@ function(
const & RangesInDataParts ranges,
size_t limit)
```

View File

@ -14,6 +14,8 @@ toc_priority: 29
- [Log](log.md)
- [TinyLog](tinylog.md)
Table engines of the `Log` family can store data in the [HDFS](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-hdfs) or [S3](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-s3) distributed file systems.
## Common Properties {#obshchie-svoistva}
Engines:

View File

@ -5,9 +5,8 @@ toc_title: Log
# Log {#log}
The engine belongs to the family of Log engines. See the common properties of log engines and their differences in the [Log Engine Family](index.md) article.
It differs from [TinyLog](tinylog.md) in that a small file of "marks" resides with the column files. The marks are written for every data block and contain offsets: where to start reading the file in order to skip the specified number of rows. This makes it possible to read table data in multiple threads.
For concurrent data access, read operations can be performed simultaneously, while write operations block reads and each other.
The Log engine does not support indexes. Also, if writing to the table failed, the table becomes broken, and reading from it returns an error. The Log engine is suitable for temporary data, write-once tables, and for testing or demonstration purposes.
The engine belongs to the family of `Log` engines. See the common properties of `Log` engines and their differences in the [Log Engine Family](../../../engines/table-engines/log-family/index.md) article.
`Log` differs from [TinyLog](../../../engines/table-engines/log-family/tinylog.md) in that a small file of "marks" resides with the column files. The marks are written for every data block and contain offsets: where to start reading the file in order to skip the specified number of rows. This makes it possible to read table data in multiple threads.
For concurrent data access, read operations can be performed simultaneously, while write operations block reads and each other.
The `Log` engine does not support indexes. Also, if writing to the table failed, the table becomes broken, and reading from it returns an error. The `Log` engine is suitable for temporary data, write-once tables, and for testing or demonstration purposes.

View File

@ -771,7 +771,6 @@ SETTINGS storage_policy = 'moving_from_ssd_to_hdd'
- `cache_path` — the path on the local file system where the marks cache and index files will be stored. Default value: `/var/lib/clickhouse/disks/<disk_name>/cache/`.
- `skip_access_check` — whether to skip the disk access check at disk startup. If set to `true`, the check is not performed. Default value: `false`.
An S3 disk can be configured as `main` or `cold` storage:
``` xml
@ -810,3 +809,44 @@ SETTINGS storage_policy = 'moving_from_ssd_to_hdd'
```
If a disk is configured as `cold`, data is moved to S3 when TTL move rules fire or when the free space on the local disk becomes smaller than the threshold defined as `move_factor * disk_size`.
## Using HDFS for Data Storage {#table_engine-mergetree-hdfs}
[HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html) is a distributed file system for remote data storage.
Table engines of the `MergeTree` family can store data in HDFS when using a disk of type `HDFS`.
Configuration example:
``` xml
<yandex>
<storage_configuration>
<disks>
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
</hdfs>
</disks>
<policies>
<hdfs>
<volumes>
<main>
<disk>hdfs</disk>
</main>
</volumes>
</hdfs>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>
```
Required parameters:
- `endpoint` — the HDFS endpoint URL in `path` format. The endpoint URL should contain the path to the root directory on the server where the data is stored.
Optional parameters:
- `min_bytes_for_seek` — the minimal number of bytes to use a seek operation instead of sequential read. Default value: 1 MB.

View File

@ -277,4 +277,15 @@ Eсли суммарное число активных кусков во все
Default value: `0`.
[Original article](https://clickhouse.tech/docs/ru/operations/settings/merge_tree_settings/) <!--hide-->
## check_sample_column_is_correct {#check_sample_column_is_correct}
Enables the check at table creation that the data type of the column used for sampling or as the sampling expression is correct. The data type must be one of the unsigned [integer types](../../sql-reference/data-types/int-uint.md): `UInt8`, `UInt16`, `UInt32`, `UInt64`.
Possible values:
- true — the check is enabled.
- false — the check is not performed at table creation.
Default value: `true`.
By default, the ClickHouse server checks the data type of the sampling column or sampling expression at table creation. If you already have tables with an incorrect sampling expression, set `check_sample_column_is_correct` to `false` so that no exception is raised at server startup.

View File

@ -129,6 +129,26 @@ ClickHouse применяет настройку в тех случаях, ко
Default value: 1048576.
## table_function_remote_max_addresses {#table_function_remote_max_addresses}
Sets the maximum number of addresses that can be generated from patterns for the [remote](../../sql-reference/table-functions/remote.md) function.
Possible values:
- Positive integer.
Default value: `1000`.
## glob_expansion_max_elements {#glob_expansion_max_elements}
Sets the maximum number of addresses that can be generated from patterns for external storages and table functions (for example, [url](../../sql-reference/table-functions/url.md)), except for the `remote` function.
Possible values:
- Positive integer.
Default value: `1000`.
## send_progress_in_http_headers {#settings-send_progress_in_http_headers}
Enables or disables `X-ClickHouse-Progress` HTTP response headers in `clickhouse-server` responses.

View File

@ -68,28 +68,6 @@ localhost
example01-01-1,example01-02-1
```
Part of the expression can be specified in curly brackets. The previous example can be written as follows:
``` text
example01-0{1,2}-1
```
Curly brackets can contain a range of (non-negative integer) numbers separated by two dots. In this case, the range is expanded into a set of values that generate shard addresses. If the first number starts with zero, the values are formed with the same zero alignment. The previous example can be written as follows:
``` text
example01-{01..02}-1
```
If there are several pairs of curly brackets, the direct product of the corresponding sets is generated.
Addresses or their fragments in curly brackets can be separated by the \| symbol. In this case, the corresponding sets of addresses are interpreted as replicas, and the query is sent to the first healthy replica. The replicas are iterated in the order defined by the current [load_balancing](../../operations/settings/settings.md#settings-load_balancing) setting. This example specifies two shards, each of which has two replicas:
``` text
example01-{01..02}-{1|2}
```
The number of generated addresses is limited by a constant. Currently this is 1000 addresses.
**Examples**
Selecting data from a remote server:
@ -106,3 +84,15 @@ INSERT INTO FUNCTION remote('127.0.0.1', currentDatabase(), 'remote_table') VALU
SELECT * FROM remote_table;
```
## Globs in Addresses {#globs-in-addresses}
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. Several patterns can be used in one URL.
The following pattern types are supported.
- {*a*,*b*} - several variants separated by a comma. The whole pattern is replaced with *a* in the address of the first shard, with *b* in the address of the second shard, and so on. For example, `example0{1,2}-1` generates the addresses `example01-1` and `example02-1`.
- {*n*..*m*} - a range of numbers. This pattern generates shard addresses with indices incrementing from *n* to *m*. `example0{1..2}-1` generates `example01-1` and `example02-1`.
- {*0n*..*0m*} - a range of numbers with leading zeros. This variant preserves the leading zeros in the indices. The pattern `example{01..03}-1` generates `example01-1`, `example02-1` and `example03-1`.
- {*a*|*b*} - several variants separated by `|`. The pattern specifies replica addresses. For example, `example01-{1|2}` generates the replicas `example01-1` and `example01-2`.
The query is sent to the first healthy replica. For `remote`, the replicas are iterated in the order defined by the [load_balancing](../../operations/settings/settings.md#settings-load_balancing) setting.
The number of generated addresses is limited by the [table_function_remote_max_addresses](../../operations/settings/settings.md#table_function_remote_max_addresses) setting.

View File

@ -41,3 +41,7 @@ INSERT INTO FUNCTION url('http://127.0.0.1:8123/?query=INSERT+INTO+test_table+FO
SELECT * FROM test_table;
```
## Globs in URL {#globs-in-url}
Patterns in curly brackets `{ }` are used to generate a list of shards or to specify alternative addresses in case of failure. For the supported pattern types and examples, see the description of the [remote](remote.md#globs-in-addresses) function.
The `|` character inside patterns is used to specify addresses to fall back to if the previous ones turn out to be unavailable. These addresses are iterated in the same order as they are listed in the pattern. The number of addresses that can be generated is limited by the [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.

View File

@ -1156,4 +1156,27 @@
<!-- Uncomment to disable ClickHouse internal DNS caching. -->
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
<!-- You can also configure rocksdb like this: -->
<!--
<rocksdb>
<options>
<max_background_jobs>8</max_background_jobs>
</options>
<column_family_options>
<num_levels>2</num_levels>
</column_family_options>
<tables>
<table>
<name>TABLE</name>
<options>
<max_background_jobs>8</max_background_jobs>
</options>
<column_family_options>
<num_levels>2</num_levels>
</column_family_options>
</table>
</tables>
</rocksdb>
-->
</yandex>

View File

@ -10,7 +10,7 @@
namespace DB
{
/// Information about a role.
/// Information about roles enabled for a user at some specific time.
struct EnabledRolesInfo
{
boost::container::flat_set<UUID> current_roles;

View File

@ -43,6 +43,7 @@ SRCS(
SettingsProfile.cpp
SettingsProfileElement.cpp
SettingsProfilesCache.cpp
SettingsProfilesInfo.cpp
User.cpp
UsersConfigAccessStorage.cpp
tests/gtest_access_rights_ops.cpp

View File

@ -0,0 +1,86 @@
#include <Client/ConnectionPool.h>
#include <boost/functional/hash.hpp>
namespace DB
{
ConnectionPoolPtr ConnectionPoolFactory::get(
unsigned max_connections,
String host,
UInt16 port,
String default_database,
String user,
String password,
String cluster,
String cluster_secret,
String client_name,
Protocol::Compression compression,
Protocol::Secure secure,
Int64 priority)
{
Key key{
max_connections, host, port, default_database, user, password, cluster, cluster_secret, client_name, compression, secure, priority};
std::unique_lock lock(mutex);
auto [it, inserted] = pools.emplace(key, ConnectionPoolPtr{});
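/// If a pool for this key already exists and is still alive, reuse it; otherwise the placeholder just inserted is filled below.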
if (!inserted)
if (auto res = it->second.lock())
return res;
ConnectionPoolPtr ret
{
new ConnectionPool(
max_connections,
host,
port,
default_database,
user,
password,
cluster,
cluster_secret,
client_name,
compression,
secure,
priority),
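/// Custom deleter: when the last user releases the pool, remove its entry from the factory map before destroying it.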
[key, this](auto ptr)
{
{
std::lock_guard another_lock(mutex);
pools.erase(key);
}
delete ptr;
}
};
it->second = ConnectionPoolWeakPtr(ret);
return ret;
}
size_t ConnectionPoolFactory::KeyHash::operator()(const ConnectionPoolFactory::Key & k) const
{
using boost::hash_combine;
using boost::hash_value;
size_t seed = 0;
hash_combine(seed, hash_value(k.max_connections));
hash_combine(seed, hash_value(k.host));
hash_combine(seed, hash_value(k.port));
hash_combine(seed, hash_value(k.default_database));
hash_combine(seed, hash_value(k.user));
hash_combine(seed, hash_value(k.password));
hash_combine(seed, hash_value(k.cluster));
hash_combine(seed, hash_value(k.cluster_secret));
hash_combine(seed, hash_value(k.client_name));
hash_combine(seed, hash_value(k.compression));
hash_combine(seed, hash_value(k.secure));
hash_combine(seed, hash_value(k.priority));
return seed;
}
ConnectionPoolFactory & ConnectionPoolFactory::instance()
{
static ConnectionPoolFactory ret;
return ret;
}
}

View File

@ -135,4 +135,60 @@ private:
};
/**
* Connection pool factory. Responsible for creating new connection pools and reusing existing ones.
*/
class ConnectionPoolFactory final : private boost::noncopyable
{
public:
struct Key
{
unsigned max_connections;
String host;
UInt16 port;
String default_database;
String user;
String password;
String cluster;
String cluster_secret;
String client_name;
Protocol::Compression compression;
Protocol::Secure secure;
Int64 priority;
};
struct KeyHash
{
size_t operator()(const ConnectionPoolFactory::Key & k) const;
};
static ConnectionPoolFactory & instance();
ConnectionPoolPtr
get(unsigned max_connections,
String host,
UInt16 port,
String default_database,
String user,
String password,
String cluster,
String cluster_secret,
String client_name,
Protocol::Compression compression,
Protocol::Secure secure,
Int64 priority);
private:
mutable std::mutex mutex;
using ConnectionPoolWeakPtr = std::weak_ptr<IConnectionPool>;
std::unordered_map<Key, ConnectionPoolWeakPtr, KeyHash> pools;
};
inline bool operator==(const ConnectionPoolFactory::Key & lhs, const ConnectionPoolFactory::Key & rhs)
{
return lhs.max_connections == rhs.max_connections && lhs.host == rhs.host && lhs.port == rhs.port
&& lhs.default_database == rhs.default_database && lhs.user == rhs.user && lhs.password == rhs.password
&& lhs.cluster == rhs.cluster && lhs.cluster_secret == rhs.cluster_secret && lhs.client_name == rhs.client_name
&& lhs.compression == rhs.compression && lhs.secure == rhs.secure && lhs.priority == rhs.priority;
}
}

View File

@ -479,6 +479,15 @@ void HedgedConnections::checkNewReplica()
Connection * connection = nullptr;
HedgedConnectionsFactory::State state = hedged_connections_factory.waitForReadyConnections(connection);
if (cancelled)
{
/// Do not start new connection if query is already canceled.
if (connection)
connection->disconnect();
state = HedgedConnectionsFactory::State::CANNOT_CHOOSE;
}
processNewReplicaState(state, connection);
/// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore.

View File

@ -12,9 +12,11 @@ PEERDIR(
SRCS(
Connection.cpp
ConnectionEstablisher.cpp
ConnectionPool.cpp
ConnectionPoolWithFailover.cpp
HedgedConnections.cpp
HedgedConnectionsFactory.cpp
IConnections.cpp
MultiplexedConnections.cpp
)

View File

@ -15,6 +15,7 @@
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
@ -159,12 +160,11 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
auto temporary_table = TemporaryTableHolder(getContext(), ColumnsDescription{columns}, {});
auto storage = temporary_table.getTable();
getContext()->addExternalTable(data->table_name, std::move(temporary_table));
BlockOutputStreamPtr output = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext());
auto sink = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext());
/// Write data
data->pipe->resize(1);
auto sink = std::make_shared<SinkToOutputStream>(std::move(output));
connect(*data->pipe->getOutputPort(0), sink->getPort());
auto processors = Pipe::detachProcessors(std::move(*data->pipe));

View File

@ -0,0 +1,114 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class PushingToSinkBlockOutputStream : public IBlockOutputStream
{
public:
explicit PushingToSinkBlockOutputStream(SinkToStoragePtr sink_)
: sink(std::move(sink_)), port(sink->getPort().getHeader(), sink.get()) {}
Block getHeader() const override { return sink->getPort().getHeader(); }
void write(const Block & block) override
{
/// In case writePrefix was not called.
if (!port.isConnected())
writePrefix();
if (!block)
return;
size_t num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
port.push(std::move(chunk));
while (true)
{
auto status = sink->prepare();
switch (status)
{
case IProcessor::Status::Ready:
sink->work();
continue;
case IProcessor::Status::NeedData:
return;
case IProcessor::Status::Async: [[fallthrough]];
case IProcessor::Status::ExpandPipeline: [[fallthrough]];
case IProcessor::Status::Finished: [[fallthrough]];
case IProcessor::Status::PortFull:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Status {} in not expected in PushingToSinkBlockOutputStream::writePrefix",
IProcessor::statusToName(status));
}
}
}
void writePrefix() override
{
connect(port, sink->getPort());
while (true)
{
auto status = sink->prepare();
switch (status)
{
case IProcessor::Status::Ready:
sink->work();
continue;
case IProcessor::Status::NeedData:
return;
case IProcessor::Status::Async: [[fallthrough]];
case IProcessor::Status::ExpandPipeline: [[fallthrough]];
case IProcessor::Status::Finished: [[fallthrough]];
case IProcessor::Status::PortFull:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Status {} in not expected in PushingToSinkBlockOutputStream::writePrefix",
IProcessor::statusToName(status));
}
}
}
void writeSuffix() override
{
port.finish();
while (true)
{
auto status = sink->prepare();
switch (status)
{
case IProcessor::Status::Ready:
sink->work();
continue;
case IProcessor::Status::Finished:
///flush();
return;
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::ExpandPipeline:
case IProcessor::Status::PortFull:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Status {} in not expected in PushingToSinkBlockOutputStream::writeSuffix",
IProcessor::statusToName(status));
}
}
}
private:
SinkToStoragePtr sink;
OutputPort port;
};
}

View File

@ -13,12 +13,12 @@
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Common/checkStackSize.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/StorageValues.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/StorageMaterializedView.h>
#include <common/logger_useful.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
namespace DB
{
@ -127,8 +127,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Do not push to destination table if the flag is set
if (!no_destination)
{
output = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext());
replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get());
auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext());
metadata_snapshot->check(sink->getPort().getHeader().getColumnsWithTypeAndName());
replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(sink.get());
output = std::make_shared<PushingToSinkBlockOutputStream>(std::move(sink));
}
}

View File

@ -13,7 +13,7 @@ class Logger;
namespace DB
{
class ReplicatedMergeTreeBlockOutputStream;
class ReplicatedMergeTreeSink;
/** Writes data to the specified table and to all dependent materialized views.
*/
@ -38,7 +38,7 @@ private:
StoragePtr storage;
StorageMetadataPtr metadata_snapshot;
BlockOutputStreamPtr output;
ReplicatedMergeTreeBlockOutputStream * replicated_output = nullptr;
ReplicatedMergeTreeSink * replicated_output = nullptr;
Poco::Logger * log;
ASTPtr query_ptr;

View File

@ -434,16 +434,13 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read", read_context);
/// Try to drain connections asynchronously.
if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections))
{
/// Finish might be called in multiple threads. Make sure we release connections in thread-safe way.
std::lock_guard guard(connection_draining_mutex);
if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections))
{
/// Drain connections synchronously.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*conn);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
/// Drain connections synchronously.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*conn);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
finished = true;
}

View File

@ -168,10 +168,6 @@ private:
std::atomic<bool> was_cancelled { false };
std::mutex was_cancelled_mutex;
/** Thread-safe connection draining.
*/
std::mutex connection_draining_mutex;
/** An exception from replica was received. No need in receiving more packets or
* requesting to cancel query execution
*/

View File

@ -19,6 +19,7 @@ SRCS(
BlockStreamProfileInfo.cpp
CheckConstraintsBlockOutputStream.cpp
ColumnGathererStream.cpp
ConnectionCollector.cpp
ConvertingBlockInputStream.cpp
CountingBlockOutputStream.cpp
DistinctSortedBlockInputStream.cpp

View File

@ -317,7 +317,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(context, engine_define, database_path);
return std::make_shared<DatabaseSQLite>(context, engine_define, create.attach, database_path);
}
#endif

View File

@ -193,7 +193,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
UInt16 default_port = getContext()->getTCPPort();
bool secure = db_settings.cluster_secure_connection;
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, username, password, default_port, false, secure);
bool treat_local_as_remote = false;
bool treat_local_port_as_remote = getContext()->getApplicationType() == Context::ApplicationType::LOCAL;
return std::make_shared<Cluster>(
getContext()->getSettingsRef(),
shards,
username,
password,
default_port,
treat_local_as_remote,
treat_local_port_as_remote,
secure);
}
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(bool force_attach)

View File

@ -25,13 +25,15 @@ namespace ErrorCodes
DatabaseSQLite::DatabaseSQLite(
ContextPtr context_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_path_)
: IDatabase("SQLite")
, WithContext(context_->getGlobalContext())
, database_engine_define(database_engine_define_->clone())
, database_path(database_path_)
, log(&Poco::Logger::get("DatabaseSQLite"))
{
sqlite_db = openSQLiteDB(database_path_, context_);
sqlite_db = openSQLiteDB(database_path_, context_, !is_attach_);
}
@ -57,6 +59,9 @@ DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_con
std::unordered_set<std::string> DatabaseSQLite::fetchTablesList() const
{
if (!sqlite_db)
sqlite_db = openSQLiteDB(database_path, getContext(), /* throw_on_error */true);
std::unordered_set<String> tables;
std::string query = "SELECT name FROM sqlite_master "
"WHERE type = 'table' AND name NOT LIKE 'sqlite_%'";
@ -85,6 +90,9 @@ std::unordered_set<std::string> DatabaseSQLite::fetchTablesList() const
bool DatabaseSQLite::checkSQLiteTable(const String & table_name) const
{
if (!sqlite_db)
sqlite_db = openSQLiteDB(database_path, getContext(), /* throw_on_error */true);
const String query = fmt::format("SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';", table_name);
auto callback_get_data = [](void * res, int, char **, char **) -> int
@ -125,6 +133,9 @@ StoragePtr DatabaseSQLite::tryGetTable(const String & table_name, ContextPtr loc
StoragePtr DatabaseSQLite::fetchTable(const String & table_name, ContextPtr local_context, bool table_checked) const
{
if (!sqlite_db)
sqlite_db = openSQLiteDB(database_path, getContext(), /* throw_on_error */true);
if (!table_checked && !checkSQLiteTable(table_name))
return StoragePtr{};
@ -136,6 +147,7 @@ StoragePtr DatabaseSQLite::fetchTable(const String & table_name, ContextPtr loca
auto storage = StorageSQLite::create(
StorageID(database_name, table_name),
sqlite_db,
database_path,
table_name,
ColumnsDescription{*columns},
ConstraintsDescription{},

View File

@ -19,7 +19,8 @@ class DatabaseSQLite final : public IDatabase, protected WithContext
public:
using SQLitePtr = std::shared_ptr<sqlite3>;
DatabaseSQLite(ContextPtr context_, const ASTStorage * database_engine_define_, const String & database_path_);
DatabaseSQLite(ContextPtr context_, const ASTStorage * database_engine_define_,
bool is_attach_, const String & database_path_);
String getEngineName() const override { return "SQLite"; }
@ -47,7 +48,9 @@ protected:
private:
ASTPtr database_engine_define;
SQLitePtr sqlite_db;
String database_path;
mutable SQLitePtr sqlite_db;
Poco::Logger * log;

View File

@ -1,6 +1,7 @@
#include "SQLiteUtils.h"
#if USE_SQLITE
#include <common/logger_useful.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -14,7 +15,16 @@ namespace ErrorCodes
}
String validateSQLiteDatabasePath(const String & path, const String & user_files_path)
void processSQLiteError(const String & message, bool throw_on_error)
{
if (throw_on_error)
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, message);
else
LOG_ERROR(&Poco::Logger::get("SQLiteEngine"), message);
}
String validateSQLiteDatabasePath(const String & path, const String & user_files_path, bool throw_on_error)
{
String canonical_user_files_path = fs::canonical(user_files_path);
@ -27,27 +37,28 @@ String validateSQLiteDatabasePath(const String & path, const String & user_files
canonical_path = fs::canonical(path, err);
if (err)
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "SQLite database path '{}' is invalid. Error: {}", path, err.message());
processSQLiteError(fmt::format("SQLite database path '{}' is invalid. Error: {}", path, err.message()), throw_on_error);
if (!canonical_path.starts_with(canonical_user_files_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED,
"SQLite database file path '{}' must be inside 'user_files' directory", path);
processSQLiteError(fmt::format("SQLite database file path '{}' must be inside 'user_files' directory", path), throw_on_error);
return canonical_path;
}
SQLitePtr openSQLiteDB(const String & database_path, ContextPtr context)
SQLitePtr openSQLiteDB(const String & database_path, ContextPtr context, bool throw_on_error)
{
auto validated_path = validateSQLiteDatabasePath(database_path, context->getUserFilesPath());
auto validated_path = validateSQLiteDatabasePath(database_path, context->getUserFilesPath(), throw_on_error);
sqlite3 * tmp_sqlite_db = nullptr;
int status = sqlite3_open(validated_path.c_str(), &tmp_sqlite_db);
if (status != SQLITE_OK)
throw Exception(ErrorCodes::PATH_ACCESS_DENIED,
"Cannot access sqlite database. Error status: {}. Message: {}",
status, sqlite3_errstr(status));
{
processSQLiteError(fmt::format("Cannot access sqlite database. Error status: {}. Message: {}",
status, sqlite3_errstr(status)), throw_on_error);
return nullptr;
}
return std::shared_ptr<sqlite3>(tmp_sqlite_db, sqlite3_close);
}

View File

@ -15,7 +15,7 @@ namespace DB
using SQLitePtr = std::shared_ptr<sqlite3>;
SQLitePtr openSQLiteDB(const String & database_path, ContextPtr context);
SQLitePtr openSQLiteDB(const String & database_path, ContextPtr context, bool throw_on_error = true);
}

View File

@ -15,6 +15,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DISK_INDEX;
extern const int DATA_ENCRYPTION_ERROR;
}
namespace
@ -38,32 +39,54 @@ namespace
struct DiskEncryptedSettings
{
Algorithm encryption_algorithm;
String key;
DiskPtr wrapped_disk;
String path_on_wrapped_disk;
std::unordered_map<UInt64, String> keys;
UInt64 current_key_id;
Algorithm current_algorithm;
DiskEncryptedSettings(
const String & disk_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DisksMap & map)
{
try
{
encryption_algorithm = DEFAULT_ENCRYPTION_ALGORITHM;
current_algorithm = DEFAULT_ENCRYPTION_ALGORITHM;
if (config.has(config_prefix + ".algorithm"))
parseFromString(encryption_algorithm, config.getString(config_prefix + ".algorithm"));
parseFromString(current_algorithm, config.getString(config_prefix + ".algorithm"));
key = config.getString(config_prefix + ".key", "");
String key_hex = config.getString(config_prefix + ".key_hex", "");
if (!key.empty() && !key_hex.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Both 'key' and 'key_hex' are specified. There should be only one");
if (!key_hex.empty())
Strings config_keys;
config.keys(config_prefix, config_keys);
for (const std::string & config_key : config_keys)
{
assert(key.empty());
key = unhexKey(key_hex);
String key;
UInt64 key_id;
if ((config_key == "key") || config_key.starts_with("key["))
{
key = config.getString(config_prefix + "." + config_key, "");
key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
}
else if ((config_key == "key_hex") || config_key.starts_with("key_hex["))
{
key = unhexKey(config.getString(config_prefix + "." + config_key, ""));
key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
}
else
continue;
auto it = keys.find(key_id);
if (it != keys.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple keys have the same ID {}", key_id);
keys[key_id] = key;
}
FileEncryption::checkKeySize(encryption_algorithm, key.size());
if (keys.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No keys, an encrypted disk needs keys to work", current_key_id);
current_key_id = config.getUInt64(config_prefix + ".current_key_id", 0);
if (!keys.contains(current_key_id))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Key with ID {} not found", current_key_id);
FileEncryption::checkKeySize(current_algorithm, keys[current_key_id].size());
String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
if (wrapped_disk_name.empty())
@ -133,9 +156,15 @@ DiskEncrypted::DiskEncrypted(
const String & name_,
DiskPtr wrapped_disk_,
const String & path_on_wrapped_disk_,
FileEncryption::Algorithm encryption_algorithm_,
const String & key_)
: DiskDecorator(wrapped_disk_), name(name_), disk_path(path_on_wrapped_disk_), encryption_algorithm(encryption_algorithm_), key(key_)
const std::unordered_map<UInt64, String> & keys_,
UInt64 current_key_id_,
FileEncryption::Algorithm current_algorithm_)
: DiskDecorator(wrapped_disk_)
, name(name_)
, disk_path(path_on_wrapped_disk_)
, keys(keys_)
, current_key_id(current_key_id_)
, current_algorithm(current_algorithm_)
{
initialize();
}
@ -154,6 +183,15 @@ void DiskEncrypted::initialize()
delegate->createDirectories(disk_path);
}
String DiskEncrypted::getKey(UInt64 key_id) const
{
auto it = keys.find(key_id);
if (it == keys.end())
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Key with ID {} not found", key_id);
return it->second;
}
void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
/// Check if we can copy the file without deciphering.
@ -162,9 +200,9 @@ void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk>
/// Disk type is the same, check if the key is the same too.
if (auto * to_encrypted_disk = typeid_cast<DiskEncrypted *>(to_disk.get()))
{
if ((encryption_algorithm == to_encrypted_disk->encryption_algorithm) && (key == to_encrypted_disk->key))
if (keys == to_encrypted_disk->keys)
{
/// Key is the same so we can simply copy the encrypted file.
/// Keys are the same so we can simply copy the encrypted file.
delegate->copy(wrappedPath(from_path), to_encrypted_disk->delegate, to_encrypted_disk->wrappedPath(to_path));
return;
}
@ -183,31 +221,62 @@ std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
{
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
InitVector iv;
iv.read(*buffer);
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), encryption_algorithm, key, iv);
try
{
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
FileEncryption::Header header;
header.read(*buffer);
String key = getKey(header.key_id);
if (calculateKeyHash(key) != header.key_hash)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key, could not read file");
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header);
}
catch (Exception & e)
{
e.addMessage("File " + quoteString(path));
throw;
}
}
std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
InitVector iv;
UInt64 old_file_size = 0;
auto wrapped_path = wrappedPath(path);
if (mode == WriteMode::Append && exists(path) && getFileSize(path))
try
{
auto read_buffer = delegate->readFile(wrapped_path, InitVector::kSize);
iv.read(*read_buffer);
old_file_size = getFileSize(path);
auto wrapped_path = wrappedPath(path);
FileEncryption::Header header;
String key;
UInt64 old_file_size = 0;
if (mode == WriteMode::Append && exists(path))
{
old_file_size = getFileSize(path);
if (old_file_size)
{
/// Append mode: we continue to use the same header.
auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize);
header.read(*read_buffer);
key = getKey(header.key_id);
if (calculateKeyHash(key) != header.key_hash)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key, could not append file");
}
}
if (!old_file_size)
{
/// Rewrite mode: we generate a new header.
key = getKey(current_key_id);
header.algorithm = current_algorithm;
header.key_id = current_key_id;
header.key_hash = calculateKeyHash(key);
header.init_vector = InitVector::random();
}
auto buffer = delegate->writeFile(wrapped_path, buf_size, mode);
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header, old_file_size);
}
catch (Exception & e)
{
e.addMessage("File " + quoteString(path));
throw;
}
else
iv = InitVector::random();
auto buffer = delegate->writeFile(wrapped_path, buf_size, mode);
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), encryption_algorithm, key, iv, old_file_size);
}
@ -215,13 +284,13 @@ size_t DiskEncrypted::getFileSize(const String & path) const
{
auto wrapped_path = wrappedPath(path);
size_t size = delegate->getFileSize(wrapped_path);
return size > InitVector::kSize ? (size - InitVector::kSize) : 0;
return size > FileEncryption::Header::kSize ? (size - FileEncryption::Header::kSize) : 0;
}
void DiskEncrypted::truncateFile(const String & path, size_t size)
{
auto wrapped_path = wrappedPath(path);
delegate->truncateFile(wrapped_path, size ? (size + InitVector::kSize) : 0);
delegate->truncateFile(wrapped_path, size ? (size + FileEncryption::Header::kSize) : 0);
}
SyncGuardPtr DiskEncrypted::getDirectorySyncGuard(const String & path) const
@ -239,8 +308,9 @@ void DiskEncrypted::applyNewSettings(
DiskEncryptedSettings settings{name, config, config_prefix, map};
delegate = settings.wrapped_disk;
disk_path = settings.path_on_wrapped_disk;
encryption_algorithm = settings.encryption_algorithm;
key = settings.key;
keys = settings.keys;
current_key_id = settings.current_key_id;
current_algorithm = settings.current_algorithm;
initialize();
}
@ -254,7 +324,12 @@ void registerDiskEncrypted(DiskFactory & factory)
{
DiskEncryptedSettings settings{name, config, config_prefix, map};
return std::make_shared<DiskEncrypted>(
name, settings.wrapped_disk, settings.path_on_wrapped_disk, settings.encryption_algorithm, settings.key);
name,
settings.wrapped_disk,
settings.path_on_wrapped_disk,
settings.keys,
settings.current_key_id,
settings.current_algorithm);
};
factory.registerDiskType("encrypted", creator);
}

View File

@ -25,8 +25,9 @@ public:
const String & name_,
DiskPtr wrapped_disk_,
const String & path_on_wrapped_disk_,
FileEncryption::Algorithm encryption_algorithm_,
const String & key_);
const std::unordered_map<UInt64, String> & keys_,
UInt64 current_key_id_,
FileEncryption::Algorithm current_algorithm_);
const String & getName() const override { return name; }
const String & getPath() const override { return disk_absolute_path; }
@ -224,11 +225,14 @@ private:
return disk_path + path;
}
String getKey(UInt64 key_id) const;
String name;
String disk_path;
String disk_absolute_path;
FileEncryption::Algorithm encryption_algorithm;
String key;
std::unordered_map<UInt64, String> keys;
UInt64 current_key_id;
FileEncryption::Algorithm current_algorithm;
};
}

View File

@ -1423,18 +1423,23 @@ namespace
};
/// Serializes a ColumnVector<UInt32> containing dates to a field of any type except TYPE_MESSAGE, TYPE_GROUP, TYPE_BOOL, TYPE_ENUM.
/// Serializes a ColumnVector<UInt32> containing datetimes to a field of any type except TYPE_MESSAGE, TYPE_GROUP, TYPE_BOOL, TYPE_ENUM.
class ProtobufSerializerDateTime : public ProtobufSerializerNumber<UInt32>
{
public:
ProtobufSerializerDateTime(
const FieldDescriptor & field_descriptor_, const ProtobufReaderOrWriter & reader_or_writer_)
: ProtobufSerializerNumber<UInt32>(field_descriptor_, reader_or_writer_)
const DataTypeDateTime & type,
const FieldDescriptor & field_descriptor_,
const ProtobufReaderOrWriter & reader_or_writer_)
: ProtobufSerializerNumber<UInt32>(field_descriptor_, reader_or_writer_),
date_lut(type.getTimeZone())
{
setFunctions();
}
protected:
const DateLUTImpl & date_lut;
void setFunctions()
{
switch (field_typeid)
@ -1458,17 +1463,17 @@ namespace
{
write_function = [this](UInt32 value)
{
dateTimeToString(value, text_buffer);
dateTimeToString(value, text_buffer, date_lut);
writeStr(text_buffer);
};
read_function = [this]() -> UInt32
{
readStr(text_buffer);
return stringToDateTime(text_buffer);
return stringToDateTime(text_buffer, date_lut);
};
default_function = [this]() -> UInt32 { return stringToDateTime(field_descriptor.default_value_string()); };
default_function = [this]() -> UInt32 { return stringToDateTime(field_descriptor.default_value_string(), date_lut); };
break;
}
@ -1477,17 +1482,17 @@ namespace
}
}
static void dateTimeToString(time_t tm, String & str)
static void dateTimeToString(time_t tm, String & str, const DateLUTImpl & lut)
{
WriteBufferFromString buf{str};
writeDateTimeText(tm, buf);
writeDateTimeText(tm, buf, lut);
}
static time_t stringToDateTime(const String & str)
static time_t stringToDateTime(const String & str, const DateLUTImpl & lut)
{
ReadBufferFromString buf{str};
time_t tm = 0;
readDateTimeText(tm, buf);
readDateTimeText(tm, buf, lut);
if (tm < 0)
tm = 0;
return tm;
@ -2833,7 +2838,7 @@ namespace
case TypeIndex::Float32: return std::make_unique<ProtobufSerializerNumber<Float32>>(field_descriptor, reader_or_writer);
case TypeIndex::Float64: return std::make_unique<ProtobufSerializerNumber<Float64>>(field_descriptor, reader_or_writer);
case TypeIndex::Date: return std::make_unique<ProtobufSerializerDate>(field_descriptor, reader_or_writer);
case TypeIndex::DateTime: return std::make_unique<ProtobufSerializerDateTime>(field_descriptor, reader_or_writer);
case TypeIndex::DateTime: return std::make_unique<ProtobufSerializerDateTime>(assert_cast<const DataTypeDateTime &>(*data_type), field_descriptor, reader_or_writer);
case TypeIndex::DateTime64: return std::make_unique<ProtobufSerializerDateTime64>(assert_cast<const DataTypeDateTime64 &>(*data_type), field_descriptor, reader_or_writer);
case TypeIndex::String: return std::make_unique<ProtobufSerializerString<false>>(field_descriptor, reader_or_writer);
case TypeIndex::FixedString: return std::make_unique<ProtobufSerializerString<true>>(typeid_cast<std::shared_ptr<const DataTypeFixedString>>(data_type), field_descriptor, reader_or_writer);

View File

@ -0,0 +1,88 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Access/AccessControlManager.h>
#include <Access/EnabledRolesInfo.h>
#include <Access/User.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
namespace DB
{
namespace
{
enum class Kind
{
CURRENT_ROLES,
ENABLED_ROLES,
DEFAULT_ROLES,
};
template <Kind kind>
class FunctionCurrentRoles : public IFunction
{
public:
static constexpr auto name = (kind == Kind::CURRENT_ROLES) ? "currentRoles" : ((kind == Kind::ENABLED_ROLES) ? "enabledRoles" : "defaultRoles");
static FunctionPtr create(const ContextPtr & context) { return std::make_shared<FunctionCurrentRoles>(context); }
String getName() const override { return name; }
explicit FunctionCurrentRoles(const ContextPtr & context)
{
if constexpr (kind == Kind::CURRENT_ROLES)
{
role_names = context->getRolesInfo()->getCurrentRolesNames();
}
else if constexpr (kind == Kind::ENABLED_ROLES)
{
role_names = context->getRolesInfo()->getEnabledRolesNames();
}
else
{
static_assert(kind == Kind::DEFAULT_ROLES);
const auto & manager = context->getAccessControlManager();
if (auto user = context->getUser())
role_names = manager.tryReadNames(user->granted_roles.findGranted(user->default_roles));
}
/// We sort the names because the result of the function should not depend on the order of UUIDs.
std::sort(role_names.begin(), role_names.end());
}
size_t getNumberOfArguments() const override { return 0; }
bool isDeterministic() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
auto col_res = ColumnArray::create(ColumnString::create());
ColumnString & res_strings = typeid_cast<ColumnString &>(col_res->getData());
ColumnArray::Offsets & res_offsets = col_res->getOffsets();
for (const String & role_name : role_names)
res_strings.insertData(role_name.data(), role_name.length());
res_offsets.push_back(res_strings.size());
return ColumnConst::create(std::move(col_res), input_rows_count);
}
private:
Strings role_names;
};
}
void registerFunctionCurrentRoles(FunctionFactory & factory)
{
factory.registerFunction<FunctionCurrentRoles<Kind::CURRENT_ROLES>>();
factory.registerFunction<FunctionCurrentRoles<Kind::ENABLED_ROLES>>();
factory.registerFunction<FunctionCurrentRoles<Kind::DEFAULT_ROLES>>();
}
}

View File

@ -127,13 +127,16 @@ ColumnPtr FunctionHasColumnInTable::executeImpl(const ColumnsWithTypeAndName & a
{
std::vector<std::vector<String>> host_names = {{ host_name }};
bool treat_local_as_remote = false;
bool treat_local_port_as_remote = getContext()->getApplicationType() == Context::ApplicationType::LOCAL;
auto cluster = std::make_shared<Cluster>(
getContext()->getSettings(),
host_names,
!user_name.empty() ? user_name : "default",
password,
getContext()->getTCPPort(),
false);
treat_local_as_remote,
treat_local_port_as_remote);
// FIXME this (probably) needs a non-constant access to query context,
// because it might initialize a storage. Ideally, the tables required

View File

@ -10,6 +10,7 @@ class FunctionFactory;
void registerFunctionCurrentDatabase(FunctionFactory &);
void registerFunctionCurrentUser(FunctionFactory &);
void registerFunctionCurrentProfiles(FunctionFactory &);
void registerFunctionCurrentRoles(FunctionFactory &);
void registerFunctionHostName(FunctionFactory &);
void registerFunctionFQDN(FunctionFactory &);
void registerFunctionVisibleWidth(FunctionFactory &);
@ -87,6 +88,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionCurrentDatabase(factory);
registerFunctionCurrentUser(factory);
registerFunctionCurrentProfiles(factory);
registerFunctionCurrentRoles(factory);
registerFunctionHostName(factory);
registerFunctionFQDN(factory);
registerFunctionVisibleWidth(factory);

View File

@ -232,6 +232,8 @@ SRCS(
countSubstringsCaseInsensitive.cpp
countSubstringsCaseInsensitiveUTF8.cpp
currentDatabase.cpp
currentProfiles.cpp
currentRoles.cpp
currentUser.cpp
dateDiff.cpp
dateName.cpp
@ -319,6 +321,7 @@ SRCS(
ilike.cpp
in.cpp
indexHint.cpp
initialQueryID.cpp
initializeAggregation.cpp
intDiv.cpp
intDivOrZero.cpp
@ -412,6 +415,7 @@ SRCS(
positionCaseInsensitiveUTF8.cpp
positionUTF8.cpp
pow.cpp
queryID.cpp
rand.cpp
rand64.cpp
randConstant.cpp

View File

@ -5,6 +5,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Common/SipHash.h>
#include <boost/algorithm/string/predicate.hpp>
#include <cassert>
@ -180,6 +181,9 @@ namespace
memcpy(out, plaintext, plaintext_size);
return plaintext_size;
}
constexpr const char kHeaderSignature[] = "ENC";
constexpr const UInt16 kHeaderCurrentVersion = 1;
}
@ -352,9 +356,57 @@ void Encryptor::decrypt(const char * data, size_t size, char * out)
offset += in_size;
}
bool isKeyLengthSupported(size_t key_length)
void Header::read(ReadBuffer & in)
{
return (key_length == 16) || (key_length == 24) || (key_length == 32);
constexpr size_t header_signature_size = std::size(kHeaderSignature) - 1;
char signature[std::size(kHeaderSignature)] = {};
in.readStrict(signature, header_signature_size);
if (strcmp(signature, kHeaderSignature) != 0)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong signature, this is not an encrypted file");
UInt16 version;
readPODBinary(version, in);
if (version != kHeaderCurrentVersion)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Version {} of the header is not supported", version);
UInt16 algorithm_u16;
readPODBinary(algorithm_u16, in);
algorithm = static_cast<Algorithm>(algorithm_u16);
readPODBinary(key_id, in);
readPODBinary(key_hash, in);
init_vector.read(in);
constexpr size_t reserved_size = kSize - header_signature_size - sizeof(version) - sizeof(algorithm_u16) - sizeof(key_id) - sizeof(key_hash) - InitVector::kSize;
static_assert(reserved_size < kSize);
in.ignore(reserved_size);
}
void Header::write(WriteBuffer & out) const
{
constexpr size_t header_signature_size = std::size(kHeaderSignature) - 1;
out.write(kHeaderSignature, header_signature_size);
UInt16 version = kHeaderCurrentVersion;
writePODBinary(version, out);
UInt16 algorithm_u16 = static_cast<UInt16>(algorithm);
writePODBinary(algorithm_u16, out);
writePODBinary(key_id, out);
writePODBinary(key_hash, out);
init_vector.write(out);
constexpr size_t reserved_size = kSize - header_signature_size - sizeof(version) - sizeof(algorithm_u16) - sizeof(key_id) - sizeof(key_hash) - InitVector::kSize;
static_assert(reserved_size < kSize);
char reserved_zero_bytes[reserved_size] = {};
out.write(reserved_zero_bytes, reserved_size);
}
UInt8 calculateKeyHash(const String & key)
{
return static_cast<UInt8>(sipHash64(key.data(), key.size())) & 0x0F;
}
}
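
The Header read/write code above fixes a 64-byte on-disk layout: a 3-byte "ENC" signature, a UInt16 version, a UInt16 algorithm id, a UInt64 key id, a 1-byte key hash, the init vector, and zero padding up to kSize. Below is a minimal standalone sketch of that layout; it assumes a 16-byte init vector and host byte order (as the raw readPODBinary/writePODBinary calls do) and is an illustration, not the ClickHouse implementation.

``` cpp
#include <cstdint>
#include <cstring>
#include <cstddef>

// Illustrative sketch of the 64-byte encrypted-file header described above.
// Field order follows Header::write(); a 16-byte init vector is assumed.
constexpr std::size_t kHeaderSize = 64;
constexpr std::size_t kIvSize = 16;

std::size_t writeHeaderSketch(
    char (&out)[kHeaderSize],
    std::uint16_t algorithm,
    std::uint64_t key_id,
    std::uint8_t key_hash,
    const char (&iv)[kIvSize])
{
    std::memset(out, 0, kHeaderSize);                       /// Reserved tail bytes stay zero.
    std::size_t pos = 0;
    std::memcpy(out + pos, "ENC", 3); pos += 3;             /// Signature without the trailing '\0'.
    const std::uint16_t version = 1;
    std::memcpy(out + pos, &version, sizeof(version)); pos += sizeof(version);
    std::memcpy(out + pos, &algorithm, sizeof(algorithm)); pos += sizeof(algorithm);
    std::memcpy(out + pos, &key_id, sizeof(key_id)); pos += sizeof(key_id);
    std::memcpy(out + pos, &key_hash, sizeof(key_hash)); pos += sizeof(key_hash);
    std::memcpy(out + pos, iv, kIvSize); pos += kIvSize;
    return pos;                                             /// 32 bytes used, 32 bytes reserved.
}
```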

View File

@ -69,6 +69,7 @@ private:
UInt128 counter = 0;
};
/// Encrypts or decrypts data.
class Encryptor
{
@ -101,6 +102,31 @@ private:
size_t offset = 0;
};
/// File header which is stored at the beginning of encrypted files.
struct Header
{
Algorithm algorithm = Algorithm::AES_128_CTR;
/// Identifier of the key to encrypt or decrypt this file.
UInt64 key_id = 0;
/// Hash of the key to encrypt or decrypt this file.
UInt8 key_hash = 0;
InitVector init_vector;
/// The size of this header in bytes, including reserved bytes.
static constexpr const size_t kSize = 64;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
};
/// Calculates the hash of a passed key.
/// 1 byte is enough because this hash is used only for the first check.
UInt8 calculateKeyHash(const String & key);
}
}

View File

@ -9,20 +9,19 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
}
using InitVector = FileEncryption::InitVector;
ReadBufferFromEncryptedFile::ReadBufferFromEncryptedFile(
size_t buffer_size_,
std::unique_ptr<ReadBufferFromFileBase> in_,
FileEncryption::Algorithm encryption_algorithm_,
const String & key_,
const InitVector & init_vector_)
const FileEncryption::Header & header_,
size_t offset_)
: ReadBufferFromFileBase(buffer_size_, nullptr, 0)
, in(std::move(in_))
, encrypted_buffer(buffer_size_)
, encryptor(encryption_algorithm_, key_, init_vector_)
, encryptor(header_.algorithm, key_, header_.init_vector)
{
/// We should start reading from `in` at the offset == InitVector::kSize.
offset = offset_;
encryptor.setOffset(offset_);
need_seek = true;
}
@ -76,7 +75,7 @@ bool ReadBufferFromEncryptedFile::nextImpl()
{
if (need_seek)
{
off_t raw_offset = offset + InitVector::kSize;
off_t raw_offset = offset + FileEncryption::Header::kSize;
if (in->seek(raw_offset, SEEK_SET) != raw_offset)
return false;
need_seek = false;

View File

@ -19,9 +19,9 @@ public:
ReadBufferFromEncryptedFile(
size_t buffer_size_,
std::unique_ptr<ReadBufferFromFileBase> in_,
FileEncryption::Algorithm encryption_algorithm_,
const String & key_,
const FileEncryption::InitVector & init_vector_);
const FileEncryption::Header & header_,
size_t offset_ = 0);
off_t seek(off_t off, int whence) override;
off_t getPosition() override;

View File

@ -6,20 +6,17 @@
namespace DB
{
using InitVector = FileEncryption::InitVector;
WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile(
size_t buffer_size_,
std::unique_ptr<WriteBufferFromFileBase> out_,
FileEncryption::Algorithm encryption_algorithm_,
const String & key_,
const InitVector & init_vector_,
const FileEncryption::Header & header_,
size_t old_file_size)
: WriteBufferFromFileBase(buffer_size_, nullptr, 0)
, out(std::move(out_))
, iv(init_vector_)
, flush_iv(!old_file_size)
, encryptor(encryption_algorithm_, key_, init_vector_)
, header(header_)
, flush_header(!old_file_size)
, encryptor(header.algorithm, key_, header.init_vector)
{
encryptor.setOffset(old_file_size);
}
@ -76,10 +73,10 @@ void WriteBufferFromEncryptedFile::nextImpl()
if (!offset())
return;
if (flush_iv)
if (flush_header)
{
iv.write(*out);
flush_iv = false;
header.write(*out);
flush_header = false;
}
encryptor.encrypt(working_buffer.begin(), offset(), *out);

View File

@ -20,9 +20,8 @@ public:
WriteBufferFromEncryptedFile(
size_t buffer_size_,
std::unique_ptr<WriteBufferFromFileBase> out_,
FileEncryption::Algorithm encryption_algorithm_,
const String & key_,
const FileEncryption::InitVector & init_vector_,
const FileEncryption::Header & header_,
size_t old_file_size = 0);
~WriteBufferFromEncryptedFile() override;
@ -40,8 +39,8 @@ private:
bool finished = false;
std::unique_ptr<WriteBufferFromFileBase> out;
FileEncryption::InitVector iv;
bool flush_iv = false;
FileEncryption::Header header;
bool flush_header = false;
FileEncryption::Encryptor encryptor;
};

View File

@ -43,6 +43,7 @@ SRCS(
MySQLPacketPayloadReadBuffer.cpp
MySQLPacketPayloadWriteBuffer.cpp
NullWriteBuffer.cpp
OpenedFile.cpp
PeekableReadBuffer.cpp
Progress.cpp
ReadBufferFromEncryptedFile.cpp

View File

@ -115,23 +115,44 @@ Cluster::Address::Address(
Cluster::Address::Address(
const String & host_port_,
const String & user_,
const String & password_,
UInt16 clickhouse_port,
bool secure_,
Int64 priority_,
UInt32 shard_index_,
UInt32 replica_index_)
: user(user_)
, password(password_)
const String & host_port_,
const String & user_,
const String & password_,
UInt16 clickhouse_port,
bool treat_local_port_as_remote,
bool secure_,
Int64 priority_,
UInt32 shard_index_,
UInt32 replica_index_)
: user(user_), password(password_)
{
auto parsed_host_port = parseAddress(host_port_, clickhouse_port);
bool can_be_local = true;
std::pair<std::string, UInt16> parsed_host_port;
if (!treat_local_port_as_remote)
{
parsed_host_port = parseAddress(host_port_, clickhouse_port);
}
else
{
/// For clickhouse-local (treat_local_port_as_remote) try to parse the address without passing a default port.
/// If that works, the address includes an explicit port, so it cannot be local,
/// since clickhouse-local does not listen on any port.
/// If no port is included, use the default one; the address may then be local (if the host is).
try
{
parsed_host_port = parseAddress(host_port_, 0);
can_be_local = false;
}
catch (...)
{
parsed_host_port = parseAddress(host_port_, clickhouse_port);
}
}
host_name = parsed_host_port.first;
port = parsed_host_port.second;
secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = priority_;
is_local = isLocal(clickhouse_port);
is_local = can_be_local && isLocal(clickhouse_port);
shard_index = shard_index_;
replica_index = replica_index_;
}
@ -329,7 +350,7 @@ Clusters::Impl Clusters::getContainer() const
Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
const Settings & settings,
const String & config_prefix_,
const String & cluster_name)
const String & cluster_name) : name(cluster_name)
{
auto config_prefix = config_prefix_ + "." + cluster_name;
@ -366,7 +387,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (address.is_local)
info.local_addresses.push_back(address);
ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name, address.port,
address.default_database, address.user, address.password,
@ -439,7 +460,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
for (const auto & replica : replica_addresses)
{
auto replica_pool = std::make_shared<ConnectionPool>(
auto replica_pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
@ -482,9 +503,16 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
}
Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote,
bool secure, Int64 priority)
Cluster::Cluster(
const Settings & settings,
const std::vector<std::vector<String>> & names,
const String & username,
const String & password,
UInt16 clickhouse_port,
bool treat_local_as_remote,
bool treat_local_port_as_remote,
bool secure,
Int64 priority)
{
UInt32 current_shard_num = 1;
@ -492,7 +520,16 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
{
Addresses current;
for (const auto & replica : shard)
current.emplace_back(replica, username, password, clickhouse_port, secure, priority, current_shard_num, current.size() + 1);
current.emplace_back(
replica,
username,
password,
clickhouse_port,
treat_local_port_as_remote,
secure,
priority,
current_shard_num,
current.size() + 1);
addresses_with_failover.emplace_back(current);
@ -502,7 +539,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
for (const auto & replica : current)
{
auto replica_pool = std::make_shared<ConnectionPool>(
auto replica_pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
@ -606,7 +643,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
if (address.is_local)
info.local_addresses.push_back(address);
ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name,
address.port,

View File

@ -39,14 +39,21 @@ public:
/// Construct a cluster by the names of shards and replicas.
/// Local addresses are treated the same as remote ones if treat_local_as_remote is true.
/// Local addresses are also treated as remote if treat_local_port_as_remote is set and the address includes an explicit port.
/// 'clickhouse_port' - the port this server instance listens on for queries.
/// This parameter is needed only to check whether an address is local (i.e. points to this server).
///
/// Used for remote() function.
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password,
UInt16 clickhouse_port, bool treat_local_as_remote,
bool secure = false, Int64 priority = 1);
Cluster(
const Settings & settings,
const std::vector<std::vector<String>> & names,
const String & username,
const String & password,
UInt16 clickhouse_port,
bool treat_local_as_remote,
bool treat_local_port_as_remote,
bool secure = false,
Int64 priority = 1);
Cluster(const Cluster &)= delete;
Cluster & operator=(const Cluster &) = delete;
@ -115,6 +122,7 @@ public:
const String & user_,
const String & password_,
UInt16 clickhouse_port,
bool treat_local_port_as_remote,
bool secure_ = false,
Int64 priority_ = 1,
UInt32 shard_index_ = 0,
@ -265,6 +273,8 @@ private:
size_t remote_shard_count = 0;
size_t local_shard_count = 0;
String name;
};
using ClusterPtr = std::shared_ptr<Cluster>;

View File

@ -15,7 +15,8 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/typeid_cast.h>
namespace DB
@ -150,14 +151,13 @@ public:
auto external_table = external_storage_holder->getTable();
auto table_out = external_table->write({}, external_table->getInMemoryMetadataPtr(), getContext());
auto io = interpreter->execute();
PullingPipelineExecutor executor(io.pipeline);
table_out->writePrefix();
Block block;
while (executor.pull(block))
table_out->write(block);
table_out->writeSuffix();
io.pipeline.resize(1);
io.pipeline.setSinks([&](const Block &, Pipe::StreamType) -> ProcessorPtr
{
return table_out;
});
auto executor = io.pipeline.execute();
executor->execute(io.pipeline.getNumStreams());
}
else
{

View File

@ -8,6 +8,7 @@
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
@ -271,7 +272,7 @@ BlockIO InterpreterInsertQuery::execute()
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
out = table->write(query_ptr, metadata_snapshot, getContext());
out = std::make_shared<PushingToSinkBlockOutputStream>(table->write(query_ptr, metadata_snapshot, getContext()));
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, metadata_snapshot, getContext(), query_ptr, no_destination);

View File

@ -14,7 +14,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <filesystem>

View File

@ -391,6 +391,9 @@ void PipelineExecutor::finish()
void PipelineExecutor::execute(size_t num_threads)
{
if (num_threads < 1)
num_threads = 1;
try
{
executeImpl(num_threads);

File diff suppressed because it is too large

View File

@ -1,24 +1,19 @@
#pragma once
#include "config_formats.h"
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <arrow/type.h>
#include <Columns/ColumnVector.h>
#include <arrow/table.h>
#include <arrow/array.h>
#include <arrow/buffer.h>
#include <Processors/Chunk.h>
#include <Core/Block.h>
namespace DB
{
class Block;
class Chunk;
class ArrowColumnToCHColumn
{
public:
@ -27,37 +22,16 @@ public:
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
private:
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \
M(arrow::Type::INT8, DB::Int8) \
M(arrow::Type::UINT16, DB::UInt16) \
M(arrow::Type::INT16, DB::Int16) \
M(arrow::Type::UINT32, DB::UInt32) \
M(arrow::Type::INT32, DB::Int32) \
M(arrow::Type::UINT64, DB::UInt64) \
M(arrow::Type::INT64, DB::Int64) \
M(arrow::Type::HALF_FLOAT, DB::Float32) \
M(arrow::Type::FLOAT, DB::Float32) \
M(arrow::Type::DOUBLE, DB::Float64)
#define FOR_ARROW_INDEXES_TYPES(M) \
M(arrow::Type::UINT8, DB::UInt8) \
M(arrow::Type::INT8, DB::UInt8) \
M(arrow::Type::UINT16, DB::UInt16) \
M(arrow::Type::INT16, DB::UInt16) \
M(arrow::Type::UINT32, DB::UInt32) \
M(arrow::Type::INT32, DB::UInt32) \
M(arrow::Type::UINT64, DB::UInt64) \
M(arrow::Type::INT64, DB::UInt64)
const Block & header;
std::unordered_map<std::string, DataTypePtr> name_to_internal_type;
const std::string format_name;
/// Map {column name : dictionary column}.
/// To avoid converting the dictionary from Arrow Dictionary
/// to LowCardinality for every chunk, we save it and reuse it.
std::unordered_map<std::string, ColumnPtr> dictionary_values;
};
}
#endif

View File

@ -23,6 +23,30 @@
#include <arrow/type.h>
#include <arrow/util/decimal.h>
#define FOR_INTERNAL_NUMERIC_TYPES(M) \
M(UInt8, arrow::UInt8Builder) \
M(Int8, arrow::Int8Builder) \
M(UInt16, arrow::UInt16Builder) \
M(Int16, arrow::Int16Builder) \
M(UInt32, arrow::UInt32Builder) \
M(Int32, arrow::Int32Builder) \
M(UInt64, arrow::UInt64Builder) \
M(Int64, arrow::Int64Builder) \
M(Float32, arrow::FloatBuilder) \
M(Float64, arrow::DoubleBuilder)
#define FOR_ARROW_TYPES(M) \
M(UINT8, arrow::UInt8Type) \
M(INT8, arrow::Int8Type) \
M(UINT16, arrow::UInt16Type) \
M(INT16, arrow::Int16Type) \
M(UINT32, arrow::UInt32Type) \
M(INT32, arrow::Int32Type) \
M(UINT64, arrow::UInt64Type) \
M(INT64, arrow::Int64Type) \
M(FLOAT, arrow::FloatType) \
M(DOUBLE, arrow::DoubleType) \
M(STRING, arrow::StringType)
namespace DB
{
@ -46,11 +70,8 @@ namespace DB
{"Float32", arrow::float32()},
{"Float64", arrow::float64()},
//{"Date", arrow::date64()},
//{"Date", arrow::date32()},
{"Date", arrow::uint16()}, // CHECK
//{"DateTime", arrow::date64()}, // BUG! saves as date32
{"DateTime", arrow::uint32()},
{"Date", arrow::uint16()}, /// uint16 is used instead of date32, because Apache Arrow cannot correctly serialize Date32Array.
{"DateTime", arrow::uint32()}, /// uint32 is used instead of date64, because we don't need milliseconds.
{"String", arrow::binary()},
{"FixedString", arrow::binary()},
@ -265,11 +286,11 @@ namespace DB
auto value_type = assert_cast<arrow::DictionaryType *>(array_builder->type().get())->value_type();
#define DISPATCH(ARROW_TYPE_ID, ARROW_TYPE) \
if (arrow::Type::ARROW_TYPE_ID == value_type->id()) \
{ \
fillArrowArrayWithLowCardinalityColumnDataImpl<ARROW_TYPE>(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values); \
return; \
}
if (arrow::Type::ARROW_TYPE_ID == value_type->id()) \
{ \
fillArrowArrayWithLowCardinalityColumnDataImpl<ARROW_TYPE>(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values); \
return; \
}
FOR_ARROW_TYPES(DISPATCH)
#undef DISPATCH
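
The FOR_ARROW_TYPES/DISPATCH pair above is an X-macro: one macro lists the supported cases and a locally defined DISPATCH macro expands them into an if-chain, so adding a type only touches the list. A generic sketch of the pattern (names here are illustrative, not ClickHouse code):

``` cpp
#include <cstdio>
#include <string>

// One list of supported cases...
#define FOR_SUPPORTED_TYPES(M) \
    M("int32", int)            \
    M("float64", double)

static void dispatchByName(const std::string & name)
{
// ...expanded into an if-chain by a locally defined DISPATCH macro.
#define DISPATCH(NAME, CPP_TYPE) \
    if (name == NAME) \
    { \
        std::printf("handling %s as %s\n", name.c_str(), #CPP_TYPE); \
        return; \
    }
    FOR_SUPPORTED_TYPES(DISPATCH)
#undef DISPATCH
    std::printf("unsupported type %s\n", name.c_str());
}

int main()
{
    dispatchByName("int32");   // handled
    dispatchByName("utf8");    // falls through to "unsupported"
}
```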
@ -337,7 +358,6 @@ namespace DB
size_t end)
{
const auto & internal_data = assert_cast<const ColumnVector<UInt32> &>(*write_column).getData();
//arrow::Date64Builder builder;
arrow::UInt32Builder & builder = assert_cast<arrow::UInt32Builder &>(*array_builder);
arrow::Status status;
@ -346,8 +366,6 @@ namespace DB
if (null_bytemap && (*null_bytemap)[value_i])
status = builder.AppendNull();
else
/// Implicitly converts UInt16 to Int32
//status = date_builder.Append(static_cast<int64_t>(internal_data[value_i]) * 1000); // now ms. TODO check other units
status = builder.Append(internal_data[value_i]);
checkStatus(status, write_column->getName(), format_name);
@ -367,7 +385,7 @@ namespace DB
{
const String column_type_name = column_type->getFamilyName();
if ("Nullable" == column_type_name)
if (column_type->isNullable())
{
const ColumnNullable * column_nullable = assert_cast<const ColumnNullable *>(column.get());
ColumnPtr nested_column = column_nullable->getNestedColumnPtr();
@ -376,35 +394,35 @@ namespace DB
const PaddedPODArray<UInt8> & bytemap = assert_cast<const ColumnVector<UInt8> &>(*null_column).getData();
fillArrowArray(column_name, nested_column, nested_type, &bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("String" == column_type_name)
else if (isString(column_type))
{
fillArrowArrayWithStringColumnData<ColumnString>(column, null_bytemap, format_name, array_builder, start, end);
}
else if ("FixedString" == column_type_name)
else if (isFixedString(column_type))
{
fillArrowArrayWithStringColumnData<ColumnFixedString>(column, null_bytemap, format_name, array_builder, start, end);
}
else if ("Date" == column_type_name)
else if (isDate(column_type))
{
fillArrowArrayWithDateColumnData(column, null_bytemap, format_name, array_builder, start, end);
}
else if ("DateTime" == column_type_name)
else if (isDateTime(column_type))
{
fillArrowArrayWithDateTimeColumnData(column, null_bytemap, format_name, array_builder, start, end);
}
else if ("Array" == column_type_name)
else if (isArray(column_type))
{
fillArrowArrayWithArrayColumnData<arrow::ListBuilder>(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("Tuple" == column_type_name)
else if (isTuple(column_type))
{
fillArrowArrayWithTupleColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("LowCardinality" == column_type_name)
else if (column_type->getTypeId() == TypeIndex::LowCardinality)
{
fillArrowArrayWithLowCardinalityColumnData(column_name, column, column_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if ("Map" == column_type_name)
else if (isMap(column_type))
{
ColumnPtr column_array = assert_cast<const ColumnMap *>(column.get())->getNestedColumnPtr();
DataTypePtr array_type = assert_cast<const DataTypeMap *>(column_type.get())->getNestedType();
@ -437,10 +455,10 @@ namespace DB
throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot fill arrow array with decimal data with type {}", column_type_name};
}
#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
else if (#CPP_NUMERIC_TYPE == column_type_name) \
{ \
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(column, null_bytemap, format_name, array_builder, start, end); \
}
else if (#CPP_NUMERIC_TYPE == column_type_name) \
{ \
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(column, null_bytemap, format_name, array_builder, start, end); \
}
FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
@ -448,7 +466,7 @@ namespace DB
{
throw Exception
{
fmt::format(R"(Internal type "{}" of a column "{}" is not supported for conversion into a {} data format.)", column_type_name, column_name, format_name),
fmt::format("Internal type '{}' of a column '{}' is not supported for conversion into {} data format.", column_type_name, column_name, format_name),
ErrorCodes::UNKNOWN_TYPE
};
}
@ -502,14 +520,15 @@ namespace DB
}
}
static std::shared_ptr<arrow::DataType> getArrowType(DataTypePtr column_type, ColumnPtr column, const std::string & column_name, const std::string & format_name, bool * is_column_nullable)
static std::shared_ptr<arrow::DataType> getArrowType(
DataTypePtr column_type, ColumnPtr column, const std::string & column_name, const std::string & format_name, bool * out_is_column_nullable)
{
if (column_type->isNullable())
{
DataTypePtr nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
ColumnPtr nested_column = assert_cast<const ColumnNullable *>(column.get())->getNestedColumnPtr();
auto arrow_type = getArrowType(nested_type, nested_column, column_name, format_name, is_column_nullable);
*is_column_nullable = true;
auto arrow_type = getArrowType(nested_type, nested_column, column_name, format_name, out_is_column_nullable);
*out_is_column_nullable = true;
return arrow_type;
}
@ -542,7 +561,7 @@ namespace DB
{
auto nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
auto nested_column = assert_cast<const ColumnArray *>(column.get())->getDataPtr();
auto nested_arrow_type = getArrowType(nested_type, nested_column, column_name, format_name, is_column_nullable);
auto nested_arrow_type = getArrowType(nested_type, nested_column, column_name, format_name, out_is_column_nullable);
return arrow::list(nested_arrow_type);
}
@ -554,8 +573,8 @@ namespace DB
for (size_t i = 0; i != nested_types.size(); ++i)
{
String name = column_name + "." + std::to_string(i);
auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), name, format_name, is_column_nullable);
nested_fields.push_back(std::make_shared<arrow::Field>(name, nested_arrow_type, *is_column_nullable));
auto nested_arrow_type = getArrowType(nested_types[i], tuple_column->getColumnPtr(i), name, format_name, out_is_column_nullable);
nested_fields.push_back(std::make_shared<arrow::Field>(name, nested_arrow_type, *out_is_column_nullable));
}
return arrow::struct_(std::move(nested_fields));
}
@ -568,7 +587,7 @@ namespace DB
const auto & indexes_column = lc_column->getIndexesPtr();
return arrow::dictionary(
getArrowTypeForLowCardinalityIndexes(indexes_column),
getArrowType(nested_type, nested_column, column_name, format_name, is_column_nullable));
getArrowType(nested_type, nested_column, column_name, format_name, out_is_column_nullable));
}
if (isMap(column_type))
@ -579,9 +598,8 @@ namespace DB
const auto & columns = assert_cast<const ColumnMap *>(column.get())->getNestedData().getColumns();
return arrow::map(
getArrowType(key_type, columns[0], column_name, format_name, is_column_nullable),
getArrowType(val_type, columns[1], column_name, format_name, is_column_nullable)
);
getArrowType(key_type, columns[0], column_name, format_name, out_is_column_nullable),
getArrowType(val_type, columns[1], column_name, format_name, out_is_column_nullable));
}
const std::string type_name = column_type->getFamilyName();
@ -594,8 +612,9 @@ namespace DB
return arrow_type_it->second;
}
throw Exception{fmt::format(R"(The type "{}" of a column "{}" is not supported for conversion into a {} data format.)", column_type->getName(), column_name, format_name),
ErrorCodes::UNKNOWN_TYPE};
throw Exception(ErrorCodes::UNKNOWN_TYPE,
"The type '{}' of a column '{}' is not supported for conversion into {} data format.",
column_type->getName(), column_name, format_name);
}
CHColumnToArrowColumn::CHColumnToArrowColumn(const Block & header, const std::string & format_name_, bool low_cardinality_as_dictionary_)
@ -638,7 +657,8 @@ namespace DB
arrow::Status status = MakeBuilder(pool, arrow_fields[column_i]->type(), &array_builder);
checkStatus(status, column->getName(), format_name);
fillArrowArray(header_column.name, column, header_column.type, nullptr, array_builder.get(), format_name, 0, column->size(), dictionary_values);
fillArrowArray(
header_column.name, column, header_column.type, nullptr, array_builder.get(), format_name, 0, column->size(), dictionary_values);
std::shared_ptr<arrow::Array> arrow_array;
status = array_builder->Finish(&arrow_array);

View File

@ -7,42 +7,18 @@
#include <Processors/Chunk.h>
#include <arrow/table.h>
namespace DB
{
class CHColumnToArrowColumn
{
public:
CHColumnToArrowColumn(const Block & header, const std::string & format_name_, bool low_cardinality_as_dictionary_ = false);
CHColumnToArrowColumn(const Block & header, const std::string & format_name_, bool low_cardinality_as_dictionary_);
void chChunkToArrowTable(std::shared_ptr<arrow::Table> & res, const Chunk & chunk, size_t columns_num);
private:
#define FOR_INTERNAL_NUMERIC_TYPES(M) \
M(UInt8, arrow::UInt8Builder) \
M(Int8, arrow::Int8Builder) \
M(UInt16, arrow::UInt16Builder) \
M(Int16, arrow::Int16Builder) \
M(UInt32, arrow::UInt32Builder) \
M(Int32, arrow::Int32Builder) \
M(UInt64, arrow::UInt64Builder) \
M(Int64, arrow::Int64Builder) \
M(Float32, arrow::FloatBuilder) \
M(Float64, arrow::DoubleBuilder)
#define FOR_ARROW_TYPES(M) \
M(UINT8, arrow::UInt8Type) \
M(INT8, arrow::Int8Type) \
M(UINT16, arrow::UInt16Type) \
M(INT16, arrow::Int16Type) \
M(UINT32, arrow::UInt32Type) \
M(INT32, arrow::Int32Type) \
M(UINT64, arrow::UInt64Type) \
M(INT64, arrow::Int64Type) \
M(FLOAT, arrow::FloatType) \
M(DOUBLE, arrow::DoubleType) \
M(STRING, arrow::StringType)
ColumnsWithTypeAndName header_columns;
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
const std::string format_name;
@ -52,5 +28,7 @@ private:
/// Dictionary every chunk we save it and reuse.
std::unordered_map<std::string, std::shared_ptr<arrow::Array>> dictionary_values;
};
}
#endif

View File

@ -13,11 +13,15 @@ namespace DB
addChunk(Chunk{}, ProcessingUnitType::FINALIZE, /*can_throw_exception*/ false);
collector_finished.wait();
if (collector_thread.joinable())
collector_thread.join();
{
std::lock_guard<std::mutex> lock(collector_thread_mutex);
if (collector_thread.joinable())
collector_thread.join();
}
{
std::unique_lock<std::mutex> lock(mutex);
if (background_exception)
std::rethrow_exception(background_exception);
}
@ -66,8 +70,11 @@ namespace DB
writer_condvar.notify_all();
}
if (collector_thread.joinable())
collector_thread.join();
{
std::lock_guard<std::mutex> lock(collector_thread_mutex);
if (collector_thread.joinable())
collector_thread.join();
}
try
{

View File

@ -172,6 +172,7 @@ private:
ThreadPool pool;
// Collecting all memory to original ReadBuffer
ThreadFromGlobalPool collector_thread;
std::mutex collector_thread_mutex;
std::exception_ptr background_exception = nullptr;
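
Both the finalization path and the exception path above may call joinable()/join() on collector_thread, so the new collector_thread_mutex makes that pair atomic. A minimal sketch of the pattern with std::thread (illustrative only, not the actual formatter class):

``` cpp
#include <mutex>
#include <thread>

struct CollectorOwner
{
    std::thread collector_thread;
    std::mutex collector_thread_mutex;

    /// Safe to call concurrently from several shutdown paths:
    /// the lock makes the joinable()/join() pair atomic.
    void joinCollector()
    {
        std::lock_guard<std::mutex> lock(collector_thread_mutex);
        if (collector_thread.joinable())
            collector_thread.join();
    }
};
```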

View File

@ -2,16 +2,7 @@
#if USE_PARQUET
// TODO: clean includes
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Common/assert_cast.h>
#include <Core/callOnTypeIndex.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
#include <arrow/api.h>
#include <arrow/util/memory.h>
#include <parquet/arrow/writer.h>
#include "ArrowBufferedStreams.h"
#include "CHColumnToArrowColumn.h"
@ -19,6 +10,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
@ -37,7 +29,7 @@ void ParquetBlockOutputFormat::consume(Chunk chunk)
if (!ch_column_to_arrow_column)
{
const Block & header = getPort(PortKind::Main).getHeader();
ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(header, "Parquet");
ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(header, "Parquet", false);
}
ch_column_to_arrow_column->chChunkToArrowTable(arrow_table, chunk, columns_num);
@ -91,11 +83,7 @@ void registerOutputFormatProcessorParquet(FormatFactory & factory)
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
auto impl = std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
/// TODO
// auto res = std::make_shared<SquashingBlockOutputStream>(impl, impl->getHeader(), format_settings.parquet.row_group_size, 0);
// res->disableFlush();
return impl;
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
});
}

View File

@ -11,12 +11,17 @@ ISink::ISink(Block header)
ISink::Status ISink::prepare()
{
if (!was_on_start_called)
return Status::Ready;
if (has_input)
return Status::Ready;
if (input.isFinished())
{
onFinish();
if (!was_on_finish_called)
return Status::Ready;
return Status::Finished;
}
@ -31,9 +36,21 @@ ISink::Status ISink::prepare()
void ISink::work()
{
consume(std::move(current_chunk));
has_input = false;
if (!was_on_start_called)
{
was_on_start_called = true;
onStart();
}
else if (has_input)
{
has_input = false;
consume(std::move(current_chunk));
}
else if (!was_on_finish_called)
{
was_on_finish_called = true;
onFinish();
}
}
}

View File

@ -12,9 +12,11 @@ protected:
InputPort & input;
Chunk current_chunk;
bool has_input = false;
bool was_on_start_called = false;
bool was_on_finish_called = false;
virtual void consume(Chunk block) = 0;
virtual void onStart() {}
virtual void onFinish() {}
public:
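
With onStart() and onFinish() added alongside consume(), a sink can initialize and finalize without overriding prepare(): work() calls onStart() once before the first chunk and onFinish() once after the input is exhausted. A minimal hypothetical sink using these hooks (CountingSink is not part of this patch), assuming ClickHouse's Processors headers:

``` cpp
#include <Processors/ISink.h>

namespace DB
{

/// Hypothetical example: counts rows using the onStart/onFinish hooks shown above.
class CountingSink : public ISink
{
public:
    explicit CountingSink(Block header) : ISink(std::move(header)) {}
    String getName() const override { return "CountingSink"; }

protected:
    void onStart() override { rows = 0; }                              /// Called once before the first chunk.
    void consume(Chunk chunk) override { rows += chunk.getNumRows(); } /// Called for every chunk.
    void onFinish() override { /* 'rows' now holds the total */ }      /// Called once after the input is finished.

private:
    size_t rows = 0;
};

}
```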

View File

@ -2,6 +2,8 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <common/DateLUTImpl.h>
#include <common/DateLUT.h>
#include <DataTypes/DataTypeDateTime.h>
namespace DB
{
@ -16,6 +18,8 @@ static GraphiteRollupSortedAlgorithm::ColumnsDefinition defineColumns(
def.value_column_num = header.getPositionByName(params.value_column_name);
def.version_column_num = header.getPositionByName(params.version_column_name);
def.time_column_type = header.getByPosition(def.time_column_num).type;
size_t num_columns = header.columns();
for (size_t i = 0; i < num_columns; ++i)
if (i != def.time_column_num && i != def.value_column_num && i != def.version_column_num)
@ -122,8 +126,8 @@ UInt32 GraphiteRollupSortedAlgorithm::selectPrecision(const Graphite::Retentions
* In this case, the date should not change. The date is calculated using the local time zone.
*
* If the rounding value is less than an hour,
* then, assuming that time zones that differ from UTC by a non-integer number of hours are not supported,
* just simply round the unix timestamp down to a multiple of 3600.
* then, assuming that time zone offsets from UTC are a multiple of 15 minutes
* (which is true for all modern time zones, though not for some historical ones),
* we simply round the unix timestamp down to a multiple of the rounding value.
* And if the rounding value is greater,
* then we will round down the number of seconds from the beginning of the day in the local time zone.
*
@ -131,7 +135,7 @@ UInt32 GraphiteRollupSortedAlgorithm::selectPrecision(const Graphite::Retentions
*/
static time_t roundTimeToPrecision(const DateLUTImpl & date_lut, time_t time, UInt32 precision)
{
if (precision <= 3600)
if (precision <= 900)
{
return time / precision * precision;
}
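
The threshold change from 3600 to 900 relies on modern time zone offsets being multiples of 15 minutes, so for precisions up to 900 seconds the raw Unix timestamp can simply be rounded down, while larger precisions are rounded within the local day. A standalone sketch of the two branches, with a fixed UTC offset standing in for DateLUTImpl (it ignores DST transitions, which the real code handles via the LUT):

``` cpp
#include <ctime>

/// Illustrative only: 'utc_offset_seconds' plays the role of the DateLUTImpl lookup.
time_t roundTimeToPrecisionSketch(time_t t, unsigned precision, long utc_offset_seconds)
{
    if (precision <= 900)
        return t / precision * precision;   /// Offsets that are multiples of 15 min make this safe.

    /// Round the number of seconds since the start of the local day.
    const time_t local = t + utc_offset_seconds;
    const time_t start_of_day = local - local % 86400;
    const time_t rounded_local = start_of_day + (local - start_of_day) / precision * precision;
    return rounded_local - utc_offset_seconds;
}
```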
@ -145,7 +149,10 @@ static time_t roundTimeToPrecision(const DateLUTImpl & date_lut, time_t time, UI
IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
{
const DateLUTImpl & date_lut = DateLUT::instance();
/// Timestamp column can be DateTime or UInt32. If it is DateTime, we can use its timezone for calculations.
const TimezoneMixin * timezone = dynamic_cast<const TimezoneMixin *>(columns_definition.time_column_type.get());
const DateLUTImpl & date_lut = timezone ? timezone->getTimeZone() : DateLUT::instance();
/// Take rows in needed order and put them into `merged_data` until we get `max_block_size` rows.
///

View File

@ -35,6 +35,8 @@ public:
size_t value_column_num;
size_t version_column_num;
DataTypePtr time_column_type;
/// All columns other than 'time', 'value', 'version'. They are unmodified during rollup.
ColumnNumbers unmodified_column_numbers;
};

View File

@ -4,7 +4,8 @@
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>

View File

@ -15,7 +15,7 @@
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/VirtualColumnUtils.h>
@ -157,7 +157,7 @@ Pipe ReadFromMergeTree::readFromPool(
for (size_t i = 0; i < max_streams; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
auto source = std::make_shared<MergeTreeThreadSelectProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
@ -662,7 +662,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part and if num_streams > 1 we
/// can use parallel select on such parts. We save such parts in one vector and then use
/// MergeTreeReadPool and MergeTreeThreadSelectBlockInputProcessor for parallel select.
/// MergeTreeReadPool and MergeTreeThreadSelectProcessor for parallel select.
if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)

View File

@ -0,0 +1,18 @@
#pragma once
#include <Processors/ISink.h>
namespace DB
{
/// Sink which reads everything and does nothing with it.
class EmptySink : public ISink
{
public:
explicit EmptySink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "EmptySink"; }
protected:
void consume(Chunk) override {}
};
}

View File

@ -1,5 +1,4 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/ISink.h>
namespace DB
@ -21,15 +20,4 @@ protected:
void consume(Chunk) override {}
};
/// Sink which reads everything and do nothing with it.
class EmptySink : public ISink
{
public:
explicit EmptySink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "EmptySink"; }
protected:
void consume(Chunk) override {}
};
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/ISink.h>
#include <Storages/TableLockHolder.h>
namespace DB
{
/// Sink which is returned from Storage::write.
/// The same as ISink, but it can also hold table locks.
class SinkToStorage : public ISink
{
public:
using ISink::ISink;
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
private:
std::vector<TableLockHolder> table_locks;
};
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
class NullSinkToStorage : public SinkToStorage
{
public:
using SinkToStorage::SinkToStorage;
std::string getName() const override { return "NullSinkToStorage"; }
void consume(Chunk) override {}
};
}

View File

@ -1,6 +1,6 @@
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/ResizeProcessor.h>
namespace DB

View File

@ -1,4 +1,5 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataStreams/IBlockOutputStream.h>
@ -49,7 +50,7 @@ void CreatingSetsTransform::startSubquery()
LOG_TRACE(log, "Filling temporary table.");
if (subquery.table)
table_out = subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext());
table_out = std::make_shared<PushingToSinkBlockOutputStream>(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
done_with_set = !subquery.set;
done_with_table = !subquery.table;

View File

@ -1619,6 +1619,74 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
}
};
struct WindowFunctionNthValue final : public WindowFunction
{
WindowFunctionNthValue(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_)
: WindowFunction(name_, argument_types_, parameters_)
{
if (!parameters.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} cannot be parameterized", name_);
}
if (argument_types.size() != 2)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function '{}' accepts 2 arguments, {} given",
name_, argument_types.size());
}
}
DataTypePtr getReturnType() const override
{ return argument_types[0]; }
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{
const auto & current_block = transform->blockAt(transform->current_row);
IColumn & to = *(current_block.output_columns[function_index]);
const auto & workspace = transform->workspaces[function_index];
int64_t offset = (*current_block.input_columns[
workspace.argument_column_indices[1]])[
transform->current_row.row].get<Int64>() - 1;
if (offset < 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The offset for function {} must be non-negative, {} given",
getName(), offset);
}
if (offset > INT_MAX)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The offset for function {} must be less than {}, {} given",
getName(), INT_MAX, offset);
}
const auto [target_row, offset_left] = transform->moveRowNumber(transform->frame_start, offset);
if (offset_left != 0
|| target_row < transform->frame_start
|| transform->frame_end <= target_row)
{
// Offset is outside the frame.
to.insertDefault();
}
else
{
// Offset is inside the frame.
to.insertFrom(*transform->blockAt(target_row).input_columns[
workspace.argument_column_indices[0]],
target_row.row);
}
}
};
void registerWindowFunctions(AggregateFunctionFactory & factory)
{
// Why didn't I implement lag/lead yet? Because they are a mess. I imagine
@ -1682,6 +1750,13 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
return std::make_shared<WindowFunctionLagLeadInFrame<true>>(
name, argument_types, parameters);
}, properties});
factory.registerFunction("nth_value", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{
return std::make_shared<WindowFunctionNthValue>(
name, argument_types, parameters);
}, properties});
}
}

View File

@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/NullSink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Columns/ColumnsNumber.h>

View File

@ -7,6 +7,7 @@
#include <Common/SettingsChanges.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
@ -943,7 +944,7 @@ namespace
{
/// The data will be written directly to the table.
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto out_stream = storage->write(ASTPtr(), metadata_snapshot, query_context);
auto out_stream = std::make_shared<PushingToSinkBlockOutputStream>(storage->write(ASTPtr(), metadata_snapshot, query_context));
ReadBufferFromMemory data(external_table.data().data(), external_table.data().size());
String format = external_table.format();
if (format.empty())

View File

@ -19,6 +19,7 @@
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
@ -1330,7 +1331,7 @@ bool TCPHandler::receiveData(bool scalar)
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
auto temporary_table_out = storage->write(ASTPtr(), metadata_snapshot, query_context);
auto temporary_table_out = std::make_shared<PushingToSinkBlockOutputStream>(storage->write(ASTPtr(), metadata_snapshot, query_context));
temporary_table_out->write(block);
temporary_table_out->writeSuffix();

View File

@ -1,4 +1,4 @@
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DistributedSink.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Disks/StoragePolicy.h>
@ -86,7 +86,7 @@ static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & bl
}
DistributedBlockOutputStream::DistributedBlockOutputStream(
DistributedSink::DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -95,7 +95,8 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_)
: context(Context::createCopy(context_))
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
@ -115,24 +116,15 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
}
Block DistributedBlockOutputStream::getHeader() const
void DistributedSink::consume(Chunk chunk)
{
if (!allow_materialized)
return metadata_snapshot->getSampleBlockNonMaterialized();
else
return metadata_snapshot->getSampleBlock();
}
if (is_first_chunk)
{
storage.delayInsertOrThrowIfNeeded();
is_first_chunk = false;
}
void DistributedBlockOutputStream::writePrefix()
{
storage.delayInsertOrThrowIfNeeded();
}
void DistributedBlockOutputStream::write(const Block & block)
{
Block ordinary_block{ block };
auto ordinary_block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (!allow_materialized)
{
@ -155,7 +147,7 @@ void DistributedBlockOutputStream::write(const Block & block)
writeAsync(ordinary_block);
}
void DistributedBlockOutputStream::writeAsync(const Block & block)
void DistributedSink::writeAsync(const Block & block)
{
if (random_shard_insert)
{
@ -174,7 +166,7 @@ void DistributedBlockOutputStream::writeAsync(const Block & block)
}
std::string DistributedBlockOutputStream::getCurrentStateDescription()
std::string DistributedSink::getCurrentStateDescription()
{
WriteBufferFromOwnString buffer;
const auto & addresses = cluster->getShardsAddresses();
@ -203,7 +195,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
}
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
void DistributedSink::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context->getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
@ -249,7 +241,7 @@ void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, si
}
void DistributedBlockOutputStream::waitForJobs()
void DistributedSink::waitForJobs()
{
pool->wait();
@ -279,7 +271,7 @@ void DistributedBlockOutputStream::waitForJobs()
ThreadPool::Job
DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards)
DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards)
{
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
@ -403,7 +395,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
}
void DistributedBlockOutputStream::writeSync(const Block & block)
void DistributedSink::writeSync(const Block & block)
{
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
@ -487,7 +479,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
}
void DistributedBlockOutputStream::writeSuffix()
void DistributedSink::onFinish()
{
auto log_performance = [this]()
{
@ -537,7 +529,7 @@ void DistributedBlockOutputStream::writeSuffix()
}
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block) const
IColumn::Selector DistributedSink::createSelector(const Block & source_block) const
{
Block current_block_with_sharding_key_expr = source_block;
storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);
@ -548,7 +540,7 @@ IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & sou
}
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
Blocks DistributedSink::splitBlock(const Block & block)
{
auto selector = createSelector(block);
@ -572,7 +564,7 @@ Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
}
void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedSink::writeSplitAsync(const Block & block)
{
Blocks splitted_blocks = splitBlock(block);
const size_t num_shards = splitted_blocks.size();
@ -585,7 +577,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
}
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
@ -621,7 +613,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
@ -633,7 +625,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();

View File

@ -1,7 +1,7 @@
#pragma once
#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Core/Block.h>
#include <Common/PODArray.h>
@ -34,10 +34,10 @@ class StorageDistributed;
* and the resulting blocks are written in a compressed Native format in separate directories for sending.
* For each destination address (each directory with data to send), a separate thread is created in StorageDistributed,
* which monitors the directory and sends data. */
class DistributedBlockOutputStream : public IBlockOutputStream
class DistributedSink : public SinkToStorage
{
public:
DistributedBlockOutputStream(
DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -47,11 +47,9 @@ public:
UInt64 insert_timeout_,
StorageID main_table_);
Block getHeader() const override;
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;
String getName() const override { return "DistributedSink"; }
void consume(Chunk chunk) override;
void onFinish() override;
private:
IColumn::Selector createSelector(const Block & source_block) const;
@ -77,7 +75,7 @@ private:
void initWritingJobs(const Block & first_block, size_t start, size_t end);
struct JobReplica;
ThreadPool::Job runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards);
ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards);
void waitForJobs();
@ -97,6 +95,8 @@ private:
bool random_shard_insert;
bool allow_materialized;
bool is_first_chunk = true;
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
StorageID main_table;
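
Read together with the .cpp hunks above, this header spells out the new sink contract: the header block is passed to the `SinkToStorage` constructor, data arrives as `Chunk`s in `consume()`, and `writeSuffix()` becomes `onFinish()`. A minimal sketch of a sink written against that contract — `ExampleSink` and its body are invented for illustration; only the overridden signatures and the chunk-to-block idiom come from this diff:

``` cpp
#include <Processors/Sinks/SinkToStorage.h>   // base class used throughout this diff
#include <Core/Block.h>

namespace DB
{

/// Hypothetical sink that materializes each chunk back into a Block.
class ExampleSink : public SinkToStorage
{
public:
    explicit ExampleSink(const Block & header) : SinkToStorage(header) {}

    String getName() const override { return "ExampleSink"; }

    void consume(Chunk chunk) override
    {
        /// Same idiom as DistributedSink, HDFSSink and KafkaSink in this commit:
        /// rebuild a Block from the sink's header and the chunk's columns.
        Block block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
        /// ... write `block` to the underlying storage ...
    }

    void onFinish() override
    {
        /// Flush / finalize here; this is what used to be writeSuffix().
    }
};

}
```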

View File

@ -15,7 +15,7 @@
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Common/parseGlobs.h>
#include <Poco/URI.h>
@ -172,36 +172,33 @@ private:
Block sample_block;
};
class HDFSBlockOutputStream : public IBlockOutputStream
class HDFSSink : public SinkToStorage
{
public:
HDFSBlockOutputStream(const String & uri,
HDFSSink(const String & uri,
const String & format,
const Block & sample_block_,
const Block & sample_block,
ContextPtr context,
const CompressionMethod compression_method)
: sample_block(sample_block_)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context->getGlobalContext()->getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context);
}
Block getHeader() const override
String getName() const override { return "HDFSSink"; }
void consume(Chunk chunk) override
{
return sample_block;
if (is_first_chunk)
{
writer->writePrefix();
is_first_chunk = false;
}
writer->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override
{
writer->writePrefix();
}
void writeSuffix() override
void onFinish() override
{
try
{
@ -218,9 +215,9 @@ public:
}
private:
Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool is_first_chunk = true;
};
/* Recursive directory listing with matched paths as a result.
@ -314,9 +311,9 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes));
}
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
return std::make_shared<HDFSBlockOutputStream>(uri,
return std::make_shared<HDFSSink>(uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),

View File

@ -32,7 +32,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, TableExclusiveLockHolder &) override;

View File

@ -51,6 +51,9 @@ class Pipe;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
class SinkToStorage;
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
class QueryPipeline;
using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
@ -326,7 +329,7 @@ public:
* changed during lifetime of the returned streams, but the snapshot is
* guaranteed to be immutable.
*/
virtual BlockOutputStreamPtr write(
virtual SinkToStoragePtr write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*context*/)
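
On the `IStorage` side the only visible change is the return type of `write()`. A sketch of an override under the new signature, mirroring the `StorageHDFS::write` and `StorageKafka::write` hunks in this diff; `ExampleStorage` and `ExampleSink` are placeholders, and the snippet is a fragment assumed to live in a class derived from `IStorage`:

``` cpp
// Fragment of a hypothetical storage class derived from IStorage.
SinkToStoragePtr ExampleStorage::write(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr /*context*/)
{
    /// Give the sink its header up front so getPort().getHeader() is well defined.
    return std::make_shared<ExampleSink>(metadata_snapshot->getSampleBlock());
}
```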

View File

@ -6,30 +6,26 @@
namespace DB
{
KafkaBlockOutputStream::KafkaBlockOutputStream(
KafkaSink::KafkaSink(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ContextPtr & context_)
: storage(storage_)
: SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{
}
Block KafkaBlockOutputStream::getHeader() const
void KafkaSink::onStart()
{
return metadata_snapshot->getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer(getHeader());
buffer = storage.createWriteBuffer(getPort().getHeader());
auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
getHeader(), context,
getPort().getHeader(), context,
[this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
@ -37,20 +33,17 @@ void KafkaBlockOutputStream::writePrefix()
format_settings);
}
void KafkaBlockOutputStream::write(const Block & block)
void KafkaSink::consume(Chunk chunk)
{
child->write(block);
child->write(getPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
void KafkaBlockOutputStream::writeSuffix()
void KafkaSink::onFinish()
{
if (child)
child->writeSuffix();
flush();
}
//flush();
void KafkaBlockOutputStream::flush()
{
if (buffer)
buffer->flush();
}
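
Two idioms replace the old `writePrefix()` in this commit: `KafkaSink` moves its setup into `onStart()`, while `DistributedSink` and `HDFSSink` guard one-time work with an `is_first_chunk` flag inside `consume()`. A fragment sketching the flag variant (the setup body is a placeholder; the flag itself is declared in the DistributedSink.h and HDFSSink hunks above):

``` cpp
void consume(Chunk chunk) override
{
    if (is_first_chunk)
    {
        /// One-time work that used to live in writePrefix(),
        /// e.g. DistributedSink calls storage.delayInsertOrThrowIfNeeded() here.
        is_first_chunk = false;
    }
    /// ... handle the chunk as usual ...
}

bool is_first_chunk = true;   /// member flag, mirroring the headers above
```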

View File

@ -1,26 +1,25 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
{
class KafkaBlockOutputStream : public IBlockOutputStream
class KafkaSink : public SinkToStorage
{
public:
explicit KafkaBlockOutputStream(
explicit KafkaSink(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<const Context> & context_);
Block getHeader() const override;
void consume(Chunk chunk) override;
void onStart() override;
void onFinish() override;
String getName() const override { return "KafkaSink"; }
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
void flush() override;
///void flush() override;
private:
StorageKafka & storage;

View File

@ -289,14 +289,14 @@ Pipe StorageKafka::read(
}
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments);
if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<KafkaBlockOutputStream>(*this, metadata_snapshot, modified_context);
return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
}

View File

@ -50,7 +50,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>

Some files were not shown because too many files have changed in this diff.