Merge branch 'master' into persistent_nukeeper_snapshot_storage

This commit is contained in:
alesapin 2021-03-02 14:03:42 +03:00
commit e7399bf66f
81 changed files with 1999 additions and 445 deletions

View File

@ -26,4 +26,4 @@ The name of an additional section can be any, for example, **Usage**.
- [link](#)
[Original article](https://clickhouse.tech/docs/en/data_types/<data-type-name>/) <!--hide-->
[Original article](https://clickhouse.tech/docs/en/data-types/<data-type-name>/) <!--hide-->

View File

@ -38,20 +38,20 @@ SETTINGS
Required parameters:
- `kafka_broker_list` – A comma-separated list of brokers (for example, `localhost:9092`).
- `kafka_topic_list` – A list of Kafka topics.
- `kafka_group_name` – A group of Kafka consumers. Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` – Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the [Formats](../../../interfaces/formats.md) section.
- `kafka_broker_list` — A comma-separated list of brokers (for example, `localhost:9092`).
- `kafka_topic_list` — A list of Kafka topics.
- `kafka_group_name` — A group of Kafka consumers. Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` — Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the [Formats](../../../interfaces/formats.md) section.
Optional parameters:
- `kafka_row_delimiter` – Delimiter character, which ends the message.
- `kafka_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
- `kafka_max_block_size` - The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` – Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
- `kafka_commit_every_batch` - Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
- `kafka_thread_per_consumer` - Provide independent thread for each consumer (default: `0`). When enabled, every consumer flush the data independently, in parallel (otherwise - rows from several consumers squashed to form one block).
- `kafka_row_delimiter` — Delimiter character, which ends the message.
- `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N`, the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
- `kafka_thread_per_consumer` — Provide an independent thread for each consumer (default: `0`). When enabled, every consumer flushes the data independently, in parallel (otherwise, rows from several consumers are squashed to form one block).
Examples:
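A minimal sketch of a table reading from Kafka with these settings (broker address, topic, group, and table name are illustrative):

``` sql
CREATE TABLE queue
(
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'topic',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 1;
```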

View File

@ -1956,8 +1956,8 @@ Default value: 16.
**See Also**
- [Kafka](../../engines/table-engines/integrations/kafka.md#kafka) engine
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md#rabbitmq-engine) engine
- [Kafka](../../engines/table-engines/integrations/kafka.md#kafka) engine.
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md#rabbitmq-engine) engine.
## validate_polygons {#validate_polygons}
@ -2658,8 +2658,6 @@ Result:
Note that this setting influences [Materialized view](../../sql-reference/statements/create/view.md#materialized) and [MaterializeMySQL](../../engines/database-engines/materialize-mysql.md) behaviour.
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->
## engine_file_empty_if_not_exists {#engine-file-empty_if-not-exists}
Allows selecting data from a file engine table when the file does not exist.
@ -2679,3 +2677,16 @@ Possible values:
- 1 — Enabled.
Default value: `0`.
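For illustration, a minimal sketch (the table name is hypothetical): with the setting enabled, selecting from a `File` engine table whose underlying file has not been created yet returns an empty result instead of an error.

``` sql
SET engine_file_empty_if_not_exists = 1;
CREATE TABLE file_table (id UInt32, value String) ENGINE = File(TSV);
-- The data file does not exist yet; with the setting enabled this returns 0 rows.
SELECT * FROM file_table;
```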
## allow_experimental_geo_types {#allow-experimental-geo-types}
Allows working with experimental [geo data types](../../sql-reference/data-types/geo.md).
Possible values:
- 0 — Working with geo data types is disabled.
- 1 — Working with geo data types is enabled.
Default value: `0`.
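For example, to enable the geo types for a session (as the geo examples do):

``` sql
SET allow_experimental_geo_types = 1;
```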
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->

View File

@ -20,7 +20,7 @@ System tables:
Most of system tables store their data in RAM. A ClickHouse server creates such system tables at the start.
Unlike other system tables, the system log tables [metric_log](../../operations/system-tables/metric_log.md), [query_log](../../operations/system-tables/query_log.md), [query_thread_log](../../operations/system-tables/query_thread_log.md), [trace_log](../../operations/system-tables/trace_log.md), [part_log](../../operations/system-tables/part_log.md), crash_log and [text_log](../../operations/system-tables/text_log.md) are served by [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) table engine and store their data in a storage filesystem by default. If you remove a table from a filesystem, the ClickHouse server creates the empty one again at the time of the next data writing. If system table schema changed in a new release, then ClickHouse renames the current table and creates a new one.
Unlike other system tables, the system log tables [metric_log](../../operations/system-tables/metric_log.md), [query_log](../../operations/system-tables/query_log.md), [query_thread_log](../../operations/system-tables/query_thread_log.md), [trace_log](../../operations/system-tables/trace_log.md), [part_log](../../operations/system-tables/part_log.md), [crash_log](../../operations/system-tables/crash-log.md) and [text_log](../../operations/system-tables/text_log.md) are served by the [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) table engine and store their data in a filesystem by default. If you remove a table from the filesystem, the ClickHouse server creates an empty one again at the time of the next data writing. If the schema of a system table changed in a new release, then ClickHouse renames the current table and creates a new one.
System log tables can be customized by creating a config file with the same name as the table under `/etc/clickhouse-server/config.d/`, or setting corresponding elements in `/etc/clickhouse-server/config.xml`. The elements that can be customized are:
@ -33,7 +33,7 @@ System log tables can be customized by creating a config file with the same name
An example:
```
```xml
<yandex>
<query_log>
<database>system</database>

View File

@ -253,8 +253,8 @@ windowFunnel(window, [mode])(timestamp, cond1, cond2, ..., condN)
**Parameters**
- `window` — Length of the sliding window. The unit of `window` depends on the timestamp itself and varies. Determined using the expression `timestamp of cond2 <= timestamp of cond1 + window`.
- `mode` - It is an optional parameter.
- `window` — Length of the sliding window. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond2 <= timestamp of cond1 + window`.
- `mode` - It is an optional argument.
- `'strict'` - When `'strict'` is set, `windowFunnel()` applies conditions only to unique values.
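As an illustration, a sketch of a three-step funnel over a one-hour window (the `trend` table and the `user_id`, `timestamp`, and `eventID` columns are hypothetical):

``` sql
SELECT
    user_id,
    windowFunnel(3600)(timestamp, eventID = 1003, eventID = 1009, eventID = 1007) AS level
FROM trend
GROUP BY user_id;
```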
**Returned value**

View File

@ -0,0 +1,106 @@
---
toc_priority: 62
toc_title: Geo
---
# Geo Data Types {#geo-data-types}
ClickHouse supports data types for representing geographical objects — locations, lands, etc.
!!! warning "Warning"
Currently geo data types are an experimental feature. To work with them you must set `allow_experimental_geo_types = 1`.
**See Also**
- [Representing simple geographical features](https://en.wikipedia.org/wiki/GeoJSON).
- [allow_experimental_geo_types](../../operations/settings/settings.md#allow-experimental-geo-types) setting.
## Point {#point-data-type}
`Point` is represented by its X and Y coordinates, stored as a [Tuple](tuple.md)([Float64](float.md), [Float64](float.md)).
**Example**
Query:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_point (p Point) ENGINE = Memory();
INSERT INTO geo_point VALUES((10, 10));
SELECT p, toTypeName(p) FROM geo_point;
```
Result:
``` text
┌─p───────┬─toTypeName(p)─┐
│ (10,10) │ Point         │
└─────────┴───────────────┘
```
## Ring {#ring-data-type}
`Ring` is a simple polygon without holes stored as an array of points: [Array](array.md)([Point](#point-data-type)).
**Example**
Query:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_ring (r Ring) ENGINE = Memory();
INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)]);
SELECT r, toTypeName(r) FROM geo_ring;
```
Result:
``` text
┌─r─────────────────────────────┬─toTypeName(r)─┐
│ [(0,0),(10,0),(10,10),(0,10)] │ Ring          │
└───────────────────────────────┴───────────────┘
```
## Polygon {#polygon-data-type}
`Polygon` is a polygon with holes stored as an array of rings: [Array](array.md)([Ring](#ring-data-type)). The first element of the outer array is the outer shape of the polygon, and all the following elements are holes.
**Example**
This is a polygon with one hole:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_polygon (pg Polygon) ENGINE = Memory();
INSERT INTO geo_polygon VALUES([[(20, 20), (50, 20), (50, 50), (20, 50)], [(30, 30), (50, 50), (50, 30)]]);
SELECT pg, toTypeName(pg) FROM geo_polygon;
```
Result:
``` text
┌─pg────────────────────────────────────────────────────────────┬─toTypeName(pg)─┐
│ [[(20,20),(50,20),(50,50),(20,50)],[(30,30),(50,50),(50,30)]] │ Polygon        │
└───────────────────────────────────────────────────────────────┴────────────────┘
```
## MultiPolygon {#multipolygon-data-type}
`MultiPolygon` consists of multiple polygons and is stored as an array of polygons: [Array](array.md)([Polygon](#polygon-data-type)).
**Example**
This multipolygon consists of two separate polygons — the first one without holes, and the second with one hole:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_multipolygon (mpg MultiPolygon) ENGINE = Memory();
INSERT INTO geo_multipolygon VALUES([[[(0, 0), (10, 0), (10, 10), (0, 10)]], [[(20, 20), (50, 20), (50, 50), (20, 50)],[(30, 30), (50, 50), (50, 30)]]]);
SELECT mpg, toTypeName(mpg) FROM geo_multipolygon;
```
Result:
``` text
┌─mpg─────────────────────────────────────────────────────────────────────────────────────────────┬─toTypeName(mpg)─┐
│ [[[(0,0),(10,0),(10,10),(0,10)]],[[(20,20),(50,20),(50,50),(20,50)],[(30,30),(50,50),(50,30)]]] │ MultiPolygon    │
└─────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┘
```
[Original article](https://clickhouse.tech/docs/en/data-types/geo/) <!--hide-->

View File

@ -55,7 +55,7 @@ CREATE TABLE encryption_test
`comment` String,
`secret` String
)
ENGINE = Memory
ENGINE = Memory;
```
Insert some data (please avoid storing the keys/IVs in the database, as this undermines the whole concept of encryption); storing 'hints' is also unsafe and used only for illustrative purposes:
@ -110,7 +110,7 @@ Result:
Compatible with MySQL encryption, and the resulting ciphertext can be decrypted with the [AES_DECRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-decrypt) function.
Will produce same ciphertext as `encrypt` on equal inputs. But when `key` or `iv` are longer than they should normally be, `aes_encrypt_mysql` will stick to what MySQL's `aes_encrypt` does: 'fold' `key` and ignore excess bits of `IV`.
Will produce the same ciphertext as `encrypt` on equal inputs. But when `key` or `iv` are longer than they should normally be, `aes_encrypt_mysql` will stick to what MySQL's `aes_encrypt` does: 'fold' `key` and ignore excess bits of `iv`.
Supported encryption modes:
@ -138,7 +138,6 @@ aes_encrypt_mysql('mode', 'plaintext', 'key' [, iv])
- Ciphertext binary string. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Given equal input, `encrypt` and `aes_encrypt_mysql` produce the same ciphertext:
@ -157,7 +156,6 @@ Result:
└───────────────────┘
```
But `encrypt` fails when `key` or `iv` is longer than expected:
Query:
@ -252,7 +250,7 @@ decrypt('mode', 'ciphertext', 'key' [, iv, aad])
**Examples**
Re-using table from [encrypt](./encryption-functions.md#encrypt).
Re-using table from [encrypt](#encrypt).
Query:
@ -284,6 +282,7 @@ SELECT comment, decrypt('aes-256-cfb128', secret, '12345678910121314151617181920
```
Result:
``` text
┌─comment─────────────────────────────┬─plaintext─┐
│ aes-256-cfb128 no IV                │ Secret    │
@ -294,7 +293,7 @@ Result:
└─────────────────────────────────────┴───────────┘
```
Notice how only portion of the data was properly decrypted, and the rest is gibberish since either `mode`, `key`, or `iv` were different upon encryption.
Notice how only a portion of the data was properly decrypted, and the rest is gibberish since either `mode`, `key`, or `iv` were different upon encryption.
## aes_decrypt_mysql {#aes_decrypt_mysql}
@ -331,6 +330,7 @@ aes_decrypt_mysql('mode', 'ciphertext', 'key' [, iv])
**Examples**
Let's decrypt data we've previously encrypted with MySQL:
``` sql
mysql> SET block_encryption_mode='aes-256-cfb128';
Query OK, 0 rows affected (0.00 sec)
@ -345,11 +345,13 @@ mysql> SELECT aes_encrypt('Secret', '123456789101213141516171819202122', 'iviviv
```
Query:
``` sql
SELECT aes_decrypt_mysql('aes-256-cfb128', unhex('24E9E4966469'), '123456789101213141516171819202122', 'iviviviviviviviv123456') AS plaintext
```
Result:
``` text
┌─plaintext─┐
│ Secret    │

View File

@ -4,10 +4,8 @@ toc_title: ALL
# ALL Clause {#select-all}
`SELECT ALL` is identical to `SELECT` without `DISTINCT`.
If there are multiple matching rows in the table, then `ALL` returns all of them. `SELECT ALL` is identical to `SELECT` without `DISTINCT`. If both `ALL` and `DISTINCT` are specified, an exception will be thrown.
- If `ALL` is specified, it is ignored.
- If both `ALL` and `DISTINCT` are specified, an exception will be thrown.
`ALL` can also be specified inside an aggregate function with the same effect (no-op), for instance:
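```sql
SELECT sum(ALL number) FROM numbers(10);
```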
@ -19,3 +17,5 @@ equals to
```sql
SELECT sum(number) FROM numbers(10);
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/all) <!--hide-->

View File

@ -31,21 +31,26 @@ SETTINGS
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_skip_broken_messages = N]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
```
Required parameters:
- `kafka_broker_list` – A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` – A list of the required Kafka topics.
- `kafka_group_name` – A group of Kafka consumers. Reading offsets are tracked for each group separately. If you don't want messages to be duplicated on the cluster, use the same group name everywhere.
- `kafka_format` – Message format. The format names must be the same as the ones that can be used in the `FORMAT` clause, for example, `JSONEachRow`. For details, see the [Formats](../../../interfaces/formats.md) section.
- `kafka_broker_list` — A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` — A list of the required Kafka topics.
- `kafka_group_name` — A group of Kafka consumers. Reading offsets are tracked for each group separately. If you don't want messages to be duplicated on the cluster, use the same group name everywhere.
- `kafka_format` — Message format. The format names must be the same as the ones that can be used in the `FORMAT` clause, for example, `JSONEachRow`. For details, see the [Formats](../../../interfaces/formats.md) section.
Optional parameters:
- `kafka_row_delimiter` – The delimiter character (row separator) that ends a message.
- `kafka_schema` – An optional parameter that is required if the format needs a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of a single consumer is insufficient. The total number of consumers must not exceed the number of partitions in the topic, since at most one consumer can be assigned per partition.
- `kafka_skip_broken_messages` – The maximum number of malformed messages per block. If `kafka_skip_broken_messages = N`, the engine discards `N` Kafka messages that could not be parsed. One message corresponds to exactly one row. Default value: 0.
- `kafka_row_delimiter` — The delimiter character (row separator) that ends a message.
- `kafka_schema` — An optional parameter that is required if the format needs a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of a single consumer is insufficient. The total number of consumers must not exceed the number of partitions in the topic, since at most one consumer can be assigned per partition.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` — The maximum number of malformed messages per block. If `kafka_skip_broken_messages = N`, the engine discards `N` Kafka messages that could not be parsed. One message corresponds to exactly one row. Default value: 0.
- `kafka_commit_every_batch` — Enables or disables committing every consumed and handled batch separately instead of a single commit of a whole block (default: `0`).
- `kafka_thread_per_consumer` — Enables or disables providing an independent thread to each consumer (default: `0`). When enabled, each consumer flushes data independently and in parallel; when disabled, rows from several consumers are squashed into one block.
Examples

View File

@ -1937,6 +1937,21 @@ SELECT idx, i FROM null_in WHERE i IN (1, NULL) SETTINGS transform_null_in = 1;
Default value: 16.
## background_message_broker_schedule_pool_size {#background_message_broker_schedule_pool_size}
Sets the number of threads performing background tasks for message streaming. The setting is applied at ClickHouse server startup and cannot be changed in a user session.
Possible values:
- Positive integer.
Default value: 16.
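A quick way to inspect the current value (a sketch using the `system.settings` table):

``` sql
SELECT name, value FROM system.settings
WHERE name = 'background_message_broker_schedule_pool_size';
```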
**See Also**
- [Kafka](../../engines/table-engines/integrations/kafka.md#kafka) engine.
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md#rabbitmq-engine) engine.
## format_avro_schema_registry_url {#format_avro_schema_registry_url}
Sets the URL of the [Confluent](https://docs.confluent.io/current/schema-registry/index.html) schema registry for use with the [AvroConfluent](../../interfaces/formats.md#data-format-avro-confluent) format.
@ -2537,4 +2552,15 @@ SELECT * FROM test2;
Note that this setting affects the behaviour of [materialized views](../../sql-reference/statements/create/view.md#materialized) and the [MaterializeMySQL](../../engines/database-engines/materialize-mysql.md) database engine.
## allow_experimental_geo_types {#allow-experimental-geo-types}
Allows working with experimental [geo data types](../../sql-reference/data-types/geo.md).
Possible values:
- 0 — Working with geo data types is disabled.
- 1 — Working with geo data types is enabled.
Default value: `0`.
[Original article](https://clickhouse.tech/docs/ru/operations/settings/settings/) <!--hide-->

View File

@ -9,25 +9,54 @@ toc_title: "\u0421\u0438\u0441\u0442\u0435\u043c\u043d\u044b\u0435\u0020\u0442\u
System tables contain information about:
- Server state, processes, and environment.
- Internal server processes.
- server state, processes, and environment.
- internal server processes.
System tables:
- Are located in the `system` database.
- Are available only for reading data.
- Cannot be dropped or altered, but can be detached.
- are located in the `system` database.
- are available only for reading data.
- cannot be dropped or altered, but can be detached.
The system tables `metric_log`, `query_log`, `query_thread_log`, and `trace_log` store their data in the filesystem. The other system tables store their data in RAM. The ClickHouse server creates such system tables at startup.
Most system tables store their data in RAM. The ClickHouse server creates these system tables at startup.
Unlike other system tables, the system log tables [metric_log](../../operations/system-tables/metric_log.md), [query_log](../../operations/system-tables/query_log.md), [query_thread_log](../../operations/system-tables/query_thread_log.md), [trace_log](../../operations/system-tables/trace_log.md), [part_log](../../operations/system-tables/part_log.md), [crash_log](../../operations/system-tables/crash-log.md) and [text_log](../../operations/system-tables/text_log.md) use the [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) table engine and store their data in the filesystem by default. If you remove a table from the filesystem, the ClickHouse server creates an empty one again at the time of the next data writing. If the schema of a system table changed in a new release, ClickHouse renames the current table and creates a new one.
System log tables can be configured by creating a config file with the same name as the table under `/etc/clickhouse-server/config.d/`, or by setting the corresponding elements in `/etc/clickhouse-server/config.xml`. The following elements can be configured:
- `database` — the database the system table belongs to. This option is deprecated now; all system tables are located in the `system` database.
- `table` — the table to insert data into.
- `partition_by` — the [partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md).
- `ttl` — the table [TTL](../../sql-reference/statements/alter/ttl.md).
- `flush_interval_milliseconds` — the interval for flushing data to disk, in milliseconds.
- `engine` — the full engine name (starting with `ENGINE =`) with parameters. This option conflicts with `partition_by` and `ttl`. If they are specified together, the server returns an error and exits.
An example:
```xml
<yandex>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<ttl>event_date + INTERVAL 30 DAY DELETE</ttl>
<!--
<engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
-->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>
</yandex>
```
By default, the table size is not limited. You can manage the table size by using [TTL](../../sql-reference/statements/alter/ttl.md#manipuliatsii-s-ttl-tablitsy) to delete outdated log records. You can also use the partitioning feature of `MergeTree` tables.
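For instance, a sketch of capping retention with TTL (assuming the default `system.query_log` table, which has an `event_date` column):

``` sql
ALTER TABLE system.query_log MODIFY TTL event_date + INTERVAL 7 DAY DELETE;
```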
### Sources of system metrics
To collect system metrics, the ClickHouse server uses:
- Capabilities of `CAP_NET_ADMIN`.
- `CAP_NET_ADMIN` capabilities.
- [procfs](https://ru.wikipedia.org/wiki/Procfs) (Linux only).
**procfs**
If `CAP_NET_ADMIN` is not enabled for the ClickHouse server, it tries to fall back to `ProcfsMetricsProvider`. `ProcfsMetricsProvider` allows collecting per-query system metrics (for CPU and I/O).

View File

@ -239,7 +239,7 @@ windowFunnel(window, [mode])(timestamp, cond1, cond2, ..., condN)
**Parameters**
- `window` — Width of the sliding time window, in seconds. [UInt](../../sql-reference/aggregate-functions/parametric-functions.md).
- `window` — Width of the sliding window. The unit of measurement depends on `timestamp` and may vary. The condition `timestamp of cond2 <= timestamp of cond1 + window` must hold.
- `mode` - An optional parameter. If set to `'strict'`, the `windowFunnel()` function applies the conditions only to unique values.
- `timestamp` — The name of the column containing the timestamps. [Date](../../sql-reference/aggregate-functions/parametric-functions.md), [DateTime](../../sql-reference/aggregate-functions/parametric-functions.md#data_type-datetime), and other `Integer`-type parameters. When timestamps are stored in columns of type `UInt64`, the maximum allowed value is capped by the `Int64` limit, i.e. `2^63-1`.
- `cond` — Conditions or data describing the chain of events. [UInt8](../../sql-reference/aggregate-functions/parametric-functions.md).

View File

@ -0,0 +1,106 @@
---
toc_priority: 62
toc_title: Geographical structures
---
# Data Types for Working with Geographical Structures {#geo-data-types}
ClickHouse supports data types for representing geographical objects — points (locations), territories, etc.
!!! warning "Warning"
Currently the data types for working with geographical structures are an experimental feature. To use these data types, enable the setting `allow_experimental_geo_types = 1`.
**See Also**
- [Storing geographical data structures](https://ru.wikipedia.org/wiki/GeoJSON).
- The [allow_experimental_geo_types](../../operations/settings/settings.md#allow-experimental-geo-types) setting.
## Point {#point-data-type}
The `Point` type (a point) is defined by its X and Y coordinates and is stored as a [Tuple](tuple.md)([Float64](float.md), [Float64](float.md)).
**Example**
Query:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_point (p Point) ENGINE = Memory();
INSERT INTO geo_point VALUES((10, 10));
SELECT p, toTypeName(p) FROM geo_point;
```
Result:
``` text
┌─p───────┬─toTypeName(p)─┐
│ (10,10) │ Point         │
└─────────┴───────────────┘
```
## Ring {#ring-data-type}
The `Ring` type describes a simple polygon without holes (interior areas), stored as an array of points: [Array](array.md)([Point](#point-data-type)).
**Example**
Query:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_ring (r Ring) ENGINE = Memory();
INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)]);
SELECT r, toTypeName(r) FROM geo_ring;
```
Result:
``` text
┌─r─────────────────────────────┬─toTypeName(r)─┐
│ [(0,0),(10,0),(10,10),(0,10)] │ Ring          │
└───────────────────────────────┴───────────────┘
```
## Polygon {#polygon-data-type}
The `Polygon` type describes a polygon with holes (interior areas) and is stored as an array of rings: [Array](array.md)([Ring](#ring-data-type)). The first element of the array describes the outer polygon (contour), and the remaining elements describe the holes.
**Example**
This table record describes a polygon with one hole:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_polygon (pg Polygon) ENGINE = Memory();
INSERT INTO geo_polygon VALUES([[(20, 20), (50, 20), (50, 50), (20, 50)], [(30, 30), (50, 50), (50, 30)]]);
SELECT pg, toTypeName(pg) FROM geo_polygon;
```
Result:
``` text
┌─pg────────────────────────────────────────────────────────────┬─toTypeName(pg)─┐
│ [[(20,20),(50,20),(50,50),(20,50)],[(30,30),(50,50),(50,30)]] │ Polygon        │
└───────────────────────────────────────────────────────────────┴────────────────┘
```
## MultiPolygon {#multipolygon-data-type}
The `MultiPolygon` type describes an element consisting of several simple polygons (a polygonal mesh). It is stored as an array of polygons: [Array](array.md)([Polygon](#polygon-data-type)).
**Example**
This table record describes an element consisting of two polygons — the first one without holes, and the second one with a single hole:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_multipolygon (mpg MultiPolygon) ENGINE = Memory();
INSERT INTO geo_multipolygon VALUES([[[(0, 0), (10, 0), (10, 10), (0, 10)]], [[(20, 20), (50, 20), (50, 50), (20, 50)],[(30, 30), (50, 50), (50, 30)]]]);
SELECT mpg, toTypeName(mpg) FROM geo_multipolygon;
```
Result:
``` text
┌─mpg─────────────────────────────────────────────────────────────────────────────────────────────┬─toTypeName(mpg)─┐
│ [[[(0,0),(10,0),(10,10),(0,10)]],[[(20,20),(50,20),(50,50),(20,50)],[(30,30),(50,50),(50,30)]]] │ MultiPolygon    │
└─────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┘
```
[Original article](https://clickhouse.tech/docs/ru/data-types/geo/) <!--hide-->

View File

@ -572,7 +572,7 @@ SOURCE(CLICKHOUSE(
or
``` sql
SOURCE(MONGO(
SOURCE(MONGODB(
host 'localhost'
port 27017
user ''

View File

@ -11,7 +11,7 @@ toc_title: "\u0424\u0443\u043d\u043a\u0446\u0438\u0438 \u0434\u043b\u044f \u0448
The length of the initialization vector is always 16 bytes (extra bytes are ignored).
Note that these functions work slowly.
Note that until ClickHouse version 21.1 these functions worked slowly.
## encrypt {#encrypt}
@ -41,7 +41,7 @@ encrypt('mode', 'plaintext', 'key' [, iv, aad])
**Returned value**
- Encrypted string. [String](../../sql-reference/data-types/string.md#string).
- Binary encrypted string. [String](../../sql-reference/data-types/string.md#string).
**Examples**
@ -52,57 +52,38 @@ encrypt('mode', 'plaintext', 'key' [, iv, aad])
``` sql
CREATE TABLE encryption_test
(
input String,
key String DEFAULT unhex('fb9958e2e897ef3fdb49067b51a24af645b3626eed2f9ea1dc7fd4dd71b7e38f9a68db2a3184f952382c783785f9d77bf923577108a88adaacae5c141b1576b0'),
iv String DEFAULT unhex('8CA3554377DFF8A369BC50A89780DD85'),
key32 String DEFAULT substring(key, 1, 32),
key24 String DEFAULT substring(key, 1, 24),
key16 String DEFAULT substring(key, 1, 16)
) Engine = Memory;
`comment` String,
`secret` String
)
ENGINE = Memory;
```
Insert this data:
Insert some data (note: avoid storing keys or initialization vectors in the database, as this compromises the whole concept of encryption); storing 'hints' is also unsafe and is used only for illustrative purposes:
Query:
``` sql
INSERT INTO encryption_test (input) VALUES (''), ('text'), ('What Is ClickHouse?');
INSERT INTO encryption_test VALUES('aes-256-cfb128 no IV', encrypt('aes-256-cfb128', 'Secret', '12345678910121314151617181920212')),\
('aes-256-cfb128 no IV, different key', encrypt('aes-256-cfb128', 'Secret', 'keykeykeykeykeykeykeykeykeykeyke')),\
('aes-256-cfb128 with IV', encrypt('aes-256-cfb128', 'Secret', '12345678910121314151617181920212', 'iviviviviviviviv')),\
('aes-256-cbc no IV', encrypt('aes-256-cbc', 'Secret', '12345678910121314151617181920212'));
```
Example without `iv`:
Query:
``` sql
SELECT 'aes-128-ecb' AS mode, hex(encrypt(mode, input, key16)) FROM encryption_test;
SELECT comment, hex(secret) FROM encryption_test;
```
Result:
``` text
┌─mode────────┬─hex(encrypt('aes-128-ecb', input, key16))────────────────────────┐
│ aes-128-ecb │ 4603E6862B0D94BBEC68E0B0DF51D60F │
│ aes-128-ecb │ 3004851B86D3F3950672DE7085D27C03 │
│ aes-128-ecb │ E807F8C8D40A11F65076361AFC7D8B68D8658C5FAA6457985CAA380F16B3F7E4 │
└─────────────┴──────────────────────────────────────────────────────────────────┘
```
Example with `iv`:
Query:
``` sql
SELECT 'aes-256-ctr' AS mode, hex(encrypt(mode, input, key32, iv)) FROM encryption_test;
```
Result:
``` text
┌─mode────────┬─hex(encrypt('aes-256-ctr', input, key32, iv))─┐
│ aes-256-ctr │ │
│ aes-256-ctr │ 7FB039F7 │
│ aes-256-ctr │ 5CBD20F7ABD3AC41FCAA1A5C0E119E2B325949 │
└─────────────┴───────────────────────────────────────────────┘
┌─comment─────────────────────────────┬─hex(secret)──────────────────────┐
│ aes-256-cfb128 no IV                │ B4972BDC4459                     │
│ aes-256-cfb128 no IV, different key │ 2FF57C092DC9                     │
│ aes-256-cfb128 with IV              │ 5E6CB398F653                     │
│ aes-256-cbc no IV                   │ 1BC0629A92450D9E73A00E7D02CF4142 │
└─────────────────────────────────────┴──────────────────────────────────┘
```
Example in `-gcm` mode:
@ -110,41 +91,27 @@ SELECT 'aes-256-ctr' AS mode, hex(encrypt(mode, input, key32, iv)) FROM encrypti
Query:
``` sql
SELECT 'aes-256-gcm' AS mode, hex(encrypt(mode, input, key32, iv)) FROM encryption_test;
INSERT INTO encryption_test VALUES('aes-256-gcm', encrypt('aes-256-gcm', 'Secret', '12345678910121314151617181920212', 'iviviviviviviviv')), \
('aes-256-gcm with AAD', encrypt('aes-256-gcm', 'Secret', '12345678910121314151617181920212', 'iviviviviviviviv', 'aad'));
SELECT comment, hex(secret) FROM encryption_test WHERE comment LIKE '%gcm%';
```
Result:
``` text
┌─mode────────┬─hex(encrypt('aes-256-gcm', input, key32, iv))──────────────────────────┐
│ aes-256-gcm │ E99DBEBC01F021758352D7FBD9039EFA │
│ aes-256-gcm │ 8742CE3A7B0595B281C712600D274CA881F47414 │
│ aes-256-gcm │ A44FD73ACEB1A64BDE2D03808A2576EDBB60764CC6982DB9AF2C33C893D91B00C60DC5 │
└─────────────┴────────────────────────────────────────────────────────────────────────┘
```
Example in `-gcm` mode with `aad`:
Query:
``` sql
SELECT 'aes-192-gcm' AS mode, hex(encrypt(mode, input, key24, iv, 'AAD')) FROM encryption_test;
```
Result:
``` text
┌─mode────────┬─hex(encrypt('aes-192-gcm', input, key24, iv, 'AAD'))───────────────────┐
│ aes-192-gcm │ 04C13E4B1D62481ED22B3644595CB5DB │
│ aes-192-gcm │ 9A6CF0FD2B329B04EAD18301818F016DF8F77447 │
│ aes-192-gcm │ B961E9FD9B940EBAD7ADDA75C9F198A40797A5EA1722D542890CC976E21113BBB8A7AA │
└─────────────┴────────────────────────────────────────────────────────────────────────┘
┌─comment──────────────┬─hex(secret)──────────────────────────────────┐
│ aes-256-gcm          │ A8A3CCBC6426CFEEB60E4EAE03D3E94204C1B09E0254 │
│ aes-256-gcm with AAD │ A8A3CCBC6426D9A1017A0A932322F1852260A4AD6837 │
└──────────────────────┴──────────────────────────────────────────────┘
```
## aes_encrypt_mysql {#aes_encrypt_mysql}
Compatible with MySQL encryption; the result can be decrypted with the [AES_DECRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-decrypt) function.
Given equal inputs, the ciphertext matches the result returned by `encrypt`. However, when `key` or `iv` is longer than it should be, `aes_encrypt_mysql` behaves like MySQL's `aes_encrypt`: it 'folds' the key and ignores the excess part of `iv`.
The function supports encrypting data in the following modes:
- aes-128-ecb, aes-192-ecb, aes-256-ecb
@ -164,78 +131,96 @@ aes_encrypt_mysql('mode', 'plaintext', 'key' [, iv])
- `mode` — Encryption mode. [String](../../sql-reference/data-types/string.md#string).
- `plaintext` — The text to be encrypted. [String](../../sql-reference/data-types/string.md#string).
- `key` — The encryption key. [String](../../sql-reference/data-types/string.md#string).
- `iv` — The initialization vector. Optional. [String](../../sql-reference/data-types/string.md#string).
- `key` — The encryption key. If the key is longer than the mode requires, MySQL-specific key folding is performed. [String](../../sql-reference/data-types/string.md#string).
- `iv` — The initialization vector. Optional; only the first 16 bytes are taken into account. [String](../../sql-reference/data-types/string.md#string).
**Returned value**
- Encrypted string. [String](../../sql-reference/data-types/string.md#string).
- Binary encrypted string. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Create this table:
Given equal inputs, `encrypt` and `aes_encrypt_mysql` produce the same ciphertext:
Query:
``` sql
CREATE TABLE encryption_test
(
input String,
key String DEFAULT unhex('fb9958e2e897ef3fdb49067b51a24af645b3626eed2f9ea1dc7fd4dd71b7e38f9a68db2a3184f952382c783785f9d77bf923577108a88adaacae5c141b1576b0'),
iv String DEFAULT unhex('8CA3554377DFF8A369BC50A89780DD85'),
key32 String DEFAULT substring(key, 1, 32),
key24 String DEFAULT substring(key, 1, 24),
key16 String DEFAULT substring(key, 1, 16)
) Engine = Memory;
```
Insert this data:
Query:
``` sql
INSERT INTO encryption_test (input) VALUES (''), ('text'), ('What Is ClickHouse?');
```
Example without `iv`:
Query:
``` sql
SELECT 'aes-128-cbc' AS mode, hex(aes_encrypt_mysql(mode, input, key32)) FROM encryption_test;
SELECT encrypt('aes-256-cfb128', 'Secret', '12345678910121314151617181920212', 'iviviviviviviviv') = aes_encrypt_mysql('aes-256-cfb128', 'Secret', '12345678910121314151617181920212', 'iviviviviviviviv') AS ciphertexts_equal;
```
Result:
``` text
┌─mode────────┬─hex(aes_encrypt_mysql('aes-128-cbc', input, key32))──────────────┐
│ aes-128-cbc │ FEA8CFDE6EE2C6E7A2CC6ADDC9F62C83 │
│ aes-128-cbc │ 78B16CD4BE107660156124C5FEE6454A │
│ aes-128-cbc │ 67C0B119D96F18E2823968D42871B3D179221B1E7EE642D628341C2B29BA2E18 │
└─────────────┴──────────────────────────────────────────────────────────────────┘
┌─ciphertexts_equal─┐
│                 1 │
└───────────────────┘
```
Example with `iv`:
The `encrypt` function throws an exception when `key` or `iv` is longer than needed:
Query:
``` sql
SELECT 'aes-256-cfb128' AS mode, hex(aes_encrypt_mysql(mode, input, key32, iv)) FROM encryption_test;
SELECT encrypt('aes-256-cfb128', 'Secret', '123456789101213141516171819202122', 'iviviviviviviviv123');
```
Result:
``` text
┌─mode───────────┬─hex(aes_encrypt_mysql('aes-256-cfb128', input, key32, iv))─┐
│ aes-256-cfb128 │ │
│ aes-256-cfb128 │ 7FB039F7 │
│ aes-256-cfb128 │ 5CBD20F7ABD3AC41FCAA1A5C0E119E2BB5174F │
└────────────────┴────────────────────────────────────────────────────────────┘
Received exception from server (version 21.1.2):
Code: 36. DB::Exception: Received from localhost:9000. DB::Exception: Invalid key size: 33 expected 32: While processing encrypt('aes-256-cfb128', 'Secret', '123456789101213141516171819202122', 'iviviviviviviviv123').
```
However, in the same case `aes_encrypt_mysql` returns a result that MySQL can process:
Query:
``` sql
SELECT hex(aes_encrypt_mysql('aes-256-cfb128', 'Secret', '123456789101213141516171819202122', 'iviviviviviviviv123')) AS ciphertext;
```
Result:
```text
┌─ciphertext───┐
│ 24E9E4966469 │
└──────────────┘
```
If you pass an even longer `iv`, the result stays the same:
Query:
``` sql
SELECT hex(aes_encrypt_mysql('aes-256-cfb128', 'Secret', '123456789101213141516171819202122', 'iviviviviviviviv123456')) AS ciphertext
```
Result:
``` text
┌─ciphertext───┐
│ 24E9E4966469 │
└──────────────┘
```
This matches the result MySQL returns for the same inputs:
``` sql
mysql> SET block_encryption_mode='aes-256-cfb128';
Query OK, 0 rows affected (0.00 sec)
mysql> SELECT aes_encrypt('Secret', '123456789101213141516171819202122', 'iviviviviviviviv123456') as ciphertext;
+------------------------+
| ciphertext |
+------------------------+
| 0x24E9E4966469 |
+------------------------+
1 row in set (0.00 sec)
```
## decrypt {#decrypt}
The function supports decrypting data in the following modes:
The function decrypts ciphertext and can work in the following modes:
- aes-128-ecb, aes-192-ecb, aes-256-ecb
- aes-128-cbc, aes-192-cbc, aes-256-cbc
@ -265,52 +250,58 @@ decrypt('mode', 'ciphertext', 'key' [, iv, aad])
**Examples**
Create this table:
Consider the table from the [encrypt](#encrypt) example.
Query:
``` sql
CREATE TABLE encryption_test
(
input String,
key String DEFAULT unhex('fb9958e2e897ef3fdb49067b51a24af645b3626eed2f9ea1dc7fd4dd71b7e38f9a68db2a3184f952382c783785f9d77bf923577108a88adaacae5c141b1576b0'),
iv String DEFAULT unhex('8CA3554377DFF8A369BC50A89780DD85'),
key32 String DEFAULT substring(key, 1, 32),
key24 String DEFAULT substring(key, 1, 24),
key16 String DEFAULT substring(key, 1, 16)
) Engine = Memory;
```
Insert this data:
Query:
``` sql
INSERT INTO encryption_test (input) VALUES (''), ('text'), ('What Is ClickHouse?');
```
Query:
``` sql
SELECT 'aes-128-ecb' AS mode, decrypt(mode, encrypt(mode, input, key16), key16) FROM encryption_test;
SELECT comment, hex(secret) FROM encryption_test;
```
Result:
``` text
┌─mode────────┬─decrypt('aes-128-ecb', encrypt('aes-128-ecb', input, key16), key16)─┐
│ aes-128-ecb │ │
│ aes-128-ecb │ text │
│ aes-128-ecb │ What Is ClickHouse? │
└─────────────┴─────────────────────────────────────────────────────────────────────┘
┌─comment──────────────┬─hex(secret)──────────────────────────────────┐
│ aes-256-gcm          │ A8A3CCBC6426CFEEB60E4EAE03D3E94204C1B09E0254 │
│ aes-256-gcm with AAD │ A8A3CCBC6426D9A1017A0A932322F1852260A4AD6837 │
└──────────────────────┴──────────────────────────────────────────────┘
┌─comment─────────────────────────────┬─hex(secret)──────────────────────┐
│ aes-256-cfb128 no IV                │ B4972BDC4459                     │
│ aes-256-cfb128 no IV, different key │ 2FF57C092DC9                     │
│ aes-256-cfb128 with IV              │ 5E6CB398F653                     │
│ aes-256-cbc no IV                   │ 1BC0629A92450D9E73A00E7D02CF4142 │
└─────────────────────────────────────┴──────────────────────────────────┘
```
Now let's try to decrypt this data:
Query:
``` sql
SELECT comment, decrypt('aes-256-cfb128', secret, '12345678910121314151617181920212') as plaintext FROM encryption_test;
```
Result:
``` text
┌─comment─────────────────────────────┬─plaintext─┐
│ aes-256-cfb128 no IV                │ Secret    │
│ aes-256-cfb128 no IV, different key │ �4�       │
│ aes-256-cfb128 with IV              │ ���6�~    │
│ aes-256-cbc no IV                   │ �2*4�h3c�4w��@ │
└─────────────────────────────────────┴───────────┘
```
Note that only a portion of the data was decrypted correctly. The rest is gibberish, since different `mode`, `key`, or `iv` values were used during encryption.
## aes_decrypt_mysql {#aes_decrypt_mysql}
Compatible with MySQL encryption; decrypts data encrypted with the [AES_ENCRYPT](https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-encrypt) function.
The function supports decrypting data in the following modes:
Given equal inputs, the plaintext matches the result returned by `decrypt`. However, when `key` or `iv` is longer than it should be, `aes_decrypt_mysql` behaves like MySQL's `aes_decrypt`: it 'folds' the key and ignores the excess part of `iv`.
The function supports decrypting data in the following modes:
- aes-128-ecb, aes-192-ecb, aes-256-ecb
- aes-128-cbc, aes-192-cbc, aes-256-cbc
@ -332,51 +323,39 @@ aes_decrypt_mysql('mode', 'ciphertext', 'key' [, iv])
- `key` — The decryption key. [String](../../sql-reference/data-types/string.md#string).
- `iv` — The initialization vector. Optional. [String](../../sql-reference/data-types/string.md#string).
**Returned value**
- Decrypted string. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Create this table:
Let's decrypt the data we previously encrypted with MySQL:
Query:
``` sql
CREATE TABLE encryption_test
(
input String,
key String DEFAULT unhex('fb9958e2e897ef3fdb49067b51a24af645b3626eed2f9ea1dc7fd4dd71b7e38f9a68db2a3184f952382c783785f9d77bf923577108a88adaacae5c141b1576b0'),
iv String DEFAULT unhex('8CA3554377DFF8A369BC50A89780DD85'),
key32 String DEFAULT substring(key, 1, 32),
key24 String DEFAULT substring(key, 1, 24),
key16 String DEFAULT substring(key, 1, 16)
) Engine = Memory;
```
mysql> SET block_encryption_mode='aes-256-cfb128';
Query OK, 0 rows affected (0.00 sec)
Insert this data:
Query:
``` sql
INSERT INTO encryption_test (input) VALUES (''), ('text'), ('What Is ClickHouse?');
mysql> SELECT aes_encrypt('Secret', '123456789101213141516171819202122', 'iviviviviviviviv123456') as ciphertext;
+------------------------+
| ciphertext |
+------------------------+
| 0x24E9E4966469 |
+------------------------+
1 row in set (0.00 sec)
```
Query:
``` sql
SELECT 'aes-128-cbc' AS mode, aes_decrypt_mysql(mode, aes_encrypt_mysql(mode, input, key), key) FROM encryption_test;
SELECT aes_decrypt_mysql('aes-256-cfb128', unhex('24E9E4966469'), '123456789101213141516171819202122', 'iviviviviviviviv123456') AS plaintext;
```
Result:
``` text
┌─mode────────┬─aes_decrypt_mysql('aes-128-cbc', aes_encrypt_mysql('aes-128-cbc', input, key), key)─┐
│ aes-128-cbc │ │
│ aes-128-cbc │ text │
│ aes-128-cbc │ What Is ClickHouse? │
└─────────────┴─────────────────────────────────────────────────────────────────────────────────────┘
┌─plaintext─┐
│ Secret    │
└───────────┘
```
[Original article](https://clickhouse.tech/docs/ru/sql-reference/functions/encryption_functions/) <!--hide-->

View File

@ -0,0 +1,22 @@
---
toc_title: ALL
---
# ALL Clause {#select-all}
If there are multiple matching rows in the table, `ALL` returns all of them. The behaviour of `SELECT ALL` is exactly the same as `SELECT` without the `DISTINCT` argument. If both the `ALL` and `DISTINCT` arguments are specified, an exception is thrown.
`ALL` can be specified inside an aggregate function, for example, the result of the query:
```sql
SELECT sum(ALL number) FROM numbers(10);
```
equals the result of the query:
```sql
SELECT sum(number) FROM numbers(10);
```
[Original article](https://clickhouse.tech/docs/ru/sql-reference/statements/select/all) <!--hide-->

View File

@ -1017,17 +1017,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created"
" (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe).");
if (has_zookeeper && config().has("distributed_ddl"))
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, *global_context, &config(),
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
}
std::unique_ptr<DNSCacheUpdater> dns_cache_updater;
if (config().has("disable_internal_dns_cache") && config().getInt("disable_internal_dns_cache"))
{
@ -1309,6 +1298,37 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::thread::hardware_concurrency());
}
/// try to load dictionaries immediately, throw on error and die
ext::scope_guard dictionaries_xmls, models_xmls;
try
{
if (!config().getBool("dictionaries_lazy_load", true))
{
global_context->tryCreateEmbeddedDictionaries();
global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
}
dictionaries_xmls = global_context->getExternalDictionariesLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config"));
models_xmls = global_context->getExternalModelsLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "models_config"));
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
if (has_zookeeper && config().has("distributed_ddl"))
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, *global_context, &config(),
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
}
LOG_INFO(log, "Ready for connections.");
SCOPE_EXIT({
@ -1358,26 +1378,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
});
/// try to load dictionaries immediately, throw on error and die
ext::scope_guard dictionaries_xmls, models_xmls;
try
{
if (!config().getBool("dictionaries_lazy_load", true))
{
global_context->tryCreateEmbeddedDictionaries();
global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
}
dictionaries_xmls = global_context->getExternalDictionariesLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config"));
models_xmls = global_context->getExternalModelsLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "models_config"));
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{

View File

@ -892,6 +892,19 @@
<!-- Controls how much ON CLUSTER queries can be run simultaneously. -->
<!-- <pool_size>1</pool_size> -->
<!--
Cleanup settings (active tasks will not be removed)
-->
<!-- Controls task TTL (default 1 week) -->
<!-- <task_max_lifetime>604800</task_max_lifetime> -->
<!-- Controls how often cleanup should be performed (in seconds) -->
<!-- <cleanup_delay_period>60</cleanup_delay_period> -->
<!-- Controls how many tasks could be in the queue -->
<!-- <max_tasks_in_queue>1000</max_tasks_in_queue> -->
</distributed_ddl>
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->

View File

@ -118,6 +118,8 @@ public:
WhichDataType value_type_to_check(value_type);
/// Do not promote decimal because of implementation issues of this function design
/// Currently we cannot get the result column type: in case of decimal we cannot get the decimal scale
/// in the method void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
/// If we decide to make this function more efficient, we should promote the decimal type during the sum
if (value_type_to_check.isDecimal())
result_type = value_type_without_nullable;

View File

@ -101,8 +101,8 @@ endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/replicate.cpp)
list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h)
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/replicate.cpp Functions/FunctionsLogical.cpp)
list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h Functions/FunctionsLogical.h)
list (APPEND dbms_sources
AggregateFunctions/AggregateFunctionFactory.cpp

View File

@ -538,12 +538,13 @@
M(569, MULTIPLE_COLUMNS_SERIALIZED_TO_SAME_PROTOBUF_FIELD) \
M(570, DATA_TYPE_INCOMPATIBLE_WITH_PROTOBUF_FIELD) \
M(571, DATABASE_REPLICATION_FAILED) \
M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \
M(1002, UNKNOWN_EXCEPTION) \
M(1003, INVALID_SHARD_ID)
M(1003, INVALID_SHARD_ID) \
/* See END */

View File

@ -437,6 +437,7 @@ class IColumn;
M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -22,7 +22,7 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
/// We also need similar graph to load tables on server startup in order of topsort.
}
void DatabaseReplicatedDDLWorker::initializeMainThread()
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
while (!stop_flag)
{
@ -33,7 +33,7 @@ void DatabaseReplicatedDDLWorker::initializeMainThread()
database->tryConnectToZooKeeperAndInitDatabase(false);
initializeReplication();
initialized = true;
return;
return true;
}
catch (...)
{
@ -41,6 +41,8 @@ void DatabaseReplicatedDDLWorker::initializeMainThread()
sleepForSeconds(5);
}
}
return false;
}
void DatabaseReplicatedDDLWorker::shutdown()
@ -61,7 +63,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
database->recoverLostReplica(current_zookeeper, our_log_ptr, max_log_ptr);
else
last_skipped_entry_name.emplace(log_ptr_str);
last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr));
}
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)

View File

@ -30,7 +30,7 @@ public:
void shutdown() override;
private:
void initializeMainThread() override;
bool initializeMainThread() override;
void initializeReplication();
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;

View File

@ -6,6 +6,7 @@
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsConversion.h>
#include <Functions/materialize.h>
#include <Functions/FunctionsLogical.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionJIT.h>
#include <IO/WriteBufferFromString.h>
@ -364,7 +365,7 @@ void ActionsDAG::removeUnusedActions(const std::vector<Node *> & required_nodes)
removeUnusedActions();
}
void ActionsDAG::removeUnusedActions()
void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
{
std::unordered_set<const Node *> visited_nodes;
std::stack<Node *> stack;
@ -388,6 +389,9 @@ void ActionsDAG::removeUnusedActions()
visited_nodes.insert(&node);
stack.push(&node);
}
if (node.type == ActionType::INPUT && !allow_remove_inputs)
visited_nodes.insert(&node);
}
while (!stack.empty())
@ -516,6 +520,11 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
if (col == child)
return false;
/// Do not remove input if it was mentioned in index several times.
for (const auto * node : index)
if (col == node)
return false;
/// Remove from nodes and inputs.
for (auto jt = nodes.begin(); jt != nodes.end(); ++jt)
{
@ -1203,4 +1212,340 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & co
return split(split_nodes);
}
namespace
{
struct ConjunctionNodes
{
std::vector<ActionsDAG::Node *> allowed;
std::vector<ActionsDAG::Node *> rejected;
};
/// Take a node whose result is a predicate.
/// Assuming the predicate is a conjunction (possibly trivial).
/// Find separate conjunction nodes. Split nodes into allowed and rejected sets.
/// An allowed predicate is a predicate which can be calculated using only nodes from the allowed_nodes set.
ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
{
ConjunctionNodes conjunction;
std::unordered_set<ActionsDAG::Node *> allowed;
std::unordered_set<ActionsDAG::Node *> rejected;
struct Frame
{
ActionsDAG::Node * node;
bool is_predicate = false;
size_t next_child_to_visit = 0;
size_t num_allowed_children = 0;
};
std::stack<Frame> stack;
std::unordered_set<ActionsDAG::Node *> visited_nodes;
stack.push(Frame{.node = predicate, .is_predicate = true});
visited_nodes.insert(predicate);
while (!stack.empty())
{
auto & cur = stack.top();
bool is_conjunction = cur.is_predicate
&& cur.node->type == ActionsDAG::ActionType::FUNCTION
&& cur.node->function_base->getName() == "and";
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
auto * child = cur.node->children[cur.next_child_to_visit];
if (visited_nodes.count(child) == 0)
{
visited_nodes.insert(child);
stack.push({.node = child, .is_predicate = is_conjunction});
break;
}
if (allowed_nodes.contains(child))
++cur.num_allowed_children;
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
if (cur.num_allowed_children == cur.node->children.size())
{
if (cur.node->type != ActionsDAG::ActionType::ARRAY_JOIN && cur.node->type != ActionsDAG::ActionType::INPUT)
allowed_nodes.emplace(cur.node);
}
else if (is_conjunction)
{
for (auto * child : cur.node->children)
{
if (allowed_nodes.count(child))
{
if (allowed.insert(child).second)
conjunction.allowed.push_back(child);
}
}
}
else if (cur.is_predicate)
{
if (rejected.insert(cur.node).second)
conjunction.rejected.push_back(cur.node);
}
stack.pop();
}
}
if (conjunction.allowed.empty())
{
/// If nothing was added to conjunction, check if it is trivial.
if (allowed_nodes.count(predicate))
conjunction.allowed.push_back(predicate);
}
return conjunction;
}
ColumnsWithTypeAndName prepareFunctionArguments(const std::vector<ActionsDAG::Node *> nodes)
{
ColumnsWithTypeAndName arguments;
arguments.reserve(nodes.size());
for (const auto * child : nodes)
{
ColumnWithTypeAndName argument;
argument.column = child->column;
argument.type = child->result_type;
argument.name = child->result_name;
arguments.emplace_back(std::move(argument));
}
return arguments;
}
}
/// Create actions which calculate the conjunction of the selected nodes.
/// Assumes the conjunction nodes are predicates (and may be used as arguments of the AND function).
///
/// The resulting actions add a single column with the conjunction result (it is always last in the index).
/// No other columns are added or removed.
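/// E.g. for the conjunction {p1, p2} the cloned actions end with the column `and(p1, p2)`;
/// a single node p1 is exposed as-is, without an AND wrapper (see below).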
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunction)
{
if (conjunction.empty())
return nullptr;
auto actions = cloneEmpty();
actions->settings.project_input = false;
FunctionOverloadResolverPtr func_builder_and =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionAnd>()));
std::unordered_map<const ActionsDAG::Node *, ActionsDAG::Node *> nodes_mapping;
struct Frame
{
const ActionsDAG::Node * node;
size_t next_child_to_visit = 0;
};
std::stack<Frame> stack;
/// DFS. Clone actions.
for (const auto * predicate : conjunction)
{
if (nodes_mapping.count(predicate))
continue;
stack.push({.node = predicate});
while (!stack.empty())
{
auto & cur = stack.top();
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
auto * child = cur.node->children[cur.next_child_to_visit];
if (nodes_mapping.count(child) == 0)
{
stack.push({.node = child});
break;
}
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
auto & node = actions->nodes.emplace_back(*cur.node);
nodes_mapping[cur.node] = &node;
for (auto & child : node.children)
child = nodes_mapping[child];
if (node.type == ActionType::INPUT)
{
actions->inputs.emplace_back(&node);
actions->index.insert(&node);
}
stack.pop();
}
}
}
Node * result_predicate = nodes_mapping[*conjunction.begin()];
if (conjunction.size() > 1)
{
std::vector<Node *> args;
args.reserve(conjunction.size());
for (const auto * predicate : conjunction)
args.emplace_back(nodes_mapping[predicate]);
result_predicate = &actions->addFunction(func_builder_and, args, {}, true, false);
}
actions->index.insert(result_predicate);
return actions;
}
ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs)
{
Node * predicate;
{
auto it = index.begin();
for (; it != index.end(); ++it)
if ((*it)->result_name == filter_name)
break;
if (it == index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
filter_name, dumpDAG());
predicate = *it;
}
std::unordered_set<const Node *> allowed_nodes;
/// Get input nodes from available_inputs names.
{
std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
for (const auto & input : inputs)
inputs_map[input->result_name].emplace_back(input);
for (const auto & name : available_inputs)
{
auto & inputs_list = inputs_map[name];
if (inputs_list.empty())
continue;
allowed_nodes.emplace(inputs_list.front());
inputs_list.pop_front();
}
}
auto conjunction = getConjunctionNodes(predicate, allowed_nodes);
auto actions = cloneActionsForConjunction(conjunction.allowed);
if (!actions)
return nullptr;
/// Now, when actions are created, update current DAG.
if (conjunction.rejected.empty())
{
/// The whole predicate was split.
if (can_remove_filter)
{
/// If filter column is not needed, remove it from index.
for (auto i = index.begin(); i != index.end(); ++i)
{
if (*i == predicate)
{
index.remove(i);
break;
}
}
}
else
{
/// Replace the predicate result with the constant 1.
Node node;
node.type = ActionType::COLUMN;
node.result_name = std::move(predicate->result_name);
node.result_type = std::move(predicate->result_type);
node.column = node.result_type->createColumnConst(0, 1);
*predicate = std::move(node);
}
removeUnusedActions(false);
}
else
{
/// The predicate is a conjunction where both the allowed and rejected sets are non-empty.
/// Replace this node with the conjunction of the rejected predicates.
std::vector<Node *> new_children(conjunction.rejected.begin(), conjunction.rejected.end());
if (new_children.size() == 1)
{
/// Rejected set has only one predicate.
if (new_children.front()->result_type->equals(*predicate->result_type))
{
/// If its type is the same, just add an alias.
Node node;
node.type = ActionType::ALIAS;
node.result_name = predicate->result_name;
node.result_type = predicate->result_type;
node.children.swap(new_children);
*predicate = std::move(node);
}
else
{
/// If the type is different, cast the column.
/// This case is possible because AND can use any numeric type as an argument.
Node node;
node.type = ActionType::COLUMN;
node.result_name = predicate->result_type->getName();
node.column = DataTypeString().createColumnConst(0, node.result_name);
node.result_type = std::make_shared<DataTypeString>();
auto * right_arg = &nodes.emplace_back(std::move(node));
auto * left_arg = new_children.front();
predicate->children = {left_arg, right_arg};
auto arguments = prepareFunctionArguments(predicate->children);
FunctionOverloadResolverPtr func_builder_cast =
std::make_shared<FunctionOverloadResolverAdaptor>(
CastOverloadResolver<CastType::nonAccurate>::createImpl(false));
predicate->function_builder = func_builder_cast;
predicate->function_base = predicate->function_builder->build(arguments);
predicate->function = predicate->function_base->prepare(arguments);
}
}
else
{
/// The predicate is the AND function, which still has more than one argument.
/// Just update its children and rebuild it.
predicate->children.swap(new_children);
auto arguments = prepareFunctionArguments(predicate->children);
predicate->function_base = predicate->function_builder->build(arguments);
predicate->function = predicate->function_base->prepare(arguments);
}
removeUnusedActions(false);
}
return actions;
}
}

View File

@ -152,6 +152,9 @@ public:
}
};
/// NOTE: std::list is an implementation detail.
/// It allows adding and removing nodes in place, without reallocation.
/// Raw pointers to nodes remain valid.
using Nodes = std::list<Node>;
using Inputs = std::vector<Node *>;
@ -278,6 +281,13 @@ public:
/// Index of initial actions must contain column_name.
SplitResult splitActionsForFilter(const std::string & column_name) const;
/// Create actions which may calculate part of the filter using only available_inputs.
/// If nothing can be calculated, returns nullptr.
/// Otherwise, returns actions whose inputs are from available_inputs.
/// The returned actions add a single column which may be used as a filter.
/// Also, replaces some nodes of the current DAG with the constant 1 for predicates that were pushed down.
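/// Illustrative example: for a filter `a AND b` where only `a` depends on available_inputs,
/// the returned actions compute `a`, while this DAG keeps filtering by `b`.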
ActionsDAGPtr splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs);
private:
Node & addNode(Node node, bool can_replace = false, bool add_to_index = true);
Node & getNode(const std::string & name);
@ -302,10 +312,12 @@ private:
}
void removeUnusedActions(const std::vector<Node *> & required_nodes);
void removeUnusedActions();
void removeUnusedActions(bool allow_remove_inputs = true);
void addAliases(const NamesWithAliases & aliases, std::vector<Node *> & result_nodes);
void compileFunctions();
ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction);
};

View File

@ -284,7 +284,7 @@ void SelectStreamFactory::createForShard(
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(modified_query_ast, header, context, stage);
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline()));
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()))));
}
else
{

View File

@ -189,7 +189,7 @@ public:
void commit();
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exception()); }
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); }
};
}

View File

@ -305,22 +305,28 @@ static void filterAndSortQueueNodes(Strings & all_nodes)
std::sort(all_nodes.begin(), all_nodes.end());
}
void DDLWorker::scheduleTasks()
void DDLWorker::scheduleTasks(bool reinitialized)
{
LOG_DEBUG(log, "Scheduling tasks");
auto zookeeper = tryGetZooKeeper();
for (auto & task : current_tasks)
{
/// The main thread of DDLWorker was restarted, probably due to a lost connection to ZooKeeper.
/// We have some unfinished tasks. To avoid duplicating queries, try to write their execution status.
if (reinitialized)
{
for (auto & task : current_tasks)
{
if (task->was_executed)
{
bool task_still_exists = zookeeper->exists(task->entry_path);
bool status_written = zookeeper->exists(task->getFinishedNodePath());
if (task->was_executed && !status_written && task_still_exists)
if (!status_written && task_still_exists)
{
processTask(*task, zookeeper);
}
}
}
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
filterAndSortQueueNodes(queue_nodes);
@ -332,19 +338,23 @@ void DDLWorker::scheduleTasks()
else if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();
bool server_startup = current_tasks.empty();
/// Detect the queue start, using:
/// - skipped tasks
/// - in-memory tasks (that are currently active)
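/// E.g. if the queue holds entries query-0000000004..query-0000000007 and the last executed
/// or skipped entry is query-0000000005, scheduling resumes from query-0000000006.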
auto begin_node = queue_nodes.begin();
if (!server_startup)
UInt64 last_task_id = 0;
if (!current_tasks.empty())
{
/// We will recheck the status of the last executed tasks. This is useful if the main thread was just restarted.
auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end());
String min_entry_name = last_skipped_entry_name ? std::min(min_task->entry_name, *last_skipped_entry_name) : min_task->entry_name;
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_entry_name);
current_tasks.clear();
auto & last_task = current_tasks.back();
last_task_id = DDLTaskBase::getLogEntryNumber(last_task->entry_name);
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task->entry_name);
}
if (last_skipped_entry_name)
{
UInt64 last_skipped_entry_id = DDLTaskBase::getLogEntryNumber(*last_skipped_entry_name);
if (last_skipped_entry_id > last_task_id)
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), *last_skipped_entry_name);
}
assert(current_tasks.empty());
for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
{
@ -365,7 +375,7 @@ void DDLWorker::scheduleTasks()
if (worker_pool)
{
worker_pool->scheduleOrThrowOnError([this, &saved_task, &zookeeper]()
worker_pool->scheduleOrThrowOnError([this, &saved_task, zookeeper]()
{
setThreadName("DDLWorkerExec");
processTask(saved_task, zookeeper);
@ -930,11 +940,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
}
void DDLWorker::initializeMainThread()
bool DDLWorker::initializeMainThread()
{
assert(!initialized);
setThreadName("DDLWorker");
LOG_DEBUG(log, "Started DDLWorker thread");
LOG_DEBUG(log, "Initializing DDLWorker thread");
while (!stop_flag)
{
@ -943,7 +953,7 @@ void DDLWorker::initializeMainThread()
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initialized = true;
return;
return true;
}
catch (const Coordination::Exception & e)
{
@ -964,6 +974,8 @@ void DDLWorker::initializeMainThread()
/// Avoid busy loop when ZooKeeper is not available.
sleepForSeconds(5);
}
return false;
}
void DDLWorker::runMainThread()
@ -989,15 +1001,19 @@ void DDLWorker::runMainThread()
{
try
{
bool reinitialized = !initialized;
/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
if (!initialized)
{
initializeMainThread();
/// Stopped
if (!initializeMainThread())
break;
LOG_DEBUG(log, "Initialized DDLWorker thread");
}
cleanup_event->set();
scheduleTasks();
scheduleTasks(reinitialized);
LOG_DEBUG(log, "Waiting for queue updates");
queue_updated_event->wait();
@ -1007,6 +1023,9 @@ void DDLWorker::runMainThread()
if (Coordination::isHardwareError(e.code))
{
initialized = false;
/// Wait for pending async tasks (replacing the worker pool joins the old one's threads in its destructor)
if (1 < pool_size)
worker_pool = std::make_unique<ThreadPool>(pool_size);
LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true));
}
else

View File

@ -69,7 +69,7 @@ protected:
ZooKeeperPtr getAndSetZooKeeper();
/// Iterates through queue tasks in ZooKeeper, runs execution of new tasks
void scheduleTasks();
void scheduleTasks(bool reinitialized);
DDLTaskBase & saveTask(DDLTaskPtr && task);
@ -104,7 +104,8 @@ protected:
/// Init task node
void createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper);
virtual void initializeMainThread();
/// Return false if the worker was stopped (stop_flag = true)
virtual bool initializeMainThread();
void runMainThread();
void runCleanupThread();

View File

@ -21,7 +21,7 @@
#include <IO/WriteHelpers.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
namespace DB
{
@ -122,8 +122,10 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
try
{
PullingPipelineExecutor executor(io.pipeline);
if (!executor.pull(block))
PullingAsyncPipelineExecutor executor(io.pipeline);
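/// Unlike the sync executor, the async one may return empty blocks;
/// keep pulling until rows arrive or the pipeline finishes.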
while (block.rows() == 0 && executor.pull(block));
if (block.rows() == 0)
{
/// Interpret subquery with empty result as Null literal
auto ast_new = std::make_unique<ASTLiteral>(Null());
@ -132,7 +134,13 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
return;
}
if (block.rows() != 1 || executor.pull(block))
if (block.rows() != 1)
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
Block tmp_block;
while (tmp_block.rows() == 0 && executor.pull(tmp_block));
if (tmp_block.rows() != 0)
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
}
catch (const Exception & e)

View File

@ -54,7 +54,7 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Parsers/formatAST.h>
namespace DB
@ -320,7 +320,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, {}, query_options);
auto io = interpreter_subquery->execute();
PullingPipelineExecutor executor(io.pipeline);
PullingAsyncPipelineExecutor executor(io.pipeline);
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, context.getSettingsRef().transform_null_in);
set->setHeader(executor.getHeader());
@ -328,6 +328,9 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
Block block;
while (executor.pull(block))
{
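/// The async executor may yield empty blocks; skip them.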
if (block.rows() == 0)
continue;
/// If the limits have been exceeded, give up and let the default subquery processing actions take place.
if (!set->insertFromBlock(block))
return;

View File

@ -117,7 +117,7 @@ struct QueryPlanSettings
{
QueryPlan::ExplainPlanOptions query_plan_options;
/// Apply query plan optimisations.
/// Apply query plan optimizations.
bool optimize = true;
constexpr static char name[] = "PLAN";
@ -251,7 +251,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
interpreter.buildQueryPlan(plan);
if (settings.optimize)
plan.optimize();
plan.optimize(QueryPlanOptimizationSettings(context.getSettingsRef()));
plan.explainPlan(buf, settings.query_plan_options);
}
@ -265,7 +265,7 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), context, SelectQueryOptions());
interpreter.buildQueryPlan(plan);
auto pipeline = plan.buildQueryPipeline();
auto pipeline = plan.buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()));
if (settings.graph)
{

View File

@ -548,7 +548,7 @@ BlockIO InterpreterSelectQuery::execute()
buildQueryPlan(query_plan);
res.pipeline = std::move(*query_plan.buildQueryPipeline());
res.pipeline = std::move(*query_plan.buildQueryPipeline(QueryPlanOptimizationSettings(context->getSettingsRef())));
return res;
}

View File

@ -413,7 +413,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
QueryPlan query_plan;
buildQueryPlan(query_plan);
auto pipeline = query_plan.buildQueryPipeline();
auto pipeline = query_plan.buildQueryPipeline(QueryPlanOptimizationSettings(context->getSettingsRef()));
res.pipeline = std::move(*pipeline);
res.pipeline.addInterpreterContext(context);

View File

@ -756,7 +756,7 @@ QueryPipelinePtr MutationsInterpreter::addStreamsForLaterStages(const std::vecto
}
}
auto pipeline = plan.buildQueryPipeline();
auto pipeline = plan.buildQueryPipeline(QueryPlanOptimizationSettings(context.getSettingsRef()));
pipeline->addSimpleTransform([&](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);

View File

@ -14,6 +14,7 @@ struct PullingAsyncPipelineExecutor::Data
{
PipelineExecutorPtr executor;
std::exception_ptr exception;
LazyOutputFormat * lazy_format = nullptr;
std::atomic_bool is_finished = false;
std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread;
@ -82,6 +83,10 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
{
data.exception = std::current_exception();
data.has_exception = true;
/// Finish lazy format in case of exception. Otherwise thread.join() may hang.
if (data.lazy_format)
data.lazy_format->finalize();
}
data.is_finished = true;
@ -95,6 +100,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
{
data = std::make_unique<Data>();
data->executor = pipeline.execute();
data->lazy_format = lazy_format.get();
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
@ -105,14 +111,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
}
if (data->has_exception)
{
/// Finish lazy format in case of exception. Otherwise thread.join() may hang.
if (lazy_format)
lazy_format->finish();
data->has_exception = false;
std::rethrow_exception(std::move(data->exception));
}
bool is_execution_finished = lazy_format ? lazy_format->isFinished()
: data->is_finished.load();
@ -121,7 +120,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
{
/// If the lazy format is finished, we don't cancel the pipeline but wait for the main thread to finish.
data->is_finished = true;
/// Wait thread ant rethrow exception if any.
/// Wait thread and rethrow exception if any.
cancel();
return false;
}
@ -133,7 +132,12 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
}
chunk.clear();
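/// Wait for the next result: with a timeout if one was given, otherwise block until
/// the pipeline produces data or finishes.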
if (milliseconds)
data->finish_event.tryWait(milliseconds);
else
data->finish_event.wait();
return true;
}

View File

@ -17,6 +17,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
#include <IO/ReadHelpers.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -589,7 +590,7 @@ bool ConstantExpressionTemplate::parseLiteralAndAssertType(ReadBuffer & istr, co
}
}
ColumnPtr ConstantExpressionTemplate::evaluateAll(BlockMissingValues & nulls, size_t column_idx, size_t offset)
ColumnPtr ConstantExpressionTemplate::evaluateAll(BlockMissingValues & nulls, size_t column_idx, const DataTypePtr & expected_type, size_t offset)
{
Block evaluated = structure->literals.cloneWithColumns(std::move(columns));
columns = structure->literals.cloneEmptyColumns();
@ -607,12 +608,13 @@ ColumnPtr ConstantExpressionTemplate::evaluateAll(BlockMissingValues & nulls, si
ErrorCodes::LOGICAL_ERROR);
rows_count = 0;
ColumnPtr res = evaluated.getByName(structure->result_column_name).column->convertToFullColumnIfConst();
auto res = evaluated.getByName(structure->result_column_name);
res.column = res.column->convertToFullColumnIfConst();
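/// The type deduced for the template result may differ from the expected column type,
/// so the evaluated column is explicitly cast to expected_type below.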
if (!structure->null_as_default)
return res;
return castColumn(res, expected_type);
/// Extract column with evaluated expression and mask for NULLs
const auto & tuple = assert_cast<const ColumnTuple &>(*res);
const auto & tuple = assert_cast<const ColumnTuple &>(*res.column);
if (tuple.tupleSize() != 2)
throw Exception("Invalid tuple size, it'a a bug", ErrorCodes::LOGICAL_ERROR);
const auto & is_null = assert_cast<const ColumnUInt8 &>(tuple.getColumn(1));
@ -621,7 +623,9 @@ ColumnPtr ConstantExpressionTemplate::evaluateAll(BlockMissingValues & nulls, si
if (is_null.getUInt(i))
nulls.setBit(column_idx, offset + i);
return tuple.getColumnPtr(0);
res.column = tuple.getColumnPtr(0);
res.type = assert_cast<const DataTypeTuple &>(*res.type).getElements()[0];
return castColumn(res, expected_type);
}
void ConstantExpressionTemplate::TemplateStructure::addNodesToCastResult(const IDataType & result_column_type, ASTPtr & expr, bool null_as_default)

View File

@ -72,7 +72,7 @@ public:
/// Evaluate the batch of expressions that were parsed using the template.
/// If the template was deduced with null_as_default == true, set bits in nulls for NULL values in column_idx, starting from offset.
ColumnPtr evaluateAll(BlockMissingValues & nulls, size_t column_idx, size_t offset = 0);
ColumnPtr evaluateAll(BlockMissingValues & nulls, size_t column_idx, const DataTypePtr & expected_type, size_t offset = 0);
size_t rowsCount() const { return rows_count; }

View File

@ -73,11 +73,13 @@ Chunk ValuesBlockInputFormat::generate()
{
if (!templates[i] || !templates[i]->rowsCount())
continue;
const auto & expected_type = header.getByPosition(i).type;
if (columns[i]->empty())
columns[i] = IColumn::mutate(templates[i]->evaluateAll(block_missing_values, i));
columns[i] = IColumn::mutate(templates[i]->evaluateAll(block_missing_values, i, expected_type));
else
{
ColumnPtr evaluated = templates[i]->evaluateAll(block_missing_values, i, columns[i]->size());
ColumnPtr evaluated = templates[i]->evaluateAll(block_missing_values, i, expected_type, columns[i]->size());
columns[i]->insertRangeFrom(*evaluated, 0, evaluated->size());
}
}
@ -135,13 +137,16 @@ bool ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr &
return true;
}
const auto & header = getPort().getHeader();
const auto & expected_type = header.getByPosition(column_idx).type;
/// The expression in the current row does not match the template deduced on the first row.
/// Evaluate the expressions which were parsed using this template.
if (column->empty())
column = IColumn::mutate(templates[column_idx]->evaluateAll(block_missing_values, column_idx));
column = IColumn::mutate(templates[column_idx]->evaluateAll(block_missing_values, column_idx, expected_type));
else
{
ColumnPtr evaluated = templates[column_idx]->evaluateAll(block_missing_values, column_idx, column->size());
ColumnPtr evaluated = templates[column_idx]->evaluateAll(block_missing_values, column_idx, expected_type, column->size());
column->insertRangeFrom(*evaluated, 0, evaluated->size());
}
/// Do not use this template anymore

View File

@ -16,8 +16,13 @@ Chunk LazyOutputFormat::getChunk(UInt64 milliseconds)
}
Chunk chunk;
if (milliseconds)
{
if (!queue.tryPop(chunk, milliseconds))
return {};
}
else
queue.pop(chunk);
if (chunk)
info.update(chunk.getNumRows(), chunk.allocatedBytes());

View File

@ -36,6 +36,14 @@ public:
queue.clear();
}
void finalize() override
{
finished_processing = true;
/// In case we are waiting for result.
queue.emplace(Chunk());
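/// The empty chunk acts as a wake-up sentinel for a consumer blocked in getChunk().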
}
protected:
void consume(Chunk chunk) override
{
@ -46,14 +54,6 @@ protected:
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void finalize() override
{
finished_processing = true;
/// In case we are waiting for result.
queue.emplace(Chunk());
}
private:
ConcurrentBoundedQueue<Chunk> queue;

View File

@ -32,6 +32,8 @@ public:
void describeActions(FormatSettings &) const override;
void describePipeline(FormatSettings & settings) const override;
const Aggregator::Params & getParams() const { return params; }
private:
Aggregator::Params params;
bool final;

View File

@ -34,7 +34,7 @@ private:
class CreatingSetsStep : public IQueryPlanStep
{
public:
CreatingSetsStep(DataStreams input_streams_);
explicit CreatingSetsStep(DataStreams input_streams_);
String getName() const override { return "CreatingSets"; }

View File

@ -43,4 +43,9 @@ void CubeStep::transformPipeline(QueryPipeline & pipeline)
});
}
const Aggregator::Params & CubeStep::getParams() const
{
return params->params;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <Interpreters/Aggregator.h>
namespace DB
{
@ -18,6 +19,7 @@ public:
void transformPipeline(QueryPipeline & pipeline) override;
const Aggregator::Params & getParams() const;
private:
AggregatingTransformParamsPtr params;
};

View File

@ -17,6 +17,8 @@ public:
void describeActions(FormatSettings & settings) const override;
const SortDescription & getSortDescription() const { return sort_description; }
private:
SortDescription sort_description;
};

View File

@ -1,39 +0,0 @@
#include <Processors/QueryPlan/MaterializingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <DataStreams/materializeBlock.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}
MaterializingStep::MaterializingStep(const DataStream & input_stream_)
: ITransformingStep(input_stream_, materializeBlock(input_stream_.header), getTraits())
{
}
void MaterializingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
}
}

View File

@ -1,18 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
/// Materialize constants. See MaterializingTransform.
class MaterializingStep : public ITransformingStep
{
public:
explicit MaterializingStep(const DataStream & input_stream_);
String getName() const override { return "Materializing"; }
void transformPipeline(QueryPipeline & pipeline) override;
};
}

View File

@ -9,7 +9,7 @@ namespace QueryPlanOptimizations
{
/// This is the main function which optimizes the whole QueryPlan tree.
void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes);
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes);
/// Optimization is a function applied to QueryPlan::Node.
/// It can read and update subtree of specified node.
@ -38,14 +38,19 @@ size_t trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes);
/// Replace chain `FilterStep -> ExpressionStep` to single FilterStep
size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
/// Move FilterStep down if possible.
/// May split FilterStep and push down only part of it.
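/// E.g. `Filter(a AND b) -> Aggregating` may become `Filter(b) -> Aggregating -> Filter(a)`
/// if `a` depends only on the aggregation keys (a sketch; see filterPushDown.cpp).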
size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 4> optimizations =
static const std::array<Optimization, 5> optimizations =
{{
{tryLiftUpArrayJoin, "liftUpArrayJoin"},
{tryPushDownLimit, "pushDownLimit"},
{trySplitFilter, "splitFilter"},
{tryMergeExpressions, "mergeExpressions"},
{tryPushDownFilter, "pushDownFilter"},
}};
return optimizations;

View File

@ -0,0 +1,12 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Core/Settings.h>
namespace DB
{
QueryPlanOptimizationSettings::QueryPlanOptimizationSettings(const Settings & settings)
{
max_optimizations_to_apply = settings.query_plan_max_optimizations_to_apply;
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <cstddef>
namespace DB
{
struct Settings;
struct QueryPlanOptimizationSettings
{
QueryPlanOptimizationSettings() = delete;
explicit QueryPlanOptimizationSettings(const Settings & settings);
/// If not zero, throw if too many optimizations were applied to the query plan.
/// It helps to avoid an infinite optimization loop.
size_t max_optimizations_to_apply = 0;
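/// (Populated from the query_plan_max_optimizations_to_apply setting; e.g. the tests below run
/// queries with `settings query_plan_max_optimizations_to_apply = 1` to trigger the check.)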
};
}

View File

@ -0,0 +1,204 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB::QueryPlanOptimizations
{
static size_t tryAddNewFilterStep(
QueryPlan::Node * parent_node,
QueryPlan::Nodes & nodes,
const Names & allowed_inputs)
{
QueryPlan::Node * child_node = parent_node->children.front();
auto & parent = parent_node->step;
auto & child = child_node->step;
auto * filter = static_cast<FilterStep *>(parent.get());
const auto & expression = filter->getExpression();
const auto & filter_column_name = filter->getFilterColumnName();
bool removes_filter = filter->removesFilterColumn();
// std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;
auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, allowed_inputs);
if (!split_filter)
return 0;
// std::cerr << "===============\n" << expression->dumpDAG() << std::endl;
// std::cerr << "---------------\n" << split_filter->dumpDAG() << std::endl;
const auto & index = expression->getIndex();
auto it = index.begin();
for (; it != index.end(); ++it)
if ((*it)->result_name == filter_column_name)
break;
const bool found_filter_column = it != expression->getIndex().end();
if (!found_filter_column && !removes_filter)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
filter_column_name, expression->dumpDAG());
/// The filter column was replaced with a constant.
const bool filter_is_constant = found_filter_column && (*it)->column && isColumnConst(*(*it)->column);
if (!found_filter_column || filter_is_constant)
/// This means that all predicates of the filter were pushed down.
/// Replace the current actions with an expression, as we don't need to filter anything.
parent = std::make_unique<ExpressionStep>(child->getOutputStream(), expression);
/// Add new Filter step before Aggregating.
/// Expression/Filter -> Aggregating -> Something
auto & node = nodes.emplace_back();
node.children.swap(child_node->children);
child_node->children.emplace_back(&node);
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is added to the end.
auto split_filter_column_name = (*split_filter->getIndex().rbegin())->result_name;
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputStream(),
std::move(split_filter), std::move(split_filter_column_name), true);
return 3;
}
static Names getAggregatinKeys(const Aggregator::Params & params)
{
Names keys;
keys.reserve(params.keys.size());
for (auto pos : params.keys)
keys.push_back(params.src_header.getByPosition(pos).name);
return keys;
}
size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
if (parent_node->children.size() != 1)
return 0;
QueryPlan::Node * child_node = parent_node->children.front();
auto & parent = parent_node->step;
auto & child = child_node->step;
auto * filter = typeid_cast<FilterStep *>(parent.get());
if (!filter)
return 0;
if (filter->getExpression()->hasStatefulFunctions())
return 0;
if (auto * aggregating = typeid_cast<AggregatingStep *>(child.get()))
{
const auto & params = aggregating->getParams();
Names keys = getAggregatinKeys(params);
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
return updated_steps;
}
if (auto * totals_having = typeid_cast<TotalsHavingStep *>(child.get()))
{
/// If the totals step has a HAVING expression, skip it for now.
/// TODO:
/// We can merge the HAVING expression with the current filter.
/// Also, we can push down the part of HAVING which depends only on aggregation keys.
if (totals_having->getActions())
return 0;
Names keys;
const auto & header = totals_having->getInputStreams().front().header;
for (const auto & column : header)
if (typeid_cast<const DataTypeAggregateFunction *>(column.type.get()) == nullptr)
keys.push_back(column.name);
/// NOTE: this optimization changes TOTALS value. Example:
/// `select * from (select y, sum(x) from (
/// select number as x, number % 4 as y from numbers(10)
/// ) group by y with totals) where y != 2`
/// The optimization will change the totals row `y, sum(x)` from `(0, 45)` to `(0, 37)`.
/// This is expected to be ok, because the AST optimization `enable_optimize_predicate_expression = 1` also breaks it.
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
return updated_steps;
}
if (auto * array_join = typeid_cast<ArrayJoinStep *>(child.get()))
{
const auto & array_join_actions = array_join->arrayJoin();
const auto & keys = array_join_actions->columns;
const auto & array_join_header = array_join->getInputStreams().front().header;
Names allowed_inputs;
for (const auto & column : array_join_header)
if (keys.count(column.name) == 0)
allowed_inputs.push_back(column.name);
// for (const auto & name : allowed_inputs)
// std::cerr << name << std::endl;
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
if (auto * distinct = typeid_cast<DistinctStep *>(child.get()))
{
Names allowed_inputs = distinct->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
/// TODO.
/// We can filter earlier if the expression does not depend on WITH FILL columns.
/// But we cannot just push down the condition, because other columns may be filled with defaults.
///
/// It is possible to filter columns before and after WITH FILL, but such a change is not idempotent.
/// So, applying this to the pair (Filter -> Filling) several times would create several similar filters.
// if (auto * filling = typeid_cast<FillingStep *>(child.get()))
// {
// }
/// Same reason for Cube
// if (auto * cube = typeid_cast<CubeStep *>(child.get()))
// {
// }
if (typeid_cast<PartialSortingStep *>(child.get())
|| typeid_cast<MergeSortingStep *>(child.get())
|| typeid_cast<MergingSortedStep *>(child.get())
|| typeid_cast<FinishSortingStep *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
return 0;
}
}

View File

@ -1,10 +1,20 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Common/Exception.h>
#include <stack>
namespace DB::QueryPlanOptimizations
namespace DB
{
void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
namespace ErrorCodes
{
extern const int TOO_MANY_QUERY_PLAN_OPTIMIZATIONS;
}
namespace QueryPlanOptimizations
{
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
const auto & optimizations = getOptimizations();
@ -23,6 +33,9 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
std::stack<Frame> stack;
stack.push(Frame{.node = &root});
size_t max_optimizations_to_apply = settings.max_optimizations_to_apply;
size_t total_applied_optimizations = 0;
while (!stack.empty())
{
auto & frame = stack.top();
@ -54,8 +67,15 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
if (!optimization.apply)
continue;
if (max_optimizations_to_apply && max_optimizations_to_apply < total_applied_optimizations)
throw Exception(ErrorCodes::TOO_MANY_QUERY_PLAN_OPTIMIZATIONS,
"Too many optimizations applied to query plan. Current limit {}",
max_optimizations_to_apply);
/// Try to apply optimization.
auto update_depth = optimization.apply(frame.node, nodes);
if (update_depth)
++total_applied_optimizations;
max_update_depth = std::max<size_t>(max_update_depth, update_depth);
}
@ -73,3 +93,4 @@ void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
}
}
}

View File

@ -130,10 +130,10 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
" input expected", ErrorCodes::LOGICAL_ERROR);
}
QueryPipelinePtr QueryPlan::buildQueryPipeline()
QueryPipelinePtr QueryPlan::buildQueryPipeline(const QueryPlanOptimizationSettings & optimization_settings)
{
checkInitialized();
optimize();
optimize(optimization_settings);
struct Frame
{
@ -177,7 +177,7 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline()
return last_pipeline;
}
Pipe QueryPlan::convertToPipe()
Pipe QueryPlan::convertToPipe(const QueryPlanOptimizationSettings & optimization_settings)
{
if (!isInitialized())
return {};
@ -185,7 +185,7 @@ Pipe QueryPlan::convertToPipe()
if (isCompleted())
throw Exception("Cannot convert completed QueryPlan to Pipe", ErrorCodes::LOGICAL_ERROR);
return QueryPipeline::getPipe(std::move(*buildQueryPipeline()));
return QueryPipeline::getPipe(std::move(*buildQueryPipeline(optimization_settings)));
}
void QueryPlan::addInterpreterContext(std::shared_ptr<Context> context)
@ -333,9 +333,9 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
}
}
void QueryPlan::optimize()
void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
{
QueryPlanOptimizations::optimizeTree(*root, nodes);
QueryPlanOptimizations::optimizeTree(optimization_settings, *root, nodes);
}
}

View File

@ -5,6 +5,7 @@
#include <set>
#include <Core/Names.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace DB
{
@ -27,7 +28,7 @@ class Pipe;
/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimizations.
class QueryPlan
{
public:
@ -43,12 +44,12 @@ public:
bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
void optimize();
void optimize(const QueryPlanOptimizationSettings & optimization_settings);
QueryPipelinePtr buildQueryPipeline();
QueryPipelinePtr buildQueryPipeline(const QueryPlanOptimizationSettings & optimization_settings);
/// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe.
Pipe convertToPipe();
Pipe convertToPipe(const QueryPlanOptimizationSettings & optimization_settings);
struct ExplainPlanOptions
{

View File

@ -28,6 +28,8 @@ public:
void describeActions(FormatSettings & settings) const override;
const ActionsDAGPtr & getActions() const { return actions_dag; }
private:
bool overflow_row;
ActionsDAGPtr actions_dag;

View File

@ -108,12 +108,13 @@ SRCS(
QueryPlan/ITransformingStep.cpp
QueryPlan/LimitByStep.cpp
QueryPlan/LimitStep.cpp
QueryPlan/MaterializingStep.cpp
QueryPlan/MergeSortingStep.cpp
QueryPlan/MergingAggregatedStep.cpp
QueryPlan/MergingFinal.cpp
QueryPlan/MergingSortedStep.cpp
QueryPlan/OffsetStep.cpp
QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp
QueryPlan/Optimizations/filterPushDown.cpp
QueryPlan/Optimizations/liftUpArrayJoin.cpp
QueryPlan/Optimizations/limitPushDown.cpp
QueryPlan/Optimizations/mergeExpressions.cpp

View File

@ -33,7 +33,7 @@ public:
std::move(*MergeTreeDataSelectExecutor(part->storage)
.readFromParts({part}, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams));
return query_plan.convertToPipe();
return query_plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

View File

@ -166,7 +166,7 @@ Pipe StorageBuffer::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageBuffer::read(

View File

@ -501,7 +501,7 @@ Pipe StorageDistributed::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageDistributed::read(

View File

@ -127,7 +127,7 @@ Pipe StorageMaterializedView::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageMaterializedView::read(

View File

@ -198,7 +198,7 @@ Pipe StorageMergeTree::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const

View File

@ -3821,7 +3821,7 @@ Pipe StorageReplicatedMergeTree::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}

View File

@ -15,7 +15,6 @@
#include <Processors/Pipe.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/QueryPlan/MaterializingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
@ -60,7 +59,7 @@ Pipe StorageView::read(
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe();
return plan.convertToPipe(QueryPlanOptimizationSettings(context.getSettingsRef()));
}
void StorageView::read(
@ -87,7 +86,10 @@ void StorageView::read(
/// It's expected that the columns read from storage are not constant,
/// because the method 'getSampleBlockForColumns' is used to obtain the structure of the result in InterpreterSelectQuery.
auto materializing = std::make_unique<MaterializingStep>(query_plan.getCurrentDataStream());
auto materializing_actions = std::make_shared<ActionsDAG>(query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
materializing_actions->addMaterializingOutputActions();
auto materializing = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(materializing_actions));
materializing->setStepDescription("Materialize constants after VIEW subquery");
query_plan.addStep(std::move(materializing));

View File

@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<pool_size replace="1">2</pool_size>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<pool_size replace="1">20</pool_size>
</distributed_ddl>
</yandex>

View File

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>slow_dict</name>
<name>slow_dict_7</name>
<source>
<executable>
<command>sleep 7</command>
@ -23,4 +23,28 @@
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>slow_dict_3</name>
<source>
<executable>
<command>sleep 3</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
</yandex>

View File

@ -1,6 +1,6 @@
<yandex>
<remote_servers>
<cluster>
<cluster_a>
<shard>
<replica>
<host>n1</host>
@ -13,6 +13,20 @@
<port>9000</port>
</replica>
</shard>
</cluster>
</cluster_a>
<cluster_b>
<shard>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</cluster_b>
</remote_servers>
</yandex>

View File

@ -10,11 +10,31 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def add_instance(name):
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed, they are only printed to stderr).
#
# Wrap threading.Thread and re-throw the exception on join().
class SafeThread(threading.Thread):
def __init__(self, target):
super().__init__()
self.target = target
self.exception = None
def run(self):
try:
self.target()
except Exception as e: # pylint: disable=broad-except
self.exception = e
def join(self, timeout=None):
super().join(timeout)
if self.exception:
raise self.exception
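# SafeThread is used below in place of threading.Thread, so that an exception raised
# inside a worker (e.g. a failed assertion) fails the test on join().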
def add_instance(name, ddl_config=None):
main_configs=[
'configs/ddl.xml',
'configs/remote_servers.xml',
]
if ddl_config:
main_configs.append(ddl_config)
dictionaries=[
'configs/dict.xml',
]
@ -24,8 +44,12 @@ def add_instance(name):
with_zookeeper=True)
initiator = add_instance('initiator')
n1 = add_instance('n1')
n2 = add_instance('n2')
# distributed_ddl.pool_size = 2
n1 = add_instance('n1', 'configs/ddl_a.xml')
n2 = add_instance('n2', 'configs/ddl_a.xml')
# distributed_ddl.pool_size = 20
n3 = add_instance('n3', 'configs/ddl_b.xml')
n4 = add_instance('n4', 'configs/ddl_b.xml')
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
@ -49,17 +73,32 @@ def longer_then(sec):
return inner
return wrapper
# It takes 7 seconds to load slow_dict.
def thread_reload_dictionary():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster slow_dict')
# It takes 7 seconds to load slow_dict_7.
def execute_reload_dictionary_slow_dict_7():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7', settings={
'distributed_ddl_task_timeout': 60,
})
def execute_reload_dictionary_slow_dict_3():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3', settings={
'distributed_ddl_task_timeout': 60,
})
def execute_smoke_query():
initiator.query('DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b', settings={
'distributed_ddl_task_timeout': 60,
})
def check_log():
# ensure that no task was processed multiple times
for _, instance in list(cluster.instances.items()):
assert not instance.contains_in_log('Coordination::Exception: Node exists')
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
def test_dict_load():
def test_slow_dict_load_7():
@pytest.mark.timeout(10)
@longer_then(7)
def inner_test():
initiator.query('SYSTEM RELOAD DICTIONARY slow_dict')
initiator.query('SYSTEM RELOAD DICTIONARY slow_dict_7')
inner_test()
def test_all_in_parallel():
@ -68,12 +107,13 @@ def test_all_in_parallel():
def inner_test():
threads = []
for _ in range(2):
threads.append(threading.Thread(target=thread_reload_dictionary))
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
thread.join(70)
inner_test()
check_log()
def test_two_in_parallel_two_queued():
@pytest.mark.timeout(19)
@ -81,9 +121,35 @@ def test_two_in_parallel_two_queued():
def inner_test():
threads = []
for _ in range(4):
threads.append(threading.Thread(target=thread_reload_dictionary))
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
thread.join(70)
inner_test()
check_log()
def test_smoke():
for _ in range(100):
execute_smoke_query()
check_log()
def test_smoke_parallel():
threads = []
for _ in range(100):
threads.append(SafeThread(target=execute_smoke_query))
for thread in threads:
thread.start()
for thread in threads:
thread.join(70)
check_log()
def test_smoke_parallel_dict_reload():
threads = []
for _ in range(100):
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_3))
for thread in threads:
thread.start()
for thread in threads:
thread.join(70)
check_log()

View File

@ -0,0 +1,7 @@
<test>
<create_query>create table tab (a UInt32, b UInt32) engine = MergeTree order by (a, b)</create_query>
<fill_query>insert into tab values (1, 1)</fill_query>
<query>select a, b from tab where (a, b) in (select toUInt32(number) as x, toUInt32(sleep(0.1) + 1) from numbers_mt(16)) settings max_threads = 2, max_block_size = 4</query>
<query>select a, b from tab where (1, 1) = (select min(toUInt32(number + 1)) as x, min(toUInt32(sleep(0.1) + 1)) from numbers_mt(16)) settings max_threads = 2, max_block_size = 4</query>
<drop_query>DROP TABLE tab</drop_query>
</test>

View File

@ -7,3 +7,4 @@ SELECT (SELECT toDate('2015-01-02'), 'Hello');
SELECT (SELECT toDate('2015-01-02'), 'Hello') AS x, x, identity((SELECT 1)), identity((SELECT 1) AS y);
-- SELECT (SELECT uniqState(''));
SELECT ( SELECT throwIf(1 + dummy) ); -- { serverError 395 }

View File

@ -1,6 +1,6 @@
1 1
0 2
0 1
-
test1 10 0

View File

@ -0,0 +1,4 @@
---- arrays ----
6360452672161319041
---- window f ----
6360452672161319041

View File

@ -0,0 +1,36 @@
drop table if exists stack;
set allow_experimental_window_functions = 1;
set max_insert_threads = 4;
create table stack(item_id Int64, brand_id Int64, rack_id Int64, dt DateTime, expiration_dt DateTime, quantity UInt64)
Engine = MergeTree
partition by toYYYYMM(dt)
order by (brand_id, toStartOfHour(dt));
insert into stack
select number%99991, number%11, number%1111, toDateTime('2020-01-01 00:00:00')+number/100,
toDateTime('2020-02-01 00:00:00')+number/10, intDiv(number,100)+1
from numbers_mt(10000000);
select '---- arrays ----';
select cityHash64( toString( groupArray (tuple(*) ) )) from (
select brand_id, rack_id, arrayJoin(arraySlice(arraySort(groupArray(quantity)),1,2)) quantity
from stack
group by brand_id, rack_id
order by brand_id, rack_id, quantity
) t;
select '---- window f ----';
select cityHash64( toString( groupArray (tuple(*) ) )) from (
select brand_id, rack_id, quantity from
( select brand_id, rack_id, quantity, row_number() over (partition by brand_id, rack_id order by quantity) rn
from stack ) as t0
where rn <= 2
order by brand_id, rack_id, quantity
) t;
drop table if exists stack;

View File

@ -0,0 +1,81 @@
---- Q1 ----
Dell Vostro 800.00 Laptop 850
HP Elite 1200.00 Laptop 850
Lenovo Thinkpad 700.00 Laptop 850
Sony VAIO 700.00 Laptop 850
HTC One 400.00 Smartphone 500
Microsoft Lumia 200.00 Smartphone 500
Nexus 500.00 Smartphone 500
iPhone 900.00 Smartphone 500
Kindle Fire 150.00 Tablet 350
Samsung Galaxy Tab 200.00 Tablet 350
iPad 700.00 Tablet 350
---- Q2 ----
Lenovo Thinkpad Laptop 700.00 1
Sony VAIO Laptop 700.00 1
Dell Vostro Laptop 800.00 3
HP Elite Laptop 1200.00 4
Microsoft Lumia Smartphone 200.00 1
HTC One Smartphone 400.00 2
Nexus Smartphone 500.00 3
iPhone Smartphone 900.00 4
Kindle Fire Tablet 150.00 1
Samsung Galaxy Tab Tablet 200.00 2
iPad Tablet 700.00 3
---- Q3 ----
HP Elite Laptop 1200.00 1
Dell Vostro Laptop 800.00 2
Lenovo Thinkpad Laptop 700.00 3
Sony VAIO Laptop 700.00 4
iPhone Smartphone 900.00 1
Nexus Smartphone 500.00 2
HTC One Smartphone 400.00 3
Microsoft Lumia Smartphone 200.00 4
iPad Tablet 700.00 1
Samsung Galaxy Tab Tablet 200.00 2
Kindle Fire Tablet 150.00 3
---- Q4 ----
Lenovo Thinkpad Laptop 700.00 700.00 1
Sony VAIO Laptop 700.00 700.00 1
Dell Vostro Laptop 800.00 700.00 2
HP Elite Laptop 1200.00 700.00 3
Microsoft Lumia Smartphone 200.00 200.00 1
HTC One Smartphone 400.00 200.00 2
Nexus Smartphone 500.00 200.00 3
iPhone Smartphone 900.00 200.00 4
---- Q5 ----
Sony VAIO Laptop 700.00 700.00
Lenovo Thinkpad Laptop 700.00 700.00
HP Elite Laptop 1200.00 700.00
Dell Vostro Laptop 800.00 700.00
iPhone Smartphone 900.00 900.00
Nexus Smartphone 500.00 900.00
Microsoft Lumia Smartphone 200.00 900.00
HTC One Smartphone 400.00 900.00
iPad Tablet 700.00 700.00
Samsung Galaxy Tab Tablet 200.00 700.00
Kindle Fire Tablet 150.00 700.00
---- Q6 ----
Dell Vostro Laptop 800.00 1200.00
HP Elite Laptop 1200.00 1200.00
Lenovo Thinkpad Laptop 700.00 1200.00
Sony VAIO Laptop 700.00 1200.00
HTC One Smartphone 400.00 900.00
Microsoft Lumia Smartphone 200.00 900.00
Nexus Smartphone 500.00 900.00
iPhone Smartphone 900.00 900.00
Kindle Fire Tablet 150.00 700.00
Samsung Galaxy Tab Tablet 200.00 700.00
iPad Tablet 700.00 700.00
---- Q7 ----
Dell Vostro 800.00 Laptop 733 850
HP Elite 1200.00 Laptop 850 850
Lenovo Thinkpad 700.00 Laptop 700 850
Sony VAIO 700.00 Laptop 700 850
HTC One 400.00 Smartphone 300 500
Microsoft Lumia 200.00 Smartphone 200 500
Nexus 500.00 Smartphone 367 500
iPhone 900.00 Smartphone 500 500
Kindle Fire 150.00 Tablet 150 350
Samsung Galaxy Tab 200.00 Tablet 175 350
iPad 700.00 Tablet 350 350

View File

@ -0,0 +1,107 @@
set allow_experimental_window_functions = 1;
drop table if exists product_groups;
drop table if exists products;
CREATE TABLE product_groups (
group_id Int64,
group_name String
) Engine = Memory;
CREATE TABLE products (
product_id Int64,
product_name String,
price DECIMAL(11, 2),
group_id Int64
) Engine = Memory;
INSERT INTO product_groups VALUES (1, 'Smartphone'),(2, 'Laptop'),(3, 'Tablet');
INSERT INTO products (product_id,product_name, group_id,price) VALUES (1, 'Microsoft Lumia', 1, 200), (2, 'HTC One', 1, 400), (3, 'Nexus', 1, 500), (4, 'iPhone', 1, 900),(5, 'HP Elite', 2, 1200),(6, 'Lenovo Thinkpad', 2, 700),(7, 'Sony VAIO', 2, 700),(8, 'Dell Vostro', 2, 800),(9, 'iPad', 3, 700),(10, 'Kindle Fire', 3, 150),(11, 'Samsung Galaxy Tab', 3, 200);
select '---- Q1 ----';
SELECT
product_name,
price,
group_name,
AVG(price) OVER (PARTITION BY group_name)
FROM products INNER JOIN product_groups USING (group_id)
order by group_name, product_name, price;
select '---- Q2 ----';
SELECT
product_name,
group_name,
price,
rank() OVER (PARTITION BY group_name ORDER BY price) rank
FROM products INNER JOIN product_groups USING (group_id)
order by group_name, rank, price;
select '---- Q3 ----';
SELECT
product_name,
group_name,
price,
row_number() OVER (PARTITION BY group_name ORDER BY price desc) rn
FROM products INNER JOIN product_groups USING (group_id)
ORDER BY group_name, rn;
select '---- Q4 ----';
SELECT *
FROM
(
SELECT
product_name,
group_name,
price,
min(price) OVER (PARTITION BY group_name) AS min_price,
dense_rank() OVER (PARTITION BY group_name ORDER BY price ASC) AS r
FROM products
INNER JOIN product_groups USING (group_id)
) AS t
WHERE min_price > 160
ORDER BY
group_name ASC,
r ASC,
product_name ASC;
select '---- Q5 ----';
SELECT
product_name,
group_name,
price,
FIRST_VALUE (price) OVER (PARTITION BY group_name ORDER BY product_name desc) AS price_per_group_per_alphab
FROM products INNER JOIN product_groups USING (group_id)
order by group_name, product_name desc;
select '---- Q6 ----';
SELECT
product_name,
group_name,
price,
LAST_VALUE (price) OVER (PARTITION BY group_name ORDER BY
price RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS highest_price_per_group
FROM
products
INNER JOIN product_groups USING (group_id)
order by group_name, product_name;
select '---- Q7 ----';
select product_name, price, group_name, round(avg0), round(avg1)
from (
SELECT
product_name,
price,
group_name,
avg(price) OVER (PARTITION BY group_name ORDER BY price) avg0,
avg(price) OVER (PARTITION BY group_name ORDER BY
price RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) avg1
FROM products INNER JOIN product_groups USING (group_id)) t
order by group_name, product_name, price;
drop table product_groups;
drop table products;
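One subtlety this file exercises: with an ORDER BY inside OVER and no explicit frame, the default frame ends at the current row, so FIRST_VALUE in Q5 always sees the first row of the partition, but LAST_VALUE would see no further than the current row. That is why Q6 spells out RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING to get the per-group highest price. A minimal sketch of the difference, runnable on its own (the numbers() source is illustrative, and as in the test above the experimental setting must be enabled):

set allow_experimental_window_functions = 1;
SELECT
    number,
    LAST_VALUE(number) OVER (ORDER BY number) AS last_default_frame, -- equals the current row: 0, 1, 2
    LAST_VALUE(number) OVER (ORDER BY number
        RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_full_frame -- always 2
FROM numbers(3);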

View File

@ -1,7 +1,9 @@
sipHash should be calculated after filtration
Too many optimizations applied to query plan
Too many optimizations applied to query plan
> sipHash should be calculated after filtration
FUNCTION sipHash64
Filter column: equals
sorting steps should know about limit
> sorting steps should know about limit
Limit 10
MergingSorted
Limit 10
@ -9,3 +11,115 @@ MergeSorting
Limit 10
PartialSorting
Limit 10
-- filter push down --
> filter should be pushed down after aggregating
Aggregating
Filter
0 1
1 2
2 3
3 4
4 5
5 6
6 7
7 8
8 9
9 10
> filter should be pushed down after aggregating, column after aggregation is const
COLUMN Const(UInt8) -> notEquals(y, 0)
Aggregating
Filter
Filter
0 1 1
1 2 1
2 3 1
3 4 1
4 5 1
5 6 1
6 7 1
7 8 1
8 9 1
9 10 1
> one condition of filter should be pushed down after aggregating, other condition is aliased
Filter column
ALIAS notEquals(s, 4) :: 1 -> and(notEquals(y, 0), notEquals(s, 4))
Aggregating
Filter column: notEquals(y, 0)
0 1
1 2
2 3
3 4
5 6
6 7
7 8
8 9
9 10
> one condition of filter should be pushed down after aggregating, other condition is cast
Filter column
FUNCTION CAST(minus(s, 4) :: 1, UInt8 :: 3) -> and(notEquals(y, 0), minus(s, 4))
Aggregating
Filter column: notEquals(y, 0)
0 1
1 2
2 3
3 4
5 6
6 7
7 8
8 9
9 10
> one condition of filter should be pushed down after aggregating, other two conditions are ANDed
Filter column
FUNCTION and(minus(s, 8) :: 1, minus(s, 4) :: 2) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))
Aggregating
Filter column: notEquals(y, 0)
0 1
1 2
2 3
3 4
5 6
6 7
7 8
9 10
> two conditions of filter should be pushed down after aggregating and ANDed, one condition is aliased
Filter column
ALIAS notEquals(s, 8) :: 1 -> and(notEquals(y, 0), notEquals(s, 8), minus(y, 4))
Aggregating
Filter column: and(notEquals(y, 0), minus(y, 4))
0 1
1 2
2 3
4 5
5 6
6 7
7 8
9 10
> filter is split, one part is filtered before ARRAY JOIN
Filter column: and(notEquals(y, 2), notEquals(x, 0))
ARRAY JOIN x
Filter column: notEquals(y, 2)
1 3
> filter is pushed down before Distinct
Distinct
Distinct
Filter column: notEquals(y, 2)
0 0
0 1
1 0
1 1
> filter is pushed down before sorting steps
MergingSorted
MergeSorting
PartialSorting
Filter column: and(notEquals(x, 0), notEquals(y, 0))
1 2
1 1
> filter is pushed down before TOTALS HAVING and aggregating
TotalsHaving
Aggregating
Filter column: notEquals(y, 2)
0 12
1 15
3 10
0 37
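Two details in this block are easy to misread. In EXPLAIN output the steps are printed from output to input, so Filter column: notEquals(y, 2) appearing below TotalsHaving and Aggregating means the filter runs first. And the trailing row 0 37 is the WITH TOTALS row computed after the pushed-down filter: 12 + 15 + 10 = 37 rather than the unfiltered 45, because the y = 2 group (which sums to 8) never reaches the totals step.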

View File

@ -4,7 +4,149 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "sipHash should be calculated after filtration"
$CLICKHOUSE_CLIENT -q "select x + 1 from (select y + 2 as x from (select dummy + 3 as y)) settings query_plan_max_optimizations_to_apply = 1" 2>&1 |
grep -o "Too many optimizations applied to query plan"
echo "> sipHash should be calculated after filtration"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select sum(x), sum(y) from (select sipHash64(number) as x, bitAnd(number, 1024) as y from numbers_mt(1000000000) limit 1000000000) where y = 0" | grep -o "FUNCTION sipHash64\|Filter column: equals"
echo "sorting steps should know about limit"
echo "> sorting steps should know about limit"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select number from (select number from numbers(500000000) order by -number) limit 10" | grep -o "MergingSorted\|MergeSorting\|PartialSorting\|Limit 10"
echo "-- filter push down --"
echo "> filter should be pushed down after aggregating"
$CLICKHOUSE_CLIENT -q "
explain select * from (select sum(x), y from (
select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0
settings enable_optimize_predicate_expression=0" | grep -o "Aggregating\|Filter"
$CLICKHOUSE_CLIENT -q "
select s, y from (select sum(x) as s, y from (
select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 order by s, y
settings enable_optimize_predicate_expression=0"
echo "> filter should be pushed down after aggregating, column after aggregation is const"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select s, y, y != 0 from (select sum(x) as s, y from (
select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0
settings enable_optimize_predicate_expression=0" | grep -o "Aggregating\|Filter\|COLUMN Const(UInt8) -> notEquals(y, 0)"
$CLICKHOUSE_CLIENT -q "
select s, y, y != 0 from (select sum(x) as s, y from (
select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 order by s, y, y != 0
settings enable_optimize_predicate_expression=0"
echo "> one condition of filter should be pushed down after aggregating, other condition is aliased"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s != 4
settings enable_optimize_predicate_expression=0" |
grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|ALIAS notEquals(s, 4) :: 1 -> and(notEquals(y, 0), notEquals(s, 4))"
$CLICKHOUSE_CLIENT -q "
select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s != 4 order by s, y
settings enable_optimize_predicate_expression=0"
echo "> one condition of filter should be pushed down after aggregating, other condition is casted"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s - 4
settings enable_optimize_predicate_expression=0" |
grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|FUNCTION CAST(minus(s, 4) :: 1, UInt8 :: 3) -> and(notEquals(y, 0), minus(s, 4))"
$CLICKHOUSE_CLIENT -q "
select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s - 4 order by s, y
settings enable_optimize_predicate_expression=0"
echo "> one condition of filter should be pushed down after aggregating, other two conditions are ANDed"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s - 8 and s - 4
settings enable_optimize_predicate_expression=0" |
grep -o "Aggregating\|Filter column\|Filter column: notEquals(y, 0)\|FUNCTION and(minus(s, 8) :: 1, minus(s, 4) :: 2) -> and(notEquals(y, 0), minus(s, 8), minus(s, 4))"
$CLICKHOUSE_CLIENT -q "
select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s - 8 and s - 4 order by s, y
settings enable_optimize_predicate_expression=0"
echo "> two conditions of filter should be pushed down after aggregating and ANDed, one condition is aliased"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s != 8 and y - 4
settings enable_optimize_predicate_expression=0" |
grep -o "Aggregating\|Filter column\|Filter column: and(notEquals(y, 0), minus(y, 4))\|ALIAS notEquals(s, 8) :: 1 -> and(notEquals(y, 0), notEquals(s, 8), minus(y, 4))"
$CLICKHOUSE_CLIENT -q "
select s, y from (
select sum(x) as s, y from (select number as x, number + 1 as y from numbers(10)) group by y
) where y != 0 and s != 8 and y - 4 order by s, y
settings enable_optimize_predicate_expression=0"
echo "> filter is split, one part is filtered before ARRAY JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select x, y from (
select range(number) as x, number + 1 as y from numbers(3)
) array join x where y != 2 and x != 0" |
grep -o "Filter column: and(notEquals(y, 2), notEquals(x, 0))\|ARRAY JOIN x\|Filter column: notEquals(y, 2)"
$CLICKHOUSE_CLIENT -q "
select x, y from (
select range(number) as x, number + 1 as y from numbers(3)
) array join x where y != 2 and x != 0 order by x, y"
# echo "> filter is split, one part is filtered before Aggregating and Cube"
# $CLICKHOUSE_CLIENT -q "
# explain actions = 1 select * from (
# select sum(x) as s, x, y from (select number as x, number + 1 as y from numbers(10)) group by x, y with cube
# ) where y != 0 and s != 4
# settings enable_optimize_predicate_expression=0" |
# grep -o "Cube\|Aggregating\|Filter column: notEquals(y, 0)"
# $CLICKHOUSE_CLIENT -q "
# select s, x, y from (
# select sum(x) as s, x, y from (select number as x, number + 1 as y from numbers(10)) group by x, y with cube
# ) where y != 0 and s != 4 order by s, x, y
# settings enable_optimize_predicate_expression=0"
echo "> filter is pushed down before Distinct"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select x, y from (
select distinct x, y from (select number % 2 as x, number % 3 as y from numbers(10))
) where y != 2
settings enable_optimize_predicate_expression=0" |
grep -o "Distinct\|Filter column: notEquals(y, 2)"
$CLICKHOUSE_CLIENT -q "
select x, y from (
select distinct x, y from (select number % 2 as x, number % 3 as y from numbers(10))
) where y != 2 order by x, y
settings enable_optimize_predicate_expression=0"
echo "> filter is pushed down before sorting steps"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select x, y from (
select number % 2 as x, number % 3 as y from numbers(6) order by y desc
) where x != 0 and y != 0
settings enable_optimize_predicate_expression = 0" |
grep -o "MergingSorted\|MergeSorting\|PartialSorting\|Filter column: and(notEquals(x, 0), notEquals(y, 0))"
$CLICKHOUSE_CLIENT -q "
select x, y from (
select number % 2 as x, number % 3 as y from numbers(6) order by y desc
) where x != 0 and y != 0
settings enable_optimize_predicate_expression = 0"
echo "> filter is pushed down before TOTALS HAVING and aggregating"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select * from (
select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
) where y != 2
settings enable_optimize_predicate_expression=0" |
grep -o "TotalsHaving\|Aggregating\|Filter column: notEquals(y, 2)"
$CLICKHOUSE_CLIENT -q "
select * from (
select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
) where y != 2"

View File

@ -0,0 +1 @@
a

View File

@ -0,0 +1,14 @@
drop table if exists lc_test;
CREATE TABLE lc_test
(
`id` LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY tuple()
ORDER BY id;
insert into lc_test values (toString('a'));
select id from lc_test;
drop table if exists lc_test;
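This last file is a small regression check: id is dictionary-encoded via LowCardinality(String) and doubles as the MergeTree sorting key, and the test only asserts that a value written through toString() round-trips. To confirm the wrapping type survives storage, one could run the following before the final drop (a sketch, not part of the test):

select id, toTypeName(id) from lc_test; -- returns: a  LowCardinality(String)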