Merge remote-tracking branch 'origin' into fix-style

This commit is contained in:
Yatsishin Ilya 2021-08-23 17:18:43 +03:00
commit 121df40a97
109 changed files with 1946 additions and 527 deletions

View File

@ -3,7 +3,7 @@ I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla
Changelog category (leave one):
- New Feature
- Improvement
- Bug Fix
- Bug Fix (user-visible misbehaviour in official stable or prestable release)
- Performance Improvement
- Backward Incompatible Change
- Build/Testing/Packaging Improvement

View File

@ -1,9 +1,11 @@
# Security Policy
## Supported Versions
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.tech/docs/en/whats-new/security-changelog/)
The following versions of ClickHouse server are
currently being supported with security updates:
## Scope and Supported Versions
The following versions of ClickHouse server are currently being supported with security updates:
| Version | Supported |
| ------- | ------------------ |
@ -11,18 +13,49 @@ currently being supported with security updates:
| 18.x | :x: |
| 19.x | :x: |
| 20.1 | :x: |
| 20.3 | :white_check_mark: |
| 20.3 | :x: |
| 20.4 | :x: |
| 20.5 | :x: |
| 20.6 | :x: |
| 20.7 | :x: |
| 20.8 | :white_check_mark: |
| 20.8 | :x: |
| 20.9 | :x: |
| 20.10 | :x: |
| 20.11 | :white_check_mark: |
| 20.12 | :white_check_mark: |
| 21.1 | :white_check_mark: |
| 20.11 | :x: |
| 20.12 | :x: |
| 21.1 | :x: |
| 21.2 | :x: |
| 21.3 | ✅ |
| 21.4 | :x: |
| 21.5 | :x: |
| 21.6 | ✅ |
| 21.7 | ✅ |
| 21.8 | ✅ |
## Reporting a Vulnerability
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [clickhouse-feedback@yandex-team.com](mailto:clickhouse-feedback@yandex-team.com).
### When Should I Report a Vulnerability?
- You think you discovered a potential security vulnerability in ClickHouse
- You are unsure how a vulnerability affects ClickHouse
### When Should I NOT Report a Vulnerability?
- You need help tuning ClickHouse components for security
- You need help applying security related updates
- Your issue is not security related
## Security Vulnerability Response
Each report is acknowledged and analyzed by ClickHouse maintainers within 5 working days.
As the security issue moves from triage, to identified fix, to release planning we will keep the reporter updated.
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect report date to disclosure date to be on the order of 7 days.

10
base/common/unit.h Normal file
View File

@ -0,0 +1,10 @@
#pragma once
#include <cstddef>
constexpr size_t KiB = 1024;
constexpr size_t MiB = 1024 * KiB;
constexpr size_t GiB = 1024 * MiB;
constexpr size_t operator"" _KiB(unsigned long long val) { return val * KiB; }
constexpr size_t operator"" _MiB(unsigned long long val) { return val * MiB; }
constexpr size_t operator"" _GiB(unsigned long long val) { return val * GiB; }

View File

@ -303,6 +303,7 @@ function run_tests
01683_codec_encrypted # Depends on OpenSSL
01776_decrypt_aead_size_check # Depends on OpenSSL
01811_filter_by_null # Depends on OpenSSL
02012_sha512_fixedstring # Depends on OpenSSL
01281_unsucceeded_insert_select_queries_counter
01292_create_user
01294_lazy_database_concurrent

View File

@ -23,3 +23,5 @@ You can also use the following database engines:
- [PostgreSQL](../../engines/database-engines/postgresql.md)
- [Replicated](../../engines/database-engines/replicated.md)
- [SQLite](../../engines/database-engines/sqlite.md)

View File

@ -1,6 +1,6 @@
---
toc_priority: 29
toc_title: "[experimental] MaterializedMySQL"
toc_title: MaterializedMySQL
---
# [experimental] MaterializedMySQL {#materialized-mysql}

View File

@ -0,0 +1,80 @@
---
toc_priority: 32
toc_title: SQLite
---
# SQLite {#sqlite}
Allows to connect to [SQLite](https://www.sqlite.org/index.html) database and perform `INSERT` and `SELECT` queries to exchange data between ClickHouse and SQLite.
## Creating a Database {#creating-a-database}
``` sql
CREATE DATABASE sqlite_database
ENGINE = SQLite('db_path')
```
**Engine Parameters**
- `db_path` — Path to a file with SQLite database.
## Data Types Support {#data_types-support}
| SQLite | ClickHouse |
|---------------|---------------------------------------------------------|
| INTEGER | [Int32](../../sql-reference/data-types/int-uint.md) |
| REAL | [Float32](../../sql-reference/data-types/float.md) |
| TEXT | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |
## Specifics and Recommendations {#specifics-and-recommendations}
SQLite stores the entire database (definitions, tables, indices, and the data itself) as a single cross-platform file on a host machine. During writing SQLite locks the entire database file, therefore write operations are performed sequentially. Read operations can be multitasked.
SQLite does not require service management (such as startup scripts) or access control based on `GRANT` and passwords. Access control is handled by means of file-system permissions given to the database file itself.
## Usage Example {#usage-example}
Database in ClickHouse, connected to the SQLite:
``` sql
CREATE DATABASE sqlite_db ENGINE = SQLite('sqlite.db');
SHOW TABLES FROM sqlite_db;
```
``` text
┌──name───┐
│ table1 │
│ table2 │
└─────────┘
```
Shows the tables:
``` sql
SELECT * FROM sqlite_db.table1;
```
``` text
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
└───────┴──────┘
```
Inserting data into SQLite table from ClickHouse table:
``` sql
CREATE TABLE clickhouse_table(`col1` String,`col2` Int16) ENGINE = MergeTree() ORDER BY col2;
INSERT INTO clickhouse_table VALUES ('text',10);
INSERT INTO sqlite_db.table1 SELECT * FROM clickhouse_table;
SELECT * FROM sqlite_db.table1;
```
``` text
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
│ text │ 10 │
└───────┴──────┘
```

View File

@ -19,3 +19,4 @@ List of supported integrations:
- [EmbeddedRocksDB](../../../engines/table-engines/integrations/embedded-rocksdb.md)
- [RabbitMQ](../../../engines/table-engines/integrations/rabbitmq.md)
- [PostgreSQL](../../../engines/table-engines/integrations/postgresql.md)
- [SQLite](../../../engines/table-engines/integrations/sqlite.md)

View File

@ -0,0 +1,59 @@
---
toc_priority: 7
toc_title: SQLite
---
# SQLite {#sqlite}
The engine allows to import and export data to SQLite and supports queries to SQLite tables directly from ClickHouse.
## Creating a Table {#creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2], ...
) ENGINE = SQLite('db_path', 'table')
```
**Engine Parameters**
- `db_path` — Path to SQLite file with a database.
- `table` — Name of a table in the SQLite database.
## Usage Example {#usage-example}
Shows a query creating the SQLite table:
```sql
SHOW CREATE TABLE sqlite_db.table2;
```
``` text
CREATE TABLE SQLite.table2
(
`col1` Nullable(Int32),
`col2` Nullable(String)
)
ENGINE = SQLite('sqlite.db','table2');
```
Returns the data from the table:
``` sql
SELECT * FROM sqlite_db.table2 ORDER BY col1;
```
```text
┌─col1─┬─col2──┐
│ 1 │ text1 │
│ 2 │ text2 │
│ 3 │ text3 │
└──────┴───────┘
```
**See Also**
- [SQLite](../../../engines/database-engines/sqlite.md) engine
- [sqlite](../../../sql-reference/table-functions/sqlite.md) table function

View File

@ -2041,10 +2041,25 @@ Default value: 0.
## input_format_parallel_parsing {#input-format-parallel-parsing}
- Type: bool
- Default value: True
Enables or disables order-preserving parallel parsing of data formats. Supported only for [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) and [JSONEachRow](../../interfaces/formats.md#jsoneachrow) formats.
Enable order-preserving parallel parsing of data formats. Supported only for TSV, TKSV, CSV, and JSONEachRow formats.
Possible values:
- 1 — Enabled.
- 0 — Disabled.
Default value: `0`.
## output_format_parallel_formatting {#output-format-parallel-formatting}
Enables or disables parallel formatting of data formats. Supported only for [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) and [JSONEachRow](../../interfaces/formats.md#jsoneachrow) formats.
Possible values:
- 1 — Enabled.
- 0 — Disabled.
Default value: `0`.
## min_chunk_bytes_for_parallel_parsing {#min-chunk-bytes-for-parallel-parsing}

View File

@ -255,7 +255,7 @@ windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
- `window` — Length of the sliding window, it is the time interval between the first and the last condition. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond1 <= timestamp of cond2 <= ... <= timestamp of condN <= timestamp of cond1 + window`.
- `mode` — It is an optional argument. One or more modes can be set.
- `'strict'` — If same condition holds for sequence of events then such non-unique events would be skipped.
- `'strict_deduplication'` — If the same condition holds for the sequence of events, then such repeating event interrupts further processing.
- `'strict_order'` — Don't allow interventions of other events. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2.
- `'strict_increase'` — Apply conditions only to events with strictly increasing timestamps.

View File

@ -143,7 +143,9 @@ It works faster than intHash32. Average quality.
## SHA256 {#sha256}
Calculates SHA-1, SHA-224, or SHA-256 from a string and returns the resulting set of bytes as FixedString(20), FixedString(28), or FixedString(32).
## SHA512 {#sha512}
Calculates SHA-1, SHA-224, SHA-256 or SHA-512 from a string and returns the resulting set of bytes as FixedString(20), FixedString(28), FixedString(32), or FixedString(64).
The function works fairly slowly (SHA-1 processes about 5 million short strings per second per processor core, while SHA-224 and SHA-256 process about 2.2 million).
We recommend using this function only in cases when you need a specific hash function and you cant select it.
Even in these cases, we recommend applying the function offline and pre-calculating values when inserting them into the table, instead of applying it in SELECTS.

View File

@ -6,7 +6,7 @@ toc_title: JOIN
Join produces a new table by combining columns from one or multiple tables by using values common to each. It is a common operation in databases with SQL support, which corresponds to [relational algebra](https://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators) join. The special case of one table join is often referred to as “self-join”.
Syntax:
**Syntax**
``` sql
SELECT <expr_list>
@ -38,7 +38,7 @@ Additional join types available in ClickHouse:
## Settings {#join-settings}
The default join type can be overriden using [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
The default join type can be overridden using [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting.
@ -52,6 +52,61 @@ The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_
- [join_on_disk_max_files_to_merge](../../../operations/settings/settings.md#join_on_disk_max_files_to_merge)
- [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys)
## ON Section Conditions {on-section-conditions}
An `ON` section can contain several conditions combined using the `AND` operator. Conditions specifying join keys must refer both left and right tables and must use the equality operator. Other conditions may use other logical operators but they must refer either the left or the right table of a query.
Rows are joined if the whole complex condition is met. If the conditions are not met, still rows may be included in the result depending on the `JOIN` type. Note that if the same conditions are placed in a `WHERE` section and they are not met, then rows are always filtered out from the result.
!!! note "Note"
The `OR` operator inside an `ON` section is not supported yet.
!!! note "Note"
If a condition refers columns from different tables, then only the equality operator (`=`) is supported so far.
**Example**
Consider `table_1` and `table_2`:
```
┌─Id─┬─name─┐ ┌─Id─┬─text───────────┬─scores─┐
│ 1 │ A │ │ 1 │ Text A │ 10 │
│ 2 │ B │ │ 1 │ Another text A │ 12 │
│ 3 │ C │ │ 2 │ Text B │ 15 │
└────┴──────┘ └────┴────────────────┴────────┘
```
Query with one join key condition and an additional condition for `table_2`:
``` sql
SELECT name, text FROM table_1 LEFT OUTER JOIN table_2
ON table_1.Id = table_2.Id AND startsWith(table_2.text, 'Text');
```
Note that the result contains the row with the name `C` and the empty text column. It is included into the result because an `OUTER` type of a join is used.
```
┌─name─┬─text───┐
│ A │ Text A │
│ B │ Text B │
│ C │ │
└──────┴────────┘
```
Query with `INNER` type of a join and multiple conditions:
``` sql
SELECT name, text, scores FROM table_1 INNER JOIN table_2
ON table_1.Id = table_2.Id AND table_2.scores > 10 AND startsWith(table_2.text, 'Text');
```
Result:
```
┌─name─┬─text───┬─scores─┐
│ B │ Text B │ 15 │
└──────┴────────┴────────┘
```
## ASOF JOIN Usage {#asof-join-usage}
`ASOF JOIN` is useful when you need to join records that have no exact match.
@ -59,7 +114,7 @@ The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_
Algorithm requires the special column in tables. This column:
- Must contain an ordered sequence.
- Can be one of the following types: [Int*, UInt*](../../../sql-reference/data-types/int-uint.md), [Float\*](../../../sql-reference/data-types/float.md), [Date](../../../sql-reference/data-types/date.md), [DateTime](../../../sql-reference/data-types/datetime.md), [Decimal\*](../../../sql-reference/data-types/decimal.md).
- Can be one of the following types: [Int, UInt](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), [Date](../../../sql-reference/data-types/date.md), [DateTime](../../../sql-reference/data-types/datetime.md), [Decimal](../../../sql-reference/data-types/decimal.md).
- Cant be the only column in the `JOIN` clause.
Syntax `ASOF JOIN ... ON`:
@ -84,7 +139,7 @@ ASOF JOIN table_2
USING (equi_column1, ... equi_columnN, asof_column)
```
`ASOF JOIN` uses `equi_columnX` for joining on equality and `asof_column` for joining on the closest match with the `table_1.asof_column >= table_2.asof_column` condition. The `asof_column` column always the last one in the `USING` clause.
`ASOF JOIN` uses `equi_columnX` for joining on equality and `asof_column` for joining on the closest match with the `table_1.asof_column >= table_2.asof_column` condition. The `asof_column` column is always the last one in the `USING` clause.
For example, consider the following tables:

View File

@ -34,5 +34,6 @@ You can use table functions in:
| [odbc](../../sql-reference/table-functions/odbc.md) | Creates a [ODBC](../../engines/table-engines/integrations/odbc.md)-engine table. |
| [hdfs](../../sql-reference/table-functions/hdfs.md) | Creates a [HDFS](../../engines/table-engines/integrations/hdfs.md)-engine table. |
| [s3](../../sql-reference/table-functions/s3.md) | Creates a [S3](../../engines/table-engines/integrations/s3.md)-engine table. |
| [sqlite](../../sql-reference/table-functions/sqlite.md) | Creates a [sqlite](../../engines/table-engines/integrations/sqlite.md)-engine table. |
[Original article](https://clickhouse.tech/docs/en/sql-reference/table-functions/) <!--hide-->

View File

@ -0,0 +1,45 @@
---
toc_priority: 55
toc_title: sqlite
---
## sqlite {#sqlite}
Allows to perform queries on a data stored in an [SQLite](../../engines/database-engines/sqlite.md) database.
**Syntax**
``` sql
sqlite('db_path', 'table_name')
```
**Arguments**
- `db_path` — Path to a file with an SQLite database. [String](../../sql-reference/data-types/string.md).
- `table_name` — Name of a table in the SQLite database. [String](../../sql-reference/data-types/string.md).
**Returned value**
- A table object with the same columns as in the original `SQLite` table.
**Example**
Query:
``` sql
SELECT * FROM sqlite('sqlite.db', 'table1') ORDER BY col2;
```
Result:
``` text
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
└───────┴──────┘
```
**See Also**
- [SQLite](../../engines/table-engines/integrations/sqlite.md) table engine

View File

@ -0,0 +1,79 @@
---
toc_priority: 32
toc_title: SQLite
---
# SQLite {#sqlite}
Движок баз данных позволяет подключаться к базе [SQLite](https://www.sqlite.org/index.html) и выполнять запросы `INSERT` и `SELECT` для обмена данными между ClickHouse и SQLite.
## Создание базы данных {#creating-a-database}
``` sql
CREATE DATABASE sqlite_database
ENGINE = SQLite('db_path')
```
**Параметры движка**
- `db_path` — путь к файлу с базой данных SQLite.
## Поддерживаемые типы данных {#data_types-support}
| SQLite | ClickHouse |
|---------------|---------------------------------------------------------|
| INTEGER | [Int32](../../sql-reference/data-types/int-uint.md) |
| REAL | [Float32](../../sql-reference/data-types/float.md) |
| TEXT | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |
## Особенности и рекомендации {#specifics-and-recommendations}
SQLite хранит всю базу данных (определения, таблицы, индексы и сами данные) в виде единого кроссплатформенного файла на хост-машине. Во время записи SQLite блокирует весь файл базы данных, поэтому операции записи выполняются последовательно. Операции чтения могут быть многозадачными.
SQLite не требует управления службами (например, сценариями запуска) или контроля доступа на основе `GRANT` и паролей. Контроль доступа осуществляется с помощью разрешений файловой системы, предоставляемых самому файлу базы данных.
## Примеры использования {#usage-example}
Отобразим список таблиц базы данных в ClickHouse, подключенной к SQLite:
``` sql
CREATE DATABASE sqlite_db ENGINE = SQLite('sqlite.db');
SHOW TABLES FROM sqlite_db;
```
``` text
┌──name───┐
│ table1 │
│ table2 │
└─────────┘
```
Отобразим содержимое таблицы:
``` sql
SELECT * FROM sqlite_db.table1;
```
``` text
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
└───────┴──────┘
```
Вставим данные в таблицу SQLite из таблицы ClickHouse:
``` sql
CREATE TABLE clickhouse_table(`col1` String,`col2` Int16) ENGINE = MergeTree() ORDER BY col2;
INSERT INTO clickhouse_table VALUES ('text',10);
INSERT INTO sqlite_db.table1 SELECT * FROM clickhouse_table;
SELECT * FROM sqlite_db.table1;
```
``` text
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
│ text │ 10 │
└───────┴──────┘
```

View File

@ -0,0 +1,59 @@
---
toc_priority: 7
toc_title: SQLite
---
# SQLite {#sqlite}
Движок позволяет импортировать и экспортировать данные из SQLite, а также поддерживает отправку запросов к таблицам SQLite напрямую из ClickHouse.
## Создание таблицы {#creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2], ...
) ENGINE = SQLite('db_path', 'table')
```
**Параметры движка**
- `db_path` — путь к файлу с базой данных SQLite.
- `table` — имя таблицы в базе данных SQLite.
## Примеры использования {#usage-example}
Отобразим запрос, с помощью которого была создана таблица SQLite:
```sql
SHOW CREATE TABLE sqlite_db.table2;
```
``` text
CREATE TABLE SQLite.table2
(
`col1` Nullable(Int32),
`col2` Nullable(String)
)
ENGINE = SQLite('sqlite.db','table2');
```
Получим данные из таблицы:
``` sql
SELECT * FROM sqlite_db.table2 ORDER BY col1;
```
```text
┌─col1─┬─col2──┐
│ 1 │ text1 │
│ 2 │ text2 │
│ 3 │ text3 │
└──────┴───────┘
```
**См. также**
- [SQLite](../../../engines/database-engines/sqlite.md) движок баз данных
- [sqlite](../../../sql-reference/table-functions/sqlite.md) табличная функция

View File

@ -1865,10 +1865,25 @@ ClickHouse генерирует исключение
## input_format_parallel_parsing {#input-format-parallel-parsing}
- Тип: bool
- Значение по умолчанию: True
Включает или отключает режим, при котором входящие данные разбиваются на части, парсинг каждой из которых осуществляется параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
Включает режим, при котором входящие данные парсятся параллельно, но с сохранением исходного порядка следования. Поддерживается только для форматов TSV, TKSV, CSV и JSONEachRow.
Возможные значения:
- 1 — включен режим параллельного разбора.
- 0 — отключен режим параллельного разбора.
Значение по умолчанию: `0`.
## output_format_parallel_formatting {#output-format-parallel-formatting}
Включает или отключает режим, при котором исходящие данные форматируются параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
Возможные значения:
- 1 — включен режим параллельного форматирования.
- 0 — отключен режим параллельного форматирования.
Значение по умолчанию: `0`.
## min_chunk_bytes_for_parallel_parsing {#min-chunk-bytes-for-parallel-parsing}

View File

@ -6,7 +6,7 @@ toc_title: JOIN
`JOIN` создаёт новую таблицу путем объединения столбцов из одной или нескольких таблиц с использованием общих для каждой из них значений. Это обычная операция в базах данных с поддержкой SQL, которая соответствует join из [реляционной алгебры](https://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators). Частный случай соединения одной таблицы часто называют self-join.
Синтаксис:
**Синтаксис**
``` sql
SELECT <expr_list>
@ -19,7 +19,7 @@ FROM <left_table>
## Поддерживаемые типы соединения {#select-join-types}
Все типы из стандартого [SQL JOIN](https://en.wikipedia.org/wiki/Join_(SQL)) поддерживаются:
Все типы из стандартного [SQL JOIN](https://en.wikipedia.org/wiki/Join_(SQL)) поддерживаются:
- `INNER JOIN`, возвращаются только совпадающие строки.
- `LEFT OUTER JOIN`, не совпадающие строки из левой таблицы возвращаются в дополнение к совпадающим строкам.
@ -33,7 +33,7 @@ FROM <left_table>
- `LEFT SEMI JOIN` и `RIGHT SEMI JOIN`, белый список по ключам соединения, не производит декартово произведение.
- `LEFT ANTI JOIN` и `RIGHT ANTI JOIN`, черный список по ключам соединения, не производит декартово произведение.
- `LEFT ANY JOIN`, `RIGHT ANY JOIN` и `INNER ANY JOIN`, Частично (для противоположных сторон `LEFT` и `RIGHT`) или полностью (для `INNER` и `FULL`) отключает декартово произведение для стандартых видов `JOIN`.
- `LEFT ANY JOIN`, `RIGHT ANY JOIN` и `INNER ANY JOIN`, Частично (для противоположных сторон `LEFT` и `RIGHT`) или полностью (для `INNER` и `FULL`) отключает декартово произведение для стандартных видов `JOIN`.
- `ASOF JOIN` и `LEFT ASOF JOIN`, Для соединения последовательностей по нечеткому совпадению. Использование `ASOF JOIN` описано ниже.
## Настройки {#join-settings}
@ -52,6 +52,61 @@ FROM <left_table>
- [join_on_disk_max_files_to_merge](../../../operations/settings/settings.md#join_on_disk_max_files_to_merge)
- [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys)
## Условия в секции ON {on-section-conditions}
Секция `ON` может содержать несколько условий, связанных оператором `AND`. Условия, задающие ключи соединения, должны содержать столбцы левой и правой таблицы и должны использовать оператор равенства. Прочие условия могут использовать другие логические операторы, но в отдельном условии могут использоваться столбцы либо только левой, либо только правой таблицы.
Строки объединяются только тогда, когда всё составное условие выполнено. Если оно не выполнено, то строки могут попасть в результат в зависимости от типа `JOIN`. Обратите внимание, что если то же самое условие поместить в секцию `WHERE`, то строки, для которых оно не выполняется, никогда не попаду в результат.
!!! note "Примечание"
Оператор `OR` внутри секции `ON` пока не поддерживается.
!!! note "Примечание"
Если в условии использованы столбцы из разных таблиц, то пока поддерживается только оператор равенства (`=`).
**Пример**
Рассмотрим `table_1` и `table_2`:
```
┌─Id─┬─name─┐ ┌─Id─┬─text───────────┬─scores─┐
│ 1 │ A │ │ 1 │ Text A │ 10 │
│ 2 │ B │ │ 1 │ Another text A │ 12 │
│ 3 │ C │ │ 2 │ Text B │ 15 │
└────┴──────┘ └────┴────────────────┴────────┘
```
Запрос с одним условием, задающим ключ соединения, и дополнительным условием для `table_2`:
``` sql
SELECT name, text FROM table_1 LEFT OUTER JOIN table_2
ON table_1.Id = table_2.Id AND startsWith(table_2.text, 'Text');
```
Обратите внимание, что результат содержит строку с именем `C` и пустым текстом. Строка включена в результат, потому что использован тип соединения `OUTER`.
```
┌─name─┬─text───┐
│ A │ Text A │
│ B │ Text B │
│ C │ │
└──────┴────────┘
```
Запрос с типом соединения `INNER` и несколькими условиями:
``` sql
SELECT name, text, scores FROM table_1 INNER JOIN table_2
ON table_1.Id = table_2.Id AND table_2.scores > 10 AND startsWith(table_2.text, 'Text');
```
Результат:
```
┌─name─┬─text───┬─scores─┐
│ B │ Text B │ 15 │
└──────┴────────┴────────┘
```
## Использование ASOF JOIN {#asof-join-usage}
`ASOF JOIN` применим в том случае, когда необходимо объединять записи, которые не имеют точного совпадения.
@ -59,7 +114,7 @@ FROM <left_table>
Для работы алгоритма необходим специальный столбец в таблицах. Этот столбец:
- Должен содержать упорядоченную последовательность.
- Может быть одного из следующих типов: [Int*, UInt*](../../data-types/int-uint.md), [Float*](../../data-types/float.md), [Date](../../data-types/date.md), [DateTime](../../data-types/datetime.md), [Decimal*](../../data-types/decimal.md).
- Может быть одного из следующих типов: [Int, UInt](../../data-types/int-uint.md), [Float](../../data-types/float.md), [Date](../../data-types/date.md), [DateTime](../../data-types/datetime.md), [Decimal](../../data-types/decimal.md).
- Не может быть единственным столбцом в секции `JOIN`.
Синтаксис `ASOF JOIN ... ON`:

View File

@ -0,0 +1,45 @@
---
toc_priority: 55
toc_title: sqlite
---
## sqlite {#sqlite}
Позволяет выполнять запросы к данным, хранящимся в базе данных [SQLite](../../engines/database-engines/sqlite.md).
**Синтаксис**
``` sql
sqlite('db_path', 'table_name')
```
**Аргументы**
- `db_path` — путь к файлу с базой данных SQLite. [String](../../sql-reference/data-types/string.md).
- `table_name` — имя таблицы в базе данных SQLite. [String](../../sql-reference/data-types/string.md).
**Возвращаемое значение**
- Объект таблицы с теми же столбцами, что и в исходной таблице `SQLite`.
**Пример**
Запрос:
``` sql
SELECT * FROM sqlite('sqlite.db', 'table1') ORDER BY col2;
```
Результат:
``` text
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
└───────┴──────┘
```
**См. также**
- [SQLite](../../engines/table-engines/integrations/sqlite.md) движок таблиц

View File

@ -38,13 +38,13 @@ ENGINE = MySQL('host:port', ['database' | database], 'user', 'password')
| BIGINT | [Int64](../../sql-reference/data-types/int-uint.md) |
| FLOAT | [Float32](../../sql-reference/data-types/float.md) |
| DOUBLE | [Float64](../../sql-reference/data-types/float.md) |
| DATE | [日期](../../sql-reference/data-types/date.md) |
| DATETIME, TIMESTAMP | [日期时间](../../sql-reference/data-types/datetime.md) |
| BINARY | [固定字符串](../../sql-reference/data-types/fixedstring.md) |
| DATE | [Date](../../sql-reference/data-types/date.md) |
| DATETIME, TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |
| BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) |
其他的MySQL数据类型将全部都转换为[字符串](../../sql-reference/data-types/string.md)。
其他的MySQL数据类型将全部都转换为[String](../../sql-reference/data-types/string.md)。
同时以上的所有类型都支持[可为空](../../sql-reference/data-types/nullable.md)。
同时以上的所有类型都支持[Nullable](../../sql-reference/data-types/nullable.md)。
## 使用示例 {#shi-yong-shi-li}

View File

@ -1,8 +1,8 @@
# 功能与Yandex的工作。梅特里卡词典 {#functions-for-working-with-yandex-metrica-dictionaries}
# 使用 Yandex.Metrica 字典函数 {#functions-for-working-with-yandex-metrica-dictionaries}
为了使下面的功能正常工作,服务器配置必须指定获取所有Yandex的路径和地址。梅特里卡字典. 字典在任何这些函数的第一次调用时加载。 如果无法加载引用列表,则会引发异常。
为了使下面的功能正常工作,服务器配置必须指定获取所有 Yandex.Metrica 字典的路径和地址。Yandex.Metrica 字典在任何这些函数的第一次调用时加载。 如果无法加载引用列表,则会引发异常。
For information about creating reference lists, see the section «Dictionaries».
有关创建引用列表的信息,请参阅 «字典» 部分.
## 多个地理基 {#multiple-geobases}
@ -17,18 +17,18 @@ ClickHouse支持同时使用多个备选地理基区域层次结构
所有字典都在运行时重新加载每隔一定数量的秒重新加载一次如builtin_dictionaries_reload_interval config参数中定义或默认情况下每小时一次。 但是,可用字典列表在服务器启动时定义一次。
All functions for working with regions have an optional argument at the end the dictionary key. It is referred to as the geobase.
所有处理区域的函数都在末尾有一个可选参数—字典键。它被称为地基。
示例:
regionToCountry(RegionID) Uses the default dictionary: /opt/geo/regions_hierarchy.txt
regionToCountry(RegionID, '') Uses the default dictionary: /opt/geo/regions_hierarchy.txt
regionToCountry(RegionID, 'ua') Uses the dictionary for the 'ua' key: /opt/geo/regions_hierarchy_ua.txt
regionToCountry(RegionID) 使用默认路径: /opt/geo/regions_hierarchy.txt
regionToCountry(RegionID, '') 使用默认路径: /opt/geo/regions_hierarchy.txt
regionToCountry(RegionID, 'ua') 使用字典中的'ua' 键: /opt/geo/regions_hierarchy_ua.txt
### ツ环板(ョツ嘉ッツ偲青regionシツ氾カツ鉄ツ工ツ渉\]) {#regiontocityid-geobase}
### regionToCity(id[, geobase]) {#regiontocityid-geobase}
Accepts a UInt32 number the region ID from the Yandex geobase. If this region is a city or part of a city, it returns the region ID for the appropriate city. Otherwise, returns 0.
从 Yandex geobase 接收一个 UInt32 数字类型的区域ID 。如果该区域是一个城市或城市的一部分它将返回相应城市的区域ID。否则,返回0。
### 虏茅驴麓卤戮碌禄路戮鲁拢\]) {#regiontoareaid-geobase}
### regionToArea(id[, geobase]) {#regiontoareaid-geobase}
将区域转换为区域地理数据库中的类型5。 在所有其他方式,这个功能是一样的 regionToCity.
@ -84,36 +84,58 @@ LIMIT 15
│ Federation of Bosnia and Herzegovina │
└──────────────────────────────────────────────────────────┘
### 虏茅驴麓卤戮碌禄路戮鲁拢(陆毛隆隆(803)888-8325\]) {#regiontocountryid-geobase}
### regionToCountry(id[, geobase]) {#regiontocountryid-geobase}
将区域转换为国家。 在所有其他方式,这个功能是一样的 regionToCity.
示例: `regionToCountry(toUInt32(213)) = 225` 转换莫斯科213到俄罗斯225
### 掳胫((禄脢鹿脷露胫鲁隆鹿((酶-11-16""\[脪陆,ase\]) {#regiontocontinentid-geobase}
### regionToContinent(id[, geobase]) {#regiontocontinentid-geobase}
将区域转换为大陆。 在所有其他方式,这个功能是一样的 regionToCity.
示例: `regionToContinent(toUInt32(213)) = 10001` 将莫斯科213转换为欧亚大陆10001
### ツ环板(ョツ嘉ッツ偲青regionャツ静ャツ青サツ催ャツ渉\]) {#regiontopopulationid-geobase}
### regionToTopContinent (#regiontotopcontinent) {#regiontotopcontinent-regiontotopcontinent}
查找该区域层次结构中最高的大陆。
**语法**
``` sql
regionToTopContinent(id[, geobase])
```
**参数**
- `id` — Yandex geobase 的区域 ID. [UInt32](../../sql-reference/data-types/int-uint.md).
- `geobase` — 字典的建. 参阅 [Multiple Geobases](#multiple-geobases). [String](../../sql-reference/data-types/string.md). 可选.
**返回值**
- 顶级大陆的标识符(当您在区域层次结构中攀爬时,是后者)。
- 0如果没有。
类型: `UInt32`.
### regionToPopulation(id\[, geobase\]) {#regiontopopulationid-geobase}
获取区域的人口。
The population can be recorded in files with the geobase. See the section «External dictionaries».
人口可以记录在文件与地球基。请参阅«外部词典»部分。
如果没有为该区域记录人口则返回0。
在Yandex地理数据库中可能会为子区域记录人口但不会为父区域记录人口。
### regionIn(lhs,rhs\[,地理数据库\]) {#regioninlhs-rhs-geobase}
检查是否 lhs 属于一个区域 rhs 区域。 如果属于UInt8则返回等于1的数字如果不属于则返回0。
The relationship is reflexive any region also belongs to itself.
这种关系是反射的——任何地区也属于自己。
### ツ暗ェツ氾环催ツ団ツ法ツ人\]) {#regionhierarchyid-geobase}
### regionHierarchy(id\[, geobase\]) {#regionhierarchyid-geobase}
Accepts a UInt32 number the region ID from the Yandex geobase. Returns an array of region IDs consisting of the passed region and all parents along the chain.
从 Yandex geobase 接收一个 UInt32 数字类型的区域ID。返回一个区域ID数组由传递的区域和链上的所有父节点组成。
示例: `regionHierarchy(toUInt32(213)) = [213,1,3,225,10001,10000]`.
### 地区名称(id\[,郎\]) {#regiontonameid-lang}
### regionToName(id\[, lang\]) {#regiontonameid-lang}
Accepts a UInt32 number the region ID from the Yandex geobase. A string with the name of the language can be passed as a second argument. Supported languages are: ru, en, ua, uk, by, kz, tr. If the second argument is omitted, the language ru is used. If the language is not supported, an exception is thrown. Returns a string the name of the region in the corresponding language. If the region with the specified ID doesnt exist, an empty string is returned.
从 Yandex geobase 接收一个 UInt32 数字类型的区域ID。带有语言名称的字符串可以作为第二个参数传递。支持的语言有:ru, en, ua, uk, by, kz, tr。如果省略第二个参数则使用' ru '语言。如果不支持该语言,则抛出异常。返回一个字符串-对应语言的区域名称。如果指定ID的区域不存在则返回一个空字符串。
`ua``uk` 都意味着乌克兰。

View File

@ -129,6 +129,7 @@ namespace ErrorCodes
extern const int UNRECOGNIZED_ARGUMENTS;
extern const int SYNTAX_ERROR;
extern const int TOO_DEEP_RECURSION;
extern const int AUTHENTICATION_FAILED;
}
@ -773,31 +774,50 @@ private:
<< connection_parameters.host << ":" << connection_parameters.port
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
connection = std::make_unique<Connection>(
connection_parameters.host,
connection_parameters.port,
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"", /* cluster */
"", /* cluster_secret */
"client",
connection_parameters.compression,
connection_parameters.security);
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
if (max_client_network_bandwidth)
try
{
ThrottlerPtr throttler = std::make_shared<Throttler>(max_client_network_bandwidth, 0, "");
connection->setThrottler(throttler);
}
connection = std::make_unique<Connection>(
connection_parameters.host,
connection_parameters.port,
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"", /* cluster */
"", /* cluster_secret */
"client",
connection_parameters.compression,
connection_parameters.security);
connection->getServerVersion(
connection_parameters.timeouts, server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
if (max_client_network_bandwidth)
{
ThrottlerPtr throttler = std::make_shared<Throttler>(max_client_network_bandwidth, 0, "");
connection->setThrottler(throttler);
}
connection->getServerVersion(
connection_parameters.timeouts, server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
}
catch (const Exception & e)
{
/// It is typical when users install ClickHouse, type some password and instantly forget it.
if ((connection_parameters.user.empty() || connection_parameters.user == "default")
&& e.code() == DB::ErrorCodes::AUTHENTICATION_FAILED)
{
std::cerr << std::endl
<< "If you have installed ClickHouse and forgot password you can reset it in the configuration file." << std::endl
<< "The password for default user is typically located at /etc/clickhouse-server/users.d/default-password.xml" << std::endl
<< "and deleting this file will reset the password." << std::endl
<< "See also /etc/clickhouse-server/users.xml on the server where ClickHouse is installed." << std::endl
<< std::endl;
}
throw;
}
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);

View File

@ -376,8 +376,8 @@ void LocalServer::processQueries()
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
/// Authenticate and create a context to execute queries.
Session session{global_context, ClientInfo::Interface::TCP};
session.authenticate("default", "", Poco::Net::SocketAddress{});
Session session{global_context, ClientInfo::Interface::LOCAL};
session.authenticate("default", "", {});
/// Use the same context for all queries.
auto context = session.makeQueryContext();

View File

@ -357,6 +357,7 @@ void Server::createServer(const std::string & listen_host, const char * port_nam
try
{
func(port);
global_context->registerServerPort(port_name, port);
}
catch (const Poco::Exception &)
{

View File

@ -56,6 +56,8 @@ template <typename Value, bool float_return> using FuncQuantilesTDigestWeighted
template <typename Value, bool float_return> using FuncQuantileBFloat16 = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantileBFloat16, false, std::conditional_t<float_return, Float64, void>, false>;
template <typename Value, bool float_return> using FuncQuantilesBFloat16 = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantilesBFloat16, false, std::conditional_t<float_return, Float64, void>, true>;
template <typename Value, bool float_return> using FuncQuantileBFloat16Weighted = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantileBFloat16Weighted, true, std::conditional_t<float_return, Float64, void>, false>;
template <typename Value, bool float_return> using FuncQuantilesBFloat16Weighted = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantilesBFloat16Weighted, true, std::conditional_t<float_return, Float64, void>, true>;
template <template <typename, bool> class Function>
static constexpr bool supportDecimal()
@ -167,6 +169,9 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
factory.registerFunction(NameQuantileBFloat16::name, createAggregateFunctionQuantile<FuncQuantileBFloat16>);
factory.registerFunction(NameQuantilesBFloat16::name, { createAggregateFunctionQuantile<FuncQuantilesBFloat16>, properties });
factory.registerFunction(NameQuantileBFloat16Weighted::name, createAggregateFunctionQuantile<FuncQuantileBFloat16Weighted>);
factory.registerFunction(NameQuantilesBFloat16Weighted::name, createAggregateFunctionQuantile<FuncQuantilesBFloat16Weighted>);
/// 'median' is an alias for 'quantile'
factory.registerAlias("median", NameQuantile::name);
factory.registerAlias("medianDeterministic", NameQuantileDeterministic::name);
@ -179,6 +184,7 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
factory.registerAlias("medianTDigest", NameQuantileTDigest::name);
factory.registerAlias("medianTDigestWeighted", NameQuantileTDigestWeighted::name);
factory.registerAlias("medianBFloat16", NameQuantileBFloat16::name);
factory.registerAlias("medianBFloat16Weighted", NameQuantileBFloat16Weighted::name);
}
}

View File

@ -237,5 +237,7 @@ struct NameQuantilesTDigestWeighted { static constexpr auto name = "quantilesTDi
struct NameQuantileBFloat16 { static constexpr auto name = "quantileBFloat16"; };
struct NameQuantilesBFloat16 { static constexpr auto name = "quantilesBFloat16"; };
struct NameQuantileBFloat16Weighted { static constexpr auto name = "quantileBFloat16Weighted"; };
struct NameQuantilesBFloat16Weighted { static constexpr auto name = "quantilesBFloat16Weighted"; };
}

View File

@ -137,8 +137,8 @@ class AggregateFunctionWindowFunnel final
private:
UInt64 window;
UInt8 events_size;
/// When the 'strict' is set, it applies conditions only for the not repeating values.
bool strict;
/// When the 'strict_deduplication' is set, it applies conditions only for the not repeating values.
bool strict_deduplication;
/// When the 'strict_order' is set, it doesn't allow interventions of other events.
/// In the case of 'A->B->D->C', it stops finding 'A->B->C' at the 'D' and the max event level is 2.
@ -150,7 +150,7 @@ private:
/// Loop through the entire events_list, update the event timestamp value
/// The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window.
/// If found, returns the max event level, else return 0.
/// The Algorithm complexity is O(n).
/// The algorithm works in O(n) time, but the overall function works in O(n * log(n)) due to sorting.
UInt8 getEventLevel(Data & data) const
{
if (data.size() == 0)
@ -163,10 +163,10 @@ private:
/// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window
std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size);
bool first_event = false;
for (const auto & pair : data.events_list)
for (size_t i = 0; i < data.events_list.size(); ++i)
{
const T & timestamp = pair.first;
const auto & event_idx = pair.second - 1;
const T & timestamp = data.events_list[i].first;
const auto & event_idx = data.events_list[i].second - 1;
if (strict_order && event_idx == -1)
{
if (first_event)
@ -179,9 +179,9 @@ private:
events_timestamp[0] = std::make_pair(timestamp, timestamp);
first_event = true;
}
else if (strict && events_timestamp[event_idx].has_value())
else if (strict_deduplication && events_timestamp[event_idx].has_value())
{
return event_idx + 1;
return data.events_list[i - 1].second;
}
else if (strict_order && first_event && !events_timestamp[event_idx - 1].has_value())
{
@ -226,18 +226,20 @@ public:
events_size = arguments.size() - 1;
window = params.at(0).safeGet<UInt64>();
strict = false;
strict_deduplication = false;
strict_order = false;
strict_increase = false;
for (size_t i = 1; i < params.size(); ++i)
{
String option = params.at(i).safeGet<String>();
if (option == "strict")
strict = true;
if (option == "strict_deduplication")
strict_deduplication = true;
else if (option == "strict_order")
strict_order = true;
else if (option == "strict_increase")
strict_increase = true;
else if (option == "strict")
throw Exception{"strict is replaced with strict_deduplication in Aggregate function " + getName(), ErrorCodes::BAD_ARGUMENTS};
else
throw Exception{"Aggregate function " + getName() + " doesn't support a parameter: " + option, ErrorCodes::BAD_ARGUMENTS};
}

View File

@ -304,7 +304,7 @@ size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
if (x.getType() == Field::Types::Null)
return getNullValueIndex();
if (isNumeric())
if (valuesHaveFixedSize())
return uniqueInsertData(&x.reinterpret<char>(), size_of_value_if_fixed);
auto & val = x.get<String>();

View File

@ -80,8 +80,3 @@ target_link_libraries (average PRIVATE clickhouse_common_io)
add_executable (shell_command_inout shell_command_inout.cpp)
target_link_libraries (shell_command_inout PRIVATE clickhouse_common_io)
if (ENABLE_FUZZING)
add_executable(YAML_fuzzer YAML_fuzzer.cpp ${SRCS})
target_link_libraries(YAML_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
endif ()

View File

@ -1,39 +0,0 @@
#include <iostream>
#include <fstream>
#include <string>
#include <cstdio>
#include <time.h>
#include <filesystem>
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
{
/// How to test:
/// build ClickHouse with YAML_fuzzer.cpp
/// ./YAML_fuzzer YAML_CORPUS
/// where YAML_CORPUS is a directory with different YAML configs for libfuzzer
char file_name[L_tmpnam];
if (!std::tmpnam(file_name))
{
std::cerr << "Cannot create temp file!\n";
return 1;
}
std::string input = std::string(reinterpret_cast<const char*>(data), size);
DB::YAMLParser parser;
{
std::ofstream temp_file(file_name);
temp_file << input;
}
try
{
DB::YAMLParser::parse(std::string(file_name));
}
catch (...)
{
std::cerr << "YAML_fuzzer failed: " << getCurrentExceptionMessage() << std::endl;
return 1;
}
return 0;
}

View File

@ -1,3 +1,18 @@
if(ENABLE_EXAMPLES)
if (ENABLE_FUZZING)
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(fuzz_compression .)
# Remove this file, because it has dependencies on DataTypes
list(REMOVE_ITEM ${fuzz_compression_sources} CompressionFactoryAdditions.cpp)
add_library(fuzz_compression ${fuzz_compression_headers} ${fuzz_compression_sources})
target_link_libraries(fuzz_compression PUBLIC clickhouse_parsers clickhouse_common_io common lz4)
endif()
if (ENABLE_EXAMPLES)
add_subdirectory(examples)
endif()
if (ENABLE_FUZZING)
add_subdirectory(fuzzers)
endif()

View File

@ -22,13 +22,10 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_CODEC;
extern const int BAD_ARGUMENTS;
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS;
}
static constexpr auto DEFAULT_CODEC_NAME = "Default";
CompressionCodecPtr CompressionCodecFactory::getDefaultCodec() const
{
return default_codec;
@ -49,184 +46,6 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
}
}
void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
{
if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
}
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
{
if (const auto * func = ast->as<ASTFunction>())
{
ASTPtr codecs_descriptions = std::make_shared<ASTExpressionList>();
bool is_compression = false;
bool has_none = false;
std::optional<size_t> generic_compression_codec_pos;
std::set<size_t> post_processing_codecs;
bool can_substitute_codec_arguments = true;
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
{
const auto & inner_codec_ast = func->arguments->children[i];
String codec_family_name;
ASTPtr codec_arguments;
if (const auto * family_name = inner_codec_ast->as<ASTIdentifier>())
{
codec_family_name = family_name->name();
codec_arguments = {};
}
else if (const auto * ast_func = inner_codec_ast->as<ASTFunction>())
{
codec_family_name = ast_func->name;
codec_arguments = ast_func->arguments;
}
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
/// Default codec replaced with current default codec which may depend on different
/// settings (and properties of data) in runtime.
CompressionCodecPtr result_codec;
if (codec_family_name == DEFAULT_CODEC_NAME)
{
if (codec_arguments != nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} codec cannot have any arguments, it's just an alias for codec specified in config.xml", DEFAULT_CODEC_NAME);
result_codec = default_codec;
codecs_descriptions->children.emplace_back(std::make_shared<ASTIdentifier>(DEFAULT_CODEC_NAME));
}
else
{
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
{
if (ISerialization::isSpecialCompressionAllowed(substream_path))
{
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
/// Case for column Tuple, which compressed with codec which depends on data type, like Delta.
/// We cannot substitute parameters for such codecs.
if (prev_codec && prev_codec->getHash() != result_codec->getHash())
can_substitute_codec_arguments = false;
prev_codec = result_codec;
}
};
ISerialization::SubstreamPath stream_path;
column_type->enumerateStreams(column_type->getDefaultSerialization(), callback, stream_path);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());
}
else
{
result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
}
if (!allow_experimental_codecs && result_codec->isExperimental())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is experimental and not meant to be used in production."
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
is_compression |= result_codec->isCompression();
has_none |= result_codec->isNone();
if (!generic_compression_codec_pos && result_codec->isGenericCompression())
generic_compression_codec_pos = i;
if (result_codec->isPostProcessing())
post_processing_codecs.insert(i);
}
String codec_description = queryToString(codecs_descriptions);
if (sanity_check)
{
if (codecs_descriptions->children.size() > 1 && has_none)
throw Exception(
"It does not make sense to have codec NONE along with other compression codecs: " + codec_description
+ ". (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).",
ErrorCodes::BAD_ARGUMENTS);
/// Allow to explicitly specify single NONE codec if user don't want any compression.
/// But applying other transformations solely without compression (e.g. Delta) does not make sense.
/// It's okay to apply post-processing codecs solely without anything else.
if (!is_compression && !has_none && post_processing_codecs.size() != codecs_descriptions->children.size())
throw Exception(
"Compression codec " + codec_description
+ " does not compress anything."
" You may want to add generic compression algorithm after other transformations, like: "
+ codec_description
+ ", LZ4."
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).",
ErrorCodes::BAD_ARGUMENTS);
/// It does not make sense to apply any non-post-processing codecs
/// after post-processing one.
if (!post_processing_codecs.empty() &&
*post_processing_codecs.begin() != codecs_descriptions->children.size() - post_processing_codecs.size())
throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
" because it does not make sense to apply any non-post-processing codecs after"
" post-processing ones. (Note: you can enable setting 'allow_suspicious_codecs'"
" to skip this check).", ErrorCodes::BAD_ARGUMENTS);
/// It does not make sense to apply any transformations after generic compression algorithm
/// So, generic compression can be only one and only at the end.
if (generic_compression_codec_pos &&
*generic_compression_codec_pos != codecs_descriptions->children.size() - 1 - post_processing_codecs.size())
throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
" because it does not make sense to apply any transformations after generic compression algorithm."
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
/// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and
/// Delta(8) for the second. So we should leave codec description as is
/// and deduce them in get method for each subtype separately. For all
/// other types it's better to substitute parameters, for better
/// readability and backward compatibility.
if (can_substitute_codec_arguments)
{
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
}
else
{
return ast;
}
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
CompressionCodecPtr CompressionCodecFactory::get(
const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const

View File

@ -14,6 +14,8 @@
namespace DB
{
static constexpr auto DEFAULT_CODEC_NAME = "Default";
class ICompressionCodec;
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;

View File

@ -0,0 +1,214 @@
/**
* This file contains a part of CompressionCodecFactory methods definitions and
* is needed only because they have dependencies on DataTypes.
* They are not useful for fuzzers, so we leave them in other translation unit.
*/
#include <Compression/CompressionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeNested.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int UNKNOWN_CODEC;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
{
if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
}
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
{
if (const auto * func = ast->as<ASTFunction>())
{
ASTPtr codecs_descriptions = std::make_shared<ASTExpressionList>();
bool is_compression = false;
bool has_none = false;
std::optional<size_t> generic_compression_codec_pos;
std::set<size_t> post_processing_codecs;
bool can_substitute_codec_arguments = true;
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
{
const auto & inner_codec_ast = func->arguments->children[i];
String codec_family_name;
ASTPtr codec_arguments;
if (const auto * family_name = inner_codec_ast->as<ASTIdentifier>())
{
codec_family_name = family_name->name();
codec_arguments = {};
}
else if (const auto * ast_func = inner_codec_ast->as<ASTFunction>())
{
codec_family_name = ast_func->name;
codec_arguments = ast_func->arguments;
}
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
/// Default codec replaced with current default codec which may depend on different
/// settings (and properties of data) in runtime.
CompressionCodecPtr result_codec;
if (codec_family_name == DEFAULT_CODEC_NAME)
{
if (codec_arguments != nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} codec cannot have any arguments, it's just an alias for codec specified in config.xml", DEFAULT_CODEC_NAME);
result_codec = default_codec;
codecs_descriptions->children.emplace_back(std::make_shared<ASTIdentifier>(DEFAULT_CODEC_NAME));
}
else
{
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
{
if (ISerialization::isSpecialCompressionAllowed(substream_path))
{
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
/// Case for column Tuple, which compressed with codec which depends on data type, like Delta.
/// We cannot substitute parameters for such codecs.
if (prev_codec && prev_codec->getHash() != result_codec->getHash())
can_substitute_codec_arguments = false;
prev_codec = result_codec;
}
};
ISerialization::SubstreamPath stream_path;
column_type->enumerateStreams(column_type->getDefaultSerialization(), callback, stream_path);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());
}
else
{
result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
}
if (!allow_experimental_codecs && result_codec->isExperimental())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is experimental and not meant to be used in production."
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
is_compression |= result_codec->isCompression();
has_none |= result_codec->isNone();
if (!generic_compression_codec_pos && result_codec->isGenericCompression())
generic_compression_codec_pos = i;
if (result_codec->isPostProcessing())
post_processing_codecs.insert(i);
}
String codec_description = queryToString(codecs_descriptions);
if (sanity_check)
{
if (codecs_descriptions->children.size() > 1 && has_none)
throw Exception(
"It does not make sense to have codec NONE along with other compression codecs: " + codec_description
+ ". (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).",
ErrorCodes::BAD_ARGUMENTS);
/// Allow to explicitly specify single NONE codec if user don't want any compression.
/// But applying other transformations solely without compression (e.g. Delta) does not make sense.
/// It's okay to apply post-processing codecs solely without anything else.
if (!is_compression && !has_none && post_processing_codecs.size() != codecs_descriptions->children.size())
throw Exception(
"Compression codec " + codec_description
+ " does not compress anything."
" You may want to add generic compression algorithm after other transformations, like: "
+ codec_description
+ ", LZ4."
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).",
ErrorCodes::BAD_ARGUMENTS);
/// It does not make sense to apply any non-post-processing codecs
/// after post-processing one.
if (!post_processing_codecs.empty() &&
*post_processing_codecs.begin() != codecs_descriptions->children.size() - post_processing_codecs.size())
throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
" because it does not make sense to apply any non-post-processing codecs after"
" post-processing ones. (Note: you can enable setting 'allow_suspicious_codecs'"
" to skip this check).", ErrorCodes::BAD_ARGUMENTS);
/// It does not make sense to apply any transformations after generic compression algorithm
/// So, generic compression can be only one and only at the end.
if (generic_compression_codec_pos &&
*generic_compression_codec_pos != codecs_descriptions->children.size() - 1 - post_processing_codecs.size())
throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
" because it does not make sense to apply any transformations after generic compression algorithm."
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
/// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and
/// Delta(8) for the second. So we should leave codec description as is
/// and deduce them in get method for each subtype separately. For all
/// other types it's better to substitute parameters, for better
/// readability and backward compatibility.
if (can_substitute_codec_arguments)
{
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
}
else
{
return ast;
}
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
}

View File

@ -3,8 +3,3 @@ target_link_libraries (compressed_buffer PRIVATE dbms)
add_executable (cached_compressed_read_buffer cached_compressed_read_buffer.cpp)
target_link_libraries (cached_compressed_read_buffer PRIVATE dbms)
if (ENABLE_FUZZING)
add_executable (compressed_buffer_fuzzer compressed_buffer_fuzzer.cpp)
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
endif ()

View File

@ -0,0 +1,2 @@
add_executable (compressed_buffer_fuzzer compressed_buffer_fuzzer.cpp)
target_link_libraries (compressed_buffer_fuzzer PRIVATE fuzz_compression clickhouse_common_io ${LIB_FUZZING_ENGINE})

View File

@ -1,3 +1,7 @@
if (ENABLE_EXAMPLES)
add_subdirectory(examples)
endif ()
if (ENABLE_FUZZING)
add_subdirectory(fuzzers)
endif()

View File

@ -55,7 +55,7 @@ class IColumn;
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
M(Seconds, drain_timeout, DBMS_DEFAULT_DRAIN_TIMEOUT_SEC, "", 0) \
M(Seconds, tcp_keep_alive_timeout, 0, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
M(Bool, use_hedged_requests, true, "Use hedged requests for distributed queries", 0) \
@ -114,6 +114,7 @@ class IColumn;
M(UInt64, group_by_two_level_threshold_bytes, 50000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \
M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \
M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \
M(Bool, enable_positional_arguments, false, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
\
M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \
M(UInt64, parallel_replicas_count, 0, "", 0) \
@ -252,6 +253,7 @@ class IColumn;
M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \
M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(Bool, empty_result_for_aggregation_by_constant_keys_on_empty_set, true, "Return empty result when aggregating by constant keys on empty set.", 0) \
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \

View File

@ -8,11 +8,6 @@ target_link_libraries (field PRIVATE dbms)
add_executable (string_ref_hash string_ref_hash.cpp)
target_link_libraries (string_ref_hash PRIVATE clickhouse_common_io)
if (ENABLE_FUZZING)
add_executable (names_and_types_fuzzer names_and_types_fuzzer.cpp)
target_link_libraries (names_and_types_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
endif ()
add_executable (mysql_protocol mysql_protocol.cpp)
target_link_libraries (mysql_protocol PRIVATE dbms)
if(USE_SSL)

View File

@ -0,0 +1,2 @@
add_executable (names_and_types_fuzzer names_and_types_fuzzer.cpp)
target_link_libraries (names_and_types_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})

View File

@ -26,23 +26,6 @@ namespace ErrorCodes
IDataType::~IDataType() = default;
String IDataType::getName() const
{
if (custom_name)
{
return custom_name->getName();
}
else
{
return doGetName();
}
}
String IDataType::doGetName() const
{
return getFamilyName();
}
void IDataType::updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint)
{
/// Update the average value size hint if amount of read rows isn't too small

View File

@ -62,7 +62,13 @@ public:
/// static constexpr bool is_parametric = false;
/// Name of data type (examples: UInt64, Array(String)).
String getName() const;
String getName() const
{
if (custom_name)
return custom_name->getName();
else
return doGetName();
}
/// Name of data type family (example: FixedString, Array).
virtual const char * getFamilyName() const = 0;
@ -105,7 +111,7 @@ public:
void enumerateStreams(const SerializationPtr & serialization, const StreamCallbackWithType & callback) const { enumerateStreams(serialization, callback, {}); }
protected:
virtual String doGetName() const;
virtual String doGetName() const { return getFamilyName(); }
virtual SerializationPtr doGetDefaultSerialization() const = 0;
DataTypePtr getTypeForSubstream(const ISerialization::SubstreamPath & substream_path) const;

View File

@ -256,8 +256,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
if (configuration.is_local)
{
/// Start local session in case when the dictionary is loaded in-process (without TCP communication).
local_session = std::make_shared<Session>(global_context, ClientInfo::Interface::TCP);
local_session->authenticate(configuration.user, configuration.password, Poco::Net::SocketAddress{"127.0.0.1", 0});
local_session = std::make_shared<Session>(global_context, ClientInfo::Interface::LOCAL);
local_session->authenticate(configuration.user, configuration.password, {});
context = local_session->makeQueryContext();
context->applySettingsChanges(readSettingsFromDictionaryConfig(config, config_prefix));
}

View File

@ -161,9 +161,6 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
}
}
if (attributes.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary has no attributes defined");
if (config.getBool(config_prefix + ".layout.ip_trie.access_to_key_from_attributes", false))
access_to_key_from_attributes = true;
}

View File

@ -496,9 +496,6 @@ void checkAST(const ASTCreateQuery & query)
if (!query.is_dictionary || query.dictionary == nullptr)
throw Exception(ErrorCodes::INCORRECT_DICTIONARY_DEFINITION, "Cannot convert dictionary to configuration from non-dictionary AST.");
if (query.dictionary_attributes_list == nullptr || query.dictionary_attributes_list->children.empty())
throw Exception(ErrorCodes::INCORRECT_DICTIONARY_DEFINITION, "Cannot create dictionary with empty attributes list");
if (query.dictionary->layout == nullptr)
throw Exception(ErrorCodes::INCORRECT_DICTIONARY_DEFINITION, "Cannot create dictionary with empty layout");
@ -512,8 +509,6 @@ void checkAST(const ASTCreateQuery & query)
if (query.dictionary->source == nullptr)
throw Exception(ErrorCodes::INCORRECT_DICTIONARY_DEFINITION, "Cannot create dictionary with empty source");
/// Range can be empty
}
void checkPrimaryKey(const NamesToTypeNames & all_attrs, const Names & key_attrs)

View File

@ -6,25 +6,37 @@
#include <bitset>
#include <random>
#include <utility>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromS3.h>
#include <boost/algorithm/string.hpp>
#include <common/unit.h>
#include <Common/checkStackSize.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/WriteIndirectBufferFromRemoteFS.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
#include <Common/checkStackSize.h>
#include <boost/algorithm/string.hpp>
#include <aws/s3/model/CopyObjectRequest.h> // Y_IGNORE
#include <aws/s3/model/DeleteObjectsRequest.h> // Y_IGNORE
#include <aws/s3/model/GetObjectRequest.h> // Y_IGNORE
#include <aws/s3/model/ListObjectsV2Request.h> // Y_IGNORE
#include <aws/s3/model/HeadObjectRequest.h> // Y_IGNORE
#include <aws/s3/model/CreateMultipartUploadRequest.h> // Y_IGNORE
#include <aws/s3/model/CompleteMultipartUploadRequest.h> // Y_IGNORE
#include <aws/s3/model/UploadPartCopyRequest.h> // Y_IGNORE
#include <aws/s3/model/AbortMultipartUploadRequest.h> // Y_IGNORE
namespace DB
@ -388,16 +400,7 @@ void DiskS3::saveSchemaVersion(const int & version)
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
{
auto settings = current_settings.get();
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(bucket + "/" + key);
request.SetBucket(bucket);
request.SetKey(key);
request.SetMetadata(metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
auto outcome = settings->client->CopyObject(request);
throwIfError(outcome);
copyObjectImpl(bucket, key, bucket, key, std::nullopt, metadata);
}
void DiskS3::migrateFileToRestorableSchema(const String & path)
@ -553,18 +556,124 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
} while (outcome.GetResult().GetIsTruncated());
}
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head) const
{
if (head && (head->GetContentLength() >= static_cast<Int64>(5_GiB)))
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head);
else
copyObjectImpl(src_bucket, src_key, dst_bucket, dst_key);
}
void DiskS3::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
{
auto settings = current_settings.get();
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
if (metadata)
{
request.SetMetadata(*metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
}
auto outcome = settings->client->CopyObject(request);
if (!outcome.IsSuccess() && outcome.GetError().GetExceptionName() == "EntityTooLarge")
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
return;
}
throwIfError(outcome);
}
void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
{
LOG_DEBUG(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
src_bucket, src_key, dst_bucket, dst_key, metadata ? "REPLACE" : "NOT_SET");
auto settings = current_settings.get();
if (!head)
head = headObject(src_bucket, src_key);
size_t size = head->GetContentLength();
String multipart_upload_id;
{
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
if (metadata)
request.SetMetadata(*metadata);
auto outcome = settings->client->CreateMultipartUpload(request);
throwIfError(outcome);
multipart_upload_id = outcome.GetResult().GetUploadId();
}
std::vector<String> part_tags;
size_t upload_part_size = settings->s3_min_upload_part_size;
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
{
Aws::S3::Model::UploadPartCopyRequest part_request;
part_request.SetCopySource(src_bucket + "/" + src_key);
part_request.SetBucket(dst_bucket);
part_request.SetKey(dst_key);
part_request.SetUploadId(multipart_upload_id);
part_request.SetPartNumber(part_number);
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, std::min(size, position + upload_part_size) - 1));
auto outcome = settings->client->UploadPartCopy(part_request);
if (!outcome.IsSuccess())
{
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dst_bucket);
abort_request.SetKey(dst_key);
abort_request.SetUploadId(multipart_upload_id);
settings->client->AbortMultipartUpload(abort_request);
// In error case we throw exception later with first error from UploadPartCopy
}
throwIfError(outcome);
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
part_tags.push_back(etag);
}
{
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(dst_bucket);
req.SetKey(dst_key);
req.SetUploadId(multipart_upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = settings->client->CompleteMultipartUpload(req);
throwIfError(outcome);
LOG_DEBUG(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
"Upload_id: {}, Parts: {}", src_bucket, src_key, dst_bucket, dst_key, multipart_upload_id, part_tags.size());
}
}
struct DiskS3::RestoreInformation
{
UInt64 revision = LATEST_REVISION;
@ -757,7 +866,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
/// Copy object if we restore to different bucket / path.
if (bucket != source_bucket || remote_fs_root_path != source_path)
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key);
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key, head_result);
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();

View File

@ -7,6 +7,7 @@
#if USE_AWS_S3
#include <atomic>
#include <optional>
#include <common/logger_useful.h>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
@ -131,7 +132,15 @@ private:
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key) const;
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback) const;
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const;
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt) const;
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata = std::nullopt) const;
void copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata = std::nullopt) const;
/// Restore S3 metadata files on file system.
void restore();

View File

@ -14,6 +14,7 @@ void registerFunctionsHashing(FunctionFactory & factory)
factory.registerFunction<FunctionSHA1>();
factory.registerFunction<FunctionSHA224>();
factory.registerFunction<FunctionSHA256>();
factory.registerFunction<FunctionSHA512>();
#endif
factory.registerFunction<FunctionSipHash64>();
factory.registerFunction<FunctionSipHash128>();

View File

@ -193,6 +193,20 @@ struct SHA256Impl
SHA256_Final(out_char_data, &ctx);
}
};
struct SHA512Impl
{
static constexpr auto name = "SHA512";
enum { length = 64 };
static void apply(const char * begin, const size_t size, unsigned char * out_char_data)
{
SHA512_CTX ctx;
SHA512_Init(&ctx);
SHA512_Update(&ctx, reinterpret_cast<const unsigned char *>(begin), size);
SHA512_Final(out_char_data, &ctx);
}
};
#endif
struct SipHash64Impl
@ -1318,6 +1332,7 @@ using FunctionMD5 = FunctionStringHashFixedString<MD5Impl>;
using FunctionSHA1 = FunctionStringHashFixedString<SHA1Impl>;
using FunctionSHA224 = FunctionStringHashFixedString<SHA224Impl>;
using FunctionSHA256 = FunctionStringHashFixedString<SHA256Impl>;
using FunctionSHA512 = FunctionStringHashFixedString<SHA512Impl>;
#endif
using FunctionSipHash128 = FunctionStringHashFixedString<SipHash128Impl>;
using FunctionCityHash64 = FunctionAnyHash<ImplCityHash64>;

View File

@ -696,6 +696,8 @@ struct JSONExtractTree
{
if (element.isString())
return JSONExtractStringImpl<JSONParser>::insertResultToColumn(dest, element, {});
else if (element.isNull())
return false;
else
return JSONExtractRawImpl<JSONParser>::insertResultToColumn(dest, element, {});
}

View File

@ -0,0 +1,136 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
class ExecutableFunctionGetServerPort : public IExecutableFunction
{
public:
explicit ExecutableFunctionGetServerPort(UInt16 port_) : port(port_) {}
String getName() const override { return "getServerPort"; }
bool useDefaultImplementationForNulls() const override { return false; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeNumber<UInt16>().createColumnConst(input_rows_count, port);
}
private:
UInt16 port;
};
class FunctionBaseGetServerPort : public IFunctionBase
{
public:
explicit FunctionBaseGetServerPort(bool is_distributed_, UInt16 port_, DataTypes argument_types_, DataTypePtr return_type_)
: is_distributed(is_distributed_), port(port_), argument_types(std::move(argument_types_)), return_type(std::move(return_type_))
{
}
String getName() const override { return "getServerPort"; }
const DataTypes & getArgumentTypes() const override
{
return argument_types;
}
const DataTypePtr & getResultType() const override
{
return return_type;
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
{
return std::make_unique<ExecutableFunctionGetServerPort>(port);
}
private:
bool is_distributed;
UInt16 port;
DataTypes argument_types;
DataTypePtr return_type;
};
class GetServerPortOverloadResolver : public IFunctionOverloadResolver, WithContext
{
public:
static constexpr auto name = "getServerPort";
String getName() const override { return name; }
static FunctionOverloadResolverPtr create(ContextPtr context_)
{
return std::make_unique<GetServerPortOverloadResolver>(context_);
}
explicit GetServerPortOverloadResolver(ContextPtr context_) : WithContext(context_) {}
size_t getNumberOfArguments() const override { return 1; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & data_types) const override
{
size_t number_of_arguments = data_types.size();
if (number_of_arguments != 1)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be 1",
getName(),
number_of_arguments);
return std::make_shared<DataTypeNumber<UInt16>>();
}
FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override
{
if (!isString(arguments[0].type))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The argument of function {} should be a constant string with the name of a setting",
getName());
const auto * column = arguments[0].column.get();
if (!column || !checkAndGetColumnConstStringOrFixedString(column))
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The argument of function {} should be a constant string with the name of a setting",
getName());
String port_name{column->getDataAt(0)};
auto port = getContext()->getServerPort(port_name);
DataTypes argument_types;
argument_types.emplace_back(arguments.back().type);
return std::make_unique<FunctionBaseGetServerPort>(getContext()->isDistributed(), port, argument_types, return_type);
}
};
}
void registerFunctionGetServerPort(FunctionFactory & factory)
{
factory.registerFunction<GetServerPortOverloadResolver>();
}
}

View File

@ -34,6 +34,19 @@ public:
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
auto value = getValue(arguments);
return applyVisitor(FieldToDataType{}, value);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
auto value = getValue(arguments);
return result_type->createColumnConst(input_rows_count, convertFieldToType(value, *result_type));
}
private:
Field getValue(const ColumnsWithTypeAndName & arguments) const
{
if (!isString(arguments[0].type))
throw Exception{"The argument of function " + String{name} + " should be a constant string with the name of a setting",
@ -44,20 +57,8 @@ public:
ErrorCodes::ILLEGAL_COLUMN};
std::string_view setting_name{column->getDataAt(0)};
value = getContext()->getSettingsRef().get(setting_name);
DataTypePtr type = applyVisitor(FieldToDataType{}, value);
value = convertFieldToType(value, *type);
return type;
return getContext()->getSettingsRef().get(setting_name);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override
{
return result_type->createColumnConst(input_rows_count, value);
}
private:
mutable Field value;
};
}

View File

@ -71,6 +71,7 @@ void registerFunctionHasThreadFuzzer(FunctionFactory &);
void registerFunctionInitializeAggregation(FunctionFactory &);
void registerFunctionErrorCodeToName(FunctionFactory &);
void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionGetServerPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionId(FunctionFactory & factory);
@ -150,6 +151,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionInitializeAggregation(factory);
registerFunctionErrorCodeToName(factory);
registerFunctionTcpPort(factory);
registerFunctionGetServerPort(factory);
registerFunctionByteSize(factory);
registerFunctionFile(factory);
registerFunctionConnectionId(factory);

View File

@ -28,6 +28,8 @@ public:
GRPC = 3,
MYSQL = 4,
POSTGRESQL = 5,
LOCAL = 6,
TCP_INTERSERVER = 7,
};
enum class HTTPMethod : uint8_t

View File

@ -228,6 +228,8 @@ struct ContextSharedPart
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
std::map<String, UInt16> server_ports;
bool shutdown_called = false;
Stopwatch uptime_watch;
@ -1798,13 +1800,27 @@ std::optional<UInt16> Context::getTCPPortSecure() const
return {};
}
void Context::registerServerPort(String port_name, UInt16 port)
{
shared->server_ports.emplace(std::move(port_name), port);
}
UInt16 Context::getServerPort(const String & port_name) const
{
auto it = shared->server_ports.find(port_name);
if (it == shared->server_ports.end())
throw Exception(ErrorCodes::BAD_GET, "There is no port named {}", port_name);
else
return it->second;
}
std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
{
auto res = getClusters()->getCluster(cluster_name);
if (res)
return res;
res = tryGetReplicatedDatabaseCluster(cluster_name);
if (!cluster_name.empty())
res = tryGetReplicatedDatabaseCluster(cluster_name);
if (res)
return res;

View File

@ -580,6 +580,11 @@ public:
std::optional<UInt16> getTCPPortSecure() const;
/// Register server ports during server starting up. No lock is held.
void registerServerPort(String port_name, UInt16 port);
UInt16 getServerPort(const String & port_name) const;
/// For methods below you may need to acquire the context lock by yourself.
ContextMutablePtr getQueryContext() const;

View File

@ -162,6 +162,36 @@ ExpressionAnalyzer::ExpressionAnalyzer(
analyzeAggregation();
}
static ASTPtr checkPositionalArgument(ASTPtr argument, const ASTSelectQuery * select_query, ASTSelectQuery::Expression expression)
{
auto columns = select_query->select()->children;
/// Case when GROUP BY element is position.
/// Do not consider case when GROUP BY element is not a literal, but expression, even if all values are constants.
if (const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get()))
{
auto which = ast_literal->value.getType();
if (which == Field::Types::UInt64)
{
auto pos = ast_literal->value.get<UInt64>();
if (pos > 0 && pos <= columns.size())
{
const auto & column = columns[--pos];
if (const auto * literal_ast = typeid_cast<const ASTIdentifier *>(column.get()))
{
return std::make_shared<ASTIdentifier>(literal_ast->name());
}
else
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal value for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
}
}
/// Do not throw if out of bounds, see appendUnusedGroupByColumn.
}
}
return nullptr;
}
void ExpressionAnalyzer::analyzeAggregation()
{
@ -238,13 +268,22 @@ void ExpressionAnalyzer::analyzeAggregation()
{
NameSet unique_keys;
ASTs & group_asts = select_query->groupBy()->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
{
ssize_t size = group_asts.size();
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
if (getContext()->getSettingsRef().enable_positional_arguments)
{
auto new_argument = checkPositionalArgument(group_asts[i], select_query, ASTSelectQuery::Expression::GROUP_BY);
if (new_argument)
group_asts[i] = new_argument;
}
const auto & column_name = group_asts[i]->getColumnName();
const auto * node = temp_actions->tryFindInIndex(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
@ -1223,11 +1262,20 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChai
bool with_fill = false;
NameSet order_by_keys;
for (auto & child : select_query->orderBy()->children)
{
const auto * ast = child->as<ASTOrderByElement>();
auto * ast = child->as<ASTOrderByElement>();
if (!ast || ast->children.empty())
throw Exception("Bad order expression AST", ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE);
if (getContext()->getSettingsRef().enable_positional_arguments)
{
auto new_argument = checkPositionalArgument(ast->children.at(0), select_query, ASTSelectQuery::Expression::ORDER_BY);
if (new_argument)
ast->children[0] = new_argument;
}
ASTPtr order_expression = ast->children.at(0);
step.addRequiredOutput(order_expression->getColumnName());
@ -1277,8 +1325,16 @@ bool SelectQueryExpressionAnalyzer::appendLimitBy(ExpressionActionsChain & chain
aggregated_names.insert(column.name);
}
for (const auto & child : select_query->limitBy()->children)
auto & children = select_query->limitBy()->children;
for (auto & child : children)
{
if (getContext()->getSettingsRef().enable_positional_arguments)
{
auto new_argument = checkPositionalArgument(child, select_query, ASTSelectQuery::Expression::LIMIT_BY);
if (new_argument)
child = new_argument;
}
auto child_name = child->getColumnName();
if (!aggregated_names.count(child_name))
step.addRequiredOutput(std::move(child_name));

View File

@ -170,15 +170,18 @@ namespace
auto entity = access_control.tryRead(id);
if (auto role = typeid_cast<RolePtr>(entity))
{
checkGranteeIsAllowed(current_user_access, id, *role);
if (need_check_grantees_are_allowed)
checkGranteeIsAllowed(current_user_access, id, *role);
all_granted_access.makeUnion(role->access);
}
else if (auto user = typeid_cast<UserPtr>(entity))
{
checkGranteeIsAllowed(current_user_access, id, *user);
if (need_check_grantees_are_allowed)
checkGranteeIsAllowed(current_user_access, id, *user);
all_granted_access.makeUnion(user->access);
}
}
need_check_grantees_are_allowed = false; /// already checked
if (!elements_to_revoke.empty() && elements_to_revoke[0].is_partial_revoke)
@ -200,28 +203,6 @@ namespace
current_user_access.checkGrantOption(elements_to_revoke);
}
/// Checks if the current user has enough access rights granted with grant option to grant or revoke specified access rights.
/// Also checks if grantees are allowed for the current user.
void checkGrantOptionAndGrantees(
const AccessControlManager & access_control,
const ContextAccess & current_user_access,
const std::vector<UUID> & grantees_from_query,
const AccessRightsElements & elements_to_grant,
AccessRightsElements & elements_to_revoke)
{
bool need_check_grantees_are_allowed = true;
checkGrantOption(
access_control,
current_user_access,
grantees_from_query,
need_check_grantees_are_allowed,
elements_to_grant,
elements_to_revoke);
if (need_check_grantees_are_allowed)
checkGranteesAreAllowed(access_control, current_user_access, grantees_from_query);
}
/// Checks if the current user has enough roles granted with admin option to grant or revoke specified roles.
void checkAdminOption(
const AccessControlManager & access_control,
@ -262,18 +243,21 @@ namespace
auto entity = access_control.tryRead(id);
if (auto role = typeid_cast<RolePtr>(entity))
{
checkGranteeIsAllowed(current_user_access, id, *role);
if (need_check_grantees_are_allowed)
checkGranteeIsAllowed(current_user_access, id, *role);
all_granted_roles.makeUnion(role->granted_roles);
}
else if (auto user = typeid_cast<UserPtr>(entity))
{
checkGranteeIsAllowed(current_user_access, id, *user);
if (need_check_grantees_are_allowed)
checkGranteeIsAllowed(current_user_access, id, *user);
all_granted_roles.makeUnion(user->granted_roles);
}
}
const auto & all_granted_roles_set = admin_option ? all_granted_roles.getGrantedWithAdminOption() : all_granted_roles.getGranted();
need_check_grantees_are_allowed = false; /// already checked
const auto & all_granted_roles_set = admin_option ? all_granted_roles.getGrantedWithAdminOption() : all_granted_roles.getGranted();
if (roles_to_revoke.all)
boost::range::set_difference(all_granted_roles_set, roles_to_revoke.except_ids, std::back_inserter(roles_to_revoke_ids));
else
@ -283,28 +267,45 @@ namespace
current_user_access.checkAdminOption(roles_to_revoke_ids);
}
/// Checks if the current user has enough roles granted with admin option to grant or revoke specified roles.
/// Also checks if grantees are allowed for the current user.
void checkAdminOptionAndGrantees(
const AccessControlManager & access_control,
const ContextAccess & current_user_access,
const std::vector<UUID> & grantees_from_query,
const std::vector<UUID> & roles_to_grant,
RolesOrUsersSet & roles_to_revoke,
bool admin_option)
/// Returns access rights which should be checked for executing GRANT/REVOKE on cluster.
/// This function is less accurate than checkGrantOption() because it cannot use any information about
/// access rights the grantees currently have (due to those grantees are located on multiple nodes,
/// we just don't have the full information about them).
AccessRightsElements getRequiredAccessForExecutingOnCluster(const AccessRightsElements & elements_to_grant, const AccessRightsElements & elements_to_revoke)
{
bool need_check_grantees_are_allowed = true;
checkAdminOption(
access_control,
current_user_access,
grantees_from_query,
need_check_grantees_are_allowed,
roles_to_grant,
roles_to_revoke,
admin_option);
auto required_access = elements_to_grant;
required_access.insert(required_access.end(), elements_to_revoke.begin(), elements_to_revoke.end());
std::for_each(required_access.begin(), required_access.end(), [&](AccessRightsElement & element) { element.grant_option = true; });
return required_access;
}
if (need_check_grantees_are_allowed)
checkGranteesAreAllowed(access_control, current_user_access, grantees_from_query);
/// Checks if the current user has enough roles granted with admin option to grant or revoke specified roles on cluster.
/// This function is less accurate than checkAdminOption() because it cannot use any information about
/// granted roles the grantees currently have (due to those grantees are located on multiple nodes,
/// we just don't have the full information about them).
void checkAdminOptionForExecutingOnCluster(const ContextAccess & current_user_access,
const std::vector<UUID> roles_to_grant,
const RolesOrUsersSet & roles_to_revoke)
{
if (roles_to_revoke.all)
{
/// Revoking all the roles on cluster always requires ROLE_ADMIN privilege
/// because when we send the query REVOKE ALL to each shard we don't know at this point
/// which roles exactly this is going to revoke on each shard.
/// However ROLE_ADMIN just allows to revoke every role, that's why we check it here.
current_user_access.checkAccess(AccessType::ROLE_ADMIN);
return;
}
if (current_user_access.isGranted(AccessType::ROLE_ADMIN))
return;
for (const auto & role_id : roles_to_grant)
current_user_access.checkAdminOption(role_id);
for (const auto & role_id : roles_to_revoke.getMatchingIDs())
current_user_access.checkAdminOption(role_id);
}
template <typename T>
@ -382,29 +383,39 @@ BlockIO InterpreterGrantQuery::execute()
throw Exception("A partial revoke should be revoked, not granted", ErrorCodes::LOGICAL_ERROR);
auto & access_control = getContext()->getAccessControlManager();
auto current_user_access = getContext()->getAccess();
std::vector<UUID> grantees = RolesOrUsersSet{*query.grantees, access_control, getContext()->getUserID()}.getMatchingIDs(access_control);
/// Check if the current user has corresponding roles granted with admin option.
/// Collect access rights and roles we're going to grant or revoke.
AccessRightsElements elements_to_grant, elements_to_revoke;
collectAccessRightsElementsToGrantOrRevoke(query, elements_to_grant, elements_to_revoke);
std::vector<UUID> roles_to_grant;
RolesOrUsersSet roles_to_revoke;
collectRolesToGrantOrRevoke(access_control, query, roles_to_grant, roles_to_revoke);
checkAdminOptionAndGrantees(access_control, *getContext()->getAccess(), grantees, roles_to_grant, roles_to_revoke, query.admin_option);
/// Executing on cluster.
if (!query.cluster.empty())
{
/// To execute the command GRANT the current user needs to have the access granted with GRANT OPTION.
auto required_access = query.access_rights_elements;
std::for_each(required_access.begin(), required_access.end(), [&](AccessRightsElement & element) { element.grant_option = true; });
checkGranteesAreAllowed(access_control, *getContext()->getAccess(), grantees);
auto required_access = getRequiredAccessForExecutingOnCluster(elements_to_grant, elements_to_revoke);
checkAdminOptionForExecutingOnCluster(*current_user_access, roles_to_grant, roles_to_revoke);
checkGranteesAreAllowed(access_control, *current_user_access, grantees);
return executeDDLQueryOnCluster(query_ptr, getContext(), std::move(required_access));
}
query.replaceEmptyDatabase(getContext()->getCurrentDatabase());
/// Check if the current user has corresponding access rights granted with grant option.
String current_database = getContext()->getCurrentDatabase();
elements_to_grant.replaceEmptyDatabase(current_database);
elements_to_revoke.replaceEmptyDatabase(current_database);
bool need_check_grantees_are_allowed = true;
checkGrantOption(access_control, *current_user_access, grantees, need_check_grantees_are_allowed, elements_to_grant, elements_to_revoke);
/// Check if the current user has corresponding access rights with grant option.
AccessRightsElements elements_to_grant, elements_to_revoke;
collectAccessRightsElementsToGrantOrRevoke(query, elements_to_grant, elements_to_revoke);
checkGrantOptionAndGrantees(access_control, *getContext()->getAccess(), grantees, elements_to_grant, elements_to_revoke);
/// Check if the current user has corresponding roles granted with admin option.
checkAdminOption(access_control, *current_user_access, grantees, need_check_grantees_are_allowed, roles_to_grant, roles_to_revoke, query.admin_option);
if (need_check_grantees_are_allowed)
checkGranteesAreAllowed(access_control, *current_user_access, grantees);
/// Update roles and users listed in `grantees`.
auto update_func = [&](const AccessEntityPtr & entity) -> AccessEntityPtr

View File

@ -2080,7 +2080,9 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set || (keys.empty() && query_analyzer->hasConstAggregationKeys()),
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,

View File

@ -276,10 +276,14 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
if (session_context)
throw Exception("If there is a session context it must be created after authentication", ErrorCodes::LOGICAL_ERROR);
user_id = global_context->getAccessControlManager().login(credentials_, address_.host());
auto address = address_;
if ((address == Poco::Net::SocketAddress{}) && (prepared_client_info->interface == ClientInfo::Interface::LOCAL))
address = Poco::Net::SocketAddress{"127.0.0.1", 0};
user_id = global_context->getAccessControlManager().login(credentials_, address.host());
prepared_client_info->current_user = credentials_.getUserName();
prepared_client_info->current_address = address_;
prepared_client_info->current_address = address;
#if defined(ARCADIA_BUILD)
/// This is harmful field that is used only in foreign "Arcadia" build.

View File

@ -69,7 +69,9 @@ const std::unordered_set<String> possibly_injective_function_names
void appendUnusedGroupByColumn(ASTSelectQuery * select_query, const NameSet & source_columns)
{
/// You must insert a constant that is not the name of the column in the table. Such a case is rare, but it happens.
UInt64 unused_column = 0;
/// Also start unused_column integer from source_columns.size() + 1, because lower numbers ([1, source_columns.size()])
/// might be in positional GROUP BY.
UInt64 unused_column = source_columns.size() + 1;
String unused_column_name = toString(unused_column);
while (source_columns.count(unused_column_name))
@ -111,6 +113,8 @@ void optimizeGroupBy(ASTSelectQuery * select_query, const NameSet & source_colum
group_exprs.pop_back();
};
const auto & settings = context->getSettingsRef();
/// iterate over each GROUP BY expression, eliminate injective function calls and literals
for (size_t i = 0; i < group_exprs.size();)
{
@ -166,7 +170,22 @@ void optimizeGroupBy(ASTSelectQuery * select_query, const NameSet & source_colum
}
else if (is_literal(group_exprs[i]))
{
remove_expr_at_index(i);
bool keep_position = false;
if (settings.enable_positional_arguments)
{
const auto & value = group_exprs[i]->as<ASTLiteral>()->value;
if (value.getType() == Field::Types::UInt64)
{
auto pos = value.get<UInt64>();
if (pos > 0 && pos <= select_query->children.size())
keep_position = true;
}
}
if (keep_position)
++i;
else
remove_expr_at_index(i);
}
else
{

View File

@ -7,6 +7,7 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <re2/re2.h>
#include <stack>
namespace DB
@ -40,10 +41,18 @@ void ASTColumnsApplyTransformer::formatImpl(const FormatSettings & settings, For
if (!column_name_prefix.empty())
settings.ostr << "(";
settings.ostr << func_name;
if (parameters)
parameters->formatImpl(settings, state, frame);
if (lambda)
{
lambda->formatImpl(settings, state, frame);
}
else
{
settings.ostr << func_name;
if (parameters)
parameters->formatImpl(settings, state, frame);
}
if (!column_name_prefix.empty())
settings.ostr << ", '" << column_name_prefix << "')";
@ -64,9 +73,33 @@ void ASTColumnsApplyTransformer::transform(ASTs & nodes) const
else
name = column->getColumnName();
}
auto function = makeASTFunction(func_name, column);
function->parameters = parameters;
column = function;
if (lambda)
{
auto body = lambda->as<const ASTFunction &>().arguments->children.at(1)->clone();
std::stack<ASTPtr> stack;
stack.push(body);
while (!stack.empty())
{
auto ast = stack.top();
stack.pop();
for (auto & child : ast->children)
{
if (auto arg_name = tryGetIdentifierName(child); arg_name && arg_name == lambda_arg)
{
child = column->clone();
continue;
}
stack.push(child);
}
}
column = body;
}
else
{
auto function = makeASTFunction(func_name, column);
function->parameters = parameters;
column = function;
}
if (!column_name_prefix.empty())
column->setAlias(column_name_prefix + name);
}

View File

@ -25,13 +25,22 @@ public:
auto res = std::make_shared<ASTColumnsApplyTransformer>(*this);
if (parameters)
res->parameters = parameters->clone();
if (lambda)
res->lambda = lambda->clone();
return res;
}
void transform(ASTs & nodes) const override;
// Case 1 APPLY (quantile(0.9))
String func_name;
String column_name_prefix;
ASTPtr parameters;
// Case 2 APPLY (x -> quantile(0.9)(x))
ASTPtr lambda;
String lambda_arg;
String column_name_prefix;
protected:
void formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
};

View File

@ -35,6 +35,44 @@ public:
SETTINGS
};
static String expressionToString(Expression expr)
{
switch (expr)
{
case Expression::WITH:
return "WITH";
case Expression::SELECT:
return "SELECT";
case Expression::TABLES:
return "TABLES";
case Expression::PREWHERE:
return "PREWHERE";
case Expression::WHERE:
return "WHERE";
case Expression::GROUP_BY:
return "GROUP BY";
case Expression::HAVING:
return "HAVING";
case Expression::WINDOW:
return "WINDOW";
case Expression::ORDER_BY:
return "ORDER BY";
case Expression::LIMIT_BY_OFFSET:
return "LIMIT BY OFFSET";
case Expression::LIMIT_BY_LENGTH:
return "LIMIT BY LENGTH";
case Expression::LIMIT_BY:
return "LIMIT BY";
case Expression::LIMIT_OFFSET:
return "LIMIT OFFSET";
case Expression::LIMIT_LENGTH:
return "LIMIT LENGTH";
case Expression::SETTINGS:
return "SETTINGS";
}
return "";
}
/** Get the text that identifies this element. */
String getID(char) const override { return "SelectQuery"; }

View File

@ -12,3 +12,7 @@ endif ()
if(ENABLE_EXAMPLES)
add_subdirectory(examples)
endif()
if (ENABLE_FUZZING)
add_subdirectory(fuzzers)
endif()

View File

@ -1827,20 +1827,47 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
with_open_round_bracket = true;
}
ASTPtr lambda;
String lambda_arg;
ASTPtr func_name;
if (!ParserIdentifier().parse(pos, func_name, expected))
return false;
ASTPtr expr_list_args;
if (pos->type == TokenType::OpeningRoundBracket)
auto opos = pos;
if (ParserLambdaExpression().parse(pos, lambda, expected))
{
++pos;
if (!ParserExpressionList(false).parse(pos, expr_list_args, expected))
if (const auto * func = lambda->as<ASTFunction>(); func && func->name == "lambda")
{
const auto * lambda_args_tuple = func->arguments->children.at(0)->as<ASTFunction>();
const ASTs & lambda_arg_asts = lambda_args_tuple->arguments->children;
if (lambda_arg_asts.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "APPLY column transformer can only accept lambda with one argument");
if (auto opt_arg_name = tryGetIdentifierName(lambda_arg_asts[0]); opt_arg_name)
lambda_arg = *opt_arg_name;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "lambda argument declarations must be identifiers");
}
else
{
lambda = nullptr;
pos = opos;
}
}
if (!lambda)
{
if (!ParserIdentifier().parse(pos, func_name, expected))
return false;
if (pos->type != TokenType::ClosingRoundBracket)
return false;
++pos;
if (pos->type == TokenType::OpeningRoundBracket)
{
++pos;
if (!ParserExpressionList(false).parse(pos, expr_list_args, expected))
return false;
if (pos->type != TokenType::ClosingRoundBracket)
return false;
++pos;
}
}
String column_name_prefix;
@ -1864,8 +1891,16 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
}
auto res = std::make_shared<ASTColumnsApplyTransformer>();
res->func_name = getIdentifierName(func_name);
res->parameters = expr_list_args;
if (lambda)
{
res->lambda = lambda;
res->lambda_arg = lambda_arg;
}
else
{
res->func_name = getIdentifierName(func_name);
res->parameters = expr_list_args;
}
res->column_name_prefix = column_name_prefix;
node = std::move(res);
return true;

View File

@ -8,14 +8,3 @@ target_link_libraries(select_parser PRIVATE clickhouse_parsers)
add_executable(create_parser create_parser.cpp ${SRCS})
target_link_libraries(create_parser PRIVATE clickhouse_parsers)
if (ENABLE_FUZZING)
add_executable(lexer_fuzzer lexer_fuzzer.cpp ${SRCS})
target_link_libraries(lexer_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
add_executable(select_parser_fuzzer select_parser_fuzzer.cpp ${SRCS})
target_link_libraries(select_parser_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
add_executable(create_parser_fuzzer create_parser_fuzzer.cpp ${SRCS})
target_link_libraries(create_parser_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
endif ()

View File

@ -0,0 +1,8 @@
add_executable(lexer_fuzzer lexer_fuzzer.cpp ${SRCS})
target_link_libraries(lexer_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
add_executable(select_parser_fuzzer select_parser_fuzzer.cpp ${SRCS})
target_link_libraries(select_parser_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})
add_executable(create_parser_fuzzer create_parser_fuzzer.cpp ${SRCS})
target_link_libraries(create_parser_fuzzer PRIVATE clickhouse_parsers ${LIB_FUZZING_ENGINE})

View File

@ -15,7 +15,10 @@ try
DB::ParserCreateQuery parser;
DB::ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
DB::formatAST(*ast, std::cerr);
DB::WriteBufferFromOwnString wb;
DB::formatAST(*ast, wb);
std::cerr << wb.str() << std::endl;
return 0;
}

View File

@ -14,7 +14,10 @@ try
DB::ParserQueryWithOutput parser(input.data() + input.size());
DB::ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
DB::formatAST(*ast, std::cerr);
DB::WriteBufferFromOwnString wb;
DB::formatAST(*ast, wb);
std::cerr << wb.str() << std::endl;
return 0;
}

View File

@ -979,6 +979,7 @@ void TCPHandler::receiveHello()
is_interserver_mode = (user == USER_INTERSERVER_MARKER);
if (is_interserver_mode)
{
client_info.interface = ClientInfo::Interface::TCP_INTERSERVER;
receiveClusterNameAndSalt();
return;
}

View File

@ -679,6 +679,9 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
while (true)
{
if (from->equals(*to))
return true;
auto it_range = ALLOWED_CONVERSIONS.equal_range(typeid(*from));
for (auto it = it_range.first; it != it_range.second; ++it)
{
@ -697,9 +700,9 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
const auto * nullable_from = typeid_cast<const DataTypeNullable *>(from);
const auto * nullable_to = typeid_cast<const DataTypeNullable *>(to);
if (nullable_from && nullable_to)
if (nullable_to)
{
from = nullable_from->getNestedType().get();
from = nullable_from ? nullable_from->getNestedType().get() : from;
to = nullable_to->getNestedType().get();
continue;
}

View File

@ -1,6 +1,10 @@
add_subdirectory(MergeTree)
add_subdirectory(System)
if(ENABLE_EXAMPLES)
if (ENABLE_EXAMPLES)
add_subdirectory(examples)
endif()
if (ENABLE_FUZZING)
add_subdirectory(fuzzers)
endif()

View File

@ -327,11 +327,13 @@ StorageDistributed::StorageDistributed(
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
ClusterPtr owned_cluster_)
ClusterPtr owned_cluster_,
ASTPtr remote_table_function_ptr_)
: IStorage(id_)
, WithContext(context_->getGlobalContext())
, remote_database(remote_database_)
, remote_table(remote_table_)
, remote_table_function_ptr(remote_table_function_ptr_)
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, owned_cluster(std::move(owned_cluster_))
, cluster_name(getContext()->getMacros()->expand(cluster_name_))
@ -363,10 +365,13 @@ StorageDistributed::StorageDistributed(
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
if (!attach_)
{
size_t num_local_shards = getContext()->getCluster(cluster_name)->getLocalShardCount();
if (num_local_shards && remote_database == id_.database_name && remote_table == id_.table_name)
if (remote_database.empty() && !remote_table_function_ptr && !getCluster()->maybeCrossReplication())
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
size_t num_local_shards = getCluster()->getLocalShardCount();
if (num_local_shards && (remote_database.empty() || remote_database == id_.database_name) && remote_table == id_.table_name)
throw Exception("Distributed table " + id_.table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
}
}
@ -399,9 +404,9 @@ StorageDistributed::StorageDistributed(
relative_data_path_,
distributed_settings_,
attach,
std::move(owned_cluster_))
std::move(owned_cluster_),
remote_table_function_ptr_)
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
@ -810,9 +815,6 @@ void StorageDistributed::alter(const AlterCommands & params, ContextPtr local_co
void StorageDistributed::startup()
{
if (remote_database.empty() && !remote_table_function_ptr && !getCluster()->maybeCrossReplication())
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
if (!storage_policy)
return;

View File

@ -136,7 +136,8 @@ private:
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
ClusterPtr owned_cluster_ = {});
ClusterPtr owned_cluster_ = {},
ASTPtr remote_table_function_ptr_ = {});
StorageDistributed(
const StorageID & id_,

View File

@ -49,6 +49,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_FSTAT;
extern const int CANNOT_TRUNCATE_FILE;
extern const int DATABASE_ACCESS_DENIED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@ -164,6 +165,12 @@ bool StorageFile::isColumnOriented() const
StorageFile::StorageFile(int table_fd_, CommonArguments args)
: StorageFile(args)
{
struct stat buf;
int res = fstat(table_fd_, &buf);
if (-1 == res)
throwFromErrno("Cannot execute fstat", res, ErrorCodes::CANNOT_FSTAT);
total_bytes_to_read = buf.st_size;
if (args.getContext()->getApplicationType() == Context::ApplicationType::SERVER)
throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
if (args.format_name == "Distributed")
@ -208,6 +215,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
String table_dir_path = fs::path(base_path) / relative_table_dir_path / "";
fs::create_directories(table_dir_path);
paths = {getTablePath(table_dir_path, format_name)};
if (fs::exists(paths[0]))
total_bytes_to_read = fs::file_size(paths[0]);
}
StorageFile::StorageFile(CommonArguments args)

View File

@ -23,10 +23,3 @@ target_link_libraries (transform_part_zk_nodes
string_utils
)
if (ENABLE_FUZZING)
add_executable (mergetree_checksum_fuzzer mergetree_checksum_fuzzer.cpp)
target_link_libraries (mergetree_checksum_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
add_executable (columns_description_fuzzer columns_description_fuzzer.cpp)
target_link_libraries (columns_description_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})
endif ()

View File

@ -0,0 +1,11 @@
add_executable (mergetree_checksum_fuzzer
mergetree_checksum_fuzzer.cpp
"${ClickHouse_SOURCE_DIR}/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp"
"${ClickHouse_SOURCE_DIR}/src/Compression/CompressedReadBuffer.cpp"
"${ClickHouse_SOURCE_DIR}/src/Compression/CompressedWriteBuffer.cpp"
)
target_link_libraries (mergetree_checksum_fuzzer PRIVATE clickhouse_common_io fuzz_compression ${LIB_FUZZING_ENGINE})
add_executable (columns_description_fuzzer columns_description_fuzzer.cpp)
target_link_libraries (columns_description_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})

View File

@ -8,6 +8,13 @@
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</s3>
<unstable_s3>
<type>s3</type>
<endpoint>http://resolver:8081/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_read_retries>10</s3_max_single_read_retries>
</unstable_s3>
<hdd>
<type>local</type>
<path>/</path>
@ -24,6 +31,13 @@
</external>
</volumes>
</s3>
<unstable_s3>
<volumes>
<main>
<disk>unstable_s3</disk>
</main>
</volumes>
</unstable_s3>
</policies>
</storage_configuration>

View File

@ -0,0 +1,64 @@
import http.client
import http.server
import random
import socketserver
import sys
import urllib.parse
UPSTREAM_HOST = "minio1:9001"
random.seed("Unstable proxy/1.0")
def request(command, url, headers={}, data=None):
""" Mini-requests. """
class Dummy:
pass
parts = urllib.parse.urlparse(url)
c = http.client.HTTPConnection(parts.hostname, parts.port)
c.request(command, urllib.parse.urlunparse(parts._replace(scheme='', netloc='')), headers=headers, body=data)
r = c.getresponse()
result = Dummy()
result.status_code = r.status
result.headers = r.headers
result.content = r.read()
return result
class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
else:
self.do_HEAD()
def do_PUT(self):
self.do_HEAD()
def do_POST(self):
self.do_HEAD()
def do_HEAD(self):
content_length = self.headers.get("Content-Length")
data = self.rfile.read(int(content_length)) if content_length else None
r = request(self.command, f"http://{UPSTREAM_HOST}{self.path}", headers=self.headers, data=data)
self.send_response(r.status_code)
for k, v in r.headers.items():
self.send_header(k, v)
self.end_headers()
if random.random() < 0.25 and len(r.content) > 1024*1024:
r.content = r.content[:len(r.content)//2]
self.wfile.write(r.content)
self.wfile.close()
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
"""Handle requests in a separate thread."""
httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()

View File

@ -54,6 +54,7 @@ def cluster():
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
run_s3_mocks(cluster)
yield cluster
finally:
@ -77,11 +78,17 @@ def generate_values(date_str, count, sign=1):
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
def create_table(cluster, table_name, additional_settings=None):
def create_table(cluster, table_name, **additional_settings):
node = cluster.instances["node"]
settings = {
"storage_policy": "s3",
"old_parts_lifetime": 0,
"index_granularity": 512
}
settings.update(additional_settings)
create_table_statement = """
CREATE TABLE {} (
create_table_statement = f"""
CREATE TABLE {table_name} (
dt Date,
id Int64,
data String,
@ -89,19 +96,40 @@ def create_table(cluster, table_name, additional_settings=None):
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS
storage_policy='s3',
old_parts_lifetime=0,
index_granularity=512
""".format(table_name)
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(create_table_statement)
def run_s3_mocks(cluster):
logging.info("Starting s3 mocks")
mocks = (
("unstable_proxy.py", "resolver", "8081"),
)
for mock_filename, container, port in mocks:
container_id = cluster.get_container_id(container)
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename)
cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)
# Wait for S3 mocks to start
for mock_filename, container, port in mocks:
num_attempts = 100
for attempt in range(num_attempts):
ping_response = cluster.exec_in_container(cluster.get_container_id(container),
["curl", "-s", f"http://localhost:{port}/"], nothrow=True)
if ping_response != "OK":
if attempt == num_attempts - 1:
assert ping_response == "OK", f'Expected "OK", but got "{ping_response}"'
else:
time.sleep(1)
else:
logging.debug(f"mock {mock_filename} ({port}) answered {ping_response} on attempt {attempt}")
break
logging.info("S3 mocks started")
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
@ -136,7 +164,7 @@ def drop_table(cluster):
]
)
def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
create_table(cluster, "s3_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))
create_table(cluster, "s3_test", min_rows_for_wide_part=min_rows_for_wide_part)
node = cluster.instances["node"]
minio = cluster.minio_client
@ -158,13 +186,12 @@ def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
"merge_vertical", [False, True]
)
def test_insert_same_partition_and_merge(cluster, merge_vertical):
settings = None
settings = {}
if merge_vertical:
settings = """
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0
"""
create_table(cluster, "s3_test", additional_settings=settings)
settings['vertical_merge_algorithm_min_rows_to_activate'] = 0
settings['vertical_merge_algorithm_min_columns_to_activate'] = 0
create_table(cluster, "s3_test", **settings)
node = cluster.instances["node"]
minio = cluster.minio_client
@ -459,3 +486,13 @@ def test_s3_disk_restart_during_load(cluster):
for thread in threads:
thread.join()
def test_s3_disk_reads_on_unstable_connection(cluster):
create_table(cluster, "s3_test", storage_policy='unstable_s3')
node = cluster.instances["node"]
node.query("INSERT INTO s3_test SELECT today(), *, toString(*) FROM system.numbers LIMIT 9000000")
for i in range(30):
print(f"Read sequence {i}")
assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"]

View File

@ -37,6 +37,7 @@
[5, 2]
[6, 1]
[7, 1]
[1]
[1, 2]
[2, 2]
[3, 0]

View File

@ -43,7 +43,7 @@ drop table if exists funnel_test_strict;
create table funnel_test_strict (timestamp UInt32, event UInt32) engine=Memory;
insert into funnel_test_strict values (00,1000),(10,1001),(20,1002),(30,1003),(40,1004),(50,1005),(51,1005),(60,1006),(70,1007),(80,1008);
select 6 = windowFunnel(10000, 'strict')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
select 6 = windowFunnel(10000, 'strict_deduplication')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
select 7 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
@ -62,11 +62,18 @@ insert into funnel_test_strict_order values (1, 5, 'a') (2, 5, 'a') (3, 5, 'b')
insert into funnel_test_strict_order values (1, 6, 'c') (2, 6, 'c') (3, 6, 'b') (4, 6, 'b') (5, 6, 'a') (6, 6, 'a');
select user, windowFunnel(86400)(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
select user, windowFunnel(86400, 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
select user, windowFunnel(86400, 'strict', 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
select user, windowFunnel(86400, 'strict_deduplication', 'strict_order')(dt, event='a', event='b', event='c') as s from funnel_test_strict_order group by user order by user format JSONCompactEachRow;
insert into funnel_test_strict_order values (1, 7, 'a') (2, 7, 'c') (3, 7, 'b');
select user, windowFunnel(10, 'strict_order')(dt, event = 'a', event = 'b', event = 'c') as s from funnel_test_strict_order where user = 7 group by user format JSONCompactEachRow;
drop table funnel_test_strict_order;
--https://github.com/ClickHouse/ClickHouse/issues/27469
drop table if exists strict_BiteTheDDDD;
create table strict_BiteTheDDDD (ts UInt64, event String) engine = Log();
insert into strict_BiteTheDDDD values (1,'a') (2,'b') (3,'c') (4,'b') (5,'d');
select 3 = windowFunnel(86400, 'strict_deduplication')(ts, event='a', event='b', event='c', event='d') from strict_BiteTheDDDD format JSONCompactEachRow;
drop table strict_BiteTheDDDD;
drop table if exists funnel_test_non_null;
create table funnel_test_non_null (`dt` DateTime, `u` int, `a` Nullable(String), `b` Nullable(String)) engine = MergeTree() partition by dt order by u;
insert into funnel_test_non_null values (1, 1, 'a1', 'b1') (2, 1, 'a2', 'b2');

View File

@ -16,3 +16,5 @@
['2016-06-15 23:00:16']
2016-04-02 17:23:12
['2016-04-02 17:23:12']
2016-04-02 17:23:12
['2016-04-02 17:23:12']

View File

@ -30,4 +30,7 @@ SELECT quantilesTDigestWeighted(0.2)(d, 1) FROM datetime;
SELECT quantileBFloat16(0.2)(d) FROM datetime;
SELECT quantilesBFloat16(0.2)(d) FROM datetime;
SELECT quantileBFloat16Weighted(0.2)(d, 1) FROM datetime;
SELECT quantilesBFloat16Weighted(0.2)(d, 1) FROM datetime;
DROP TABLE datetime;

View File

@ -4,8 +4,7 @@ DROP TABLE IF EXISTS distr2;
CREATE TABLE distr (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr); -- { serverError 269 }
CREATE TABLE distr0 (x UInt8) ENGINE = Distributed(test_shard_localhost, '', distr0);
SELECT * FROM distr0; -- { serverError 581 }
CREATE TABLE distr0 (x UInt8) ENGINE = Distributed(test_shard_localhost, '', distr0); -- { serverError 269 }
CREATE TABLE distr1 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr2);
CREATE TABLE distr2 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr1);
@ -13,6 +12,5 @@ CREATE TABLE distr2 (x UInt8) ENGINE = Distributed(test_shard_localhost, current
SELECT * FROM distr1; -- { serverError 581 }
SELECT * FROM distr2; -- { serverError 581 }
DROP TABLE distr0;
DROP TABLE distr1;
DROP TABLE distr2;

View File

@ -1 +1,4 @@
100 10 324 120.00 B 8.00 B 23.00 B
0
SELECT argMax(number, number)
FROM numbers(1)

View File

@ -5,3 +5,6 @@ INSERT INTO columns_transformers VALUES (100, 10, 324, 120, 8, 23);
SELECT * EXCEPT 'bytes', COLUMNS('bytes') APPLY formatReadableSize FROM columns_transformers;
DROP TABLE IF EXISTS columns_transformers;
SELECT * APPLY x->argMax(x, number) FROM numbers(1);
EXPLAIN SYNTAX SELECT * APPLY x->argMax(x, number) FROM numbers(1);

View File

@ -9,7 +9,9 @@ CREATE TABLE tt6
`status` String
)
ENGINE = Distributed('test_shard_localhost', '', 'tt6', rand());
ENGINE = Distributed('test_shard_localhost', '', 'tt7', rand());
CREATE TABLE tt7 as tt6 ENGINE = Distributed('test_shard_localhost', '', 'tt6', rand());
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 581 }

View File

@ -1 +1,2 @@
('123','456','[7,8,9]')
\N

View File

@ -1 +1,3 @@
select JSONExtract('{"a": "123", "b": 456, "c": [7, 8, 9]}', 'Tuple(a String, b String, c String)');
with '{"string_value":null}' as json select JSONExtract(json, 'string_value', 'Nullable(String)');

View File

@ -1,2 +1,4 @@
SELECT 1 as a, count() FROM numbers(10) WHERE 0 GROUP BY a;
SELECT count() FROM numbers(10) WHERE 0
SELECT 1 as a, count() FROM numbers(10) WHERE 0 GROUP BY a;
SELECT count() FROM numbers(10) WHERE 0;
SELECT 1 as a, count() FROM numbers(10) WHERE 0 GROUP BY a SETTINGS empty_result_for_aggregation_by_constant_keys_on_empty_set = 0;

View File

@ -0,0 +1,50 @@
-- { echo }
select x3, x2, x1 from test order by 1;
1 100 100
10 1 10
100 10 1
select x3, x2, x1 from test order by x3;
1 100 100
10 1 10
100 10 1
select x3, x2, x1 from test order by 1 desc;
100 10 1
10 1 10
1 100 100
select x3, x2, x1 from test order by x3 desc;
100 10 1
10 1 10
1 100 100
insert into test values (1, 10, 200), (10, 1, 200), (100, 100, 1);
select x3, x2 from test group by x3, x2;
200 1
10 1
200 10
1 100
100 10
select x3, x2 from test group by 1, 2;
200 1
10 1
200 10
1 100
100 10
select x1, x2, x3 from test order by x3 limit 1 by x3;
100 100 1
10 1 10
1 10 100
1 10 200
select x1, x2, x3 from test order by 3 limit 1 by 3;
100 100 1
10 1 10
1 10 100
1 10 200
select x1, x2, x3 from test order by x3 limit 1 by x1;
100 100 1
10 1 10
1 10 100
select x1, x2, x3 from test order by 3 limit 1 by 1;
100 100 1
10 1 10
1 10 100
select max(x3), max(x2), max(x1) from test group by 1; -- { serverError 43 }
select max(x1) from test order by 1; -- { serverError 43 }

Some files were not shown because too many files have changed in this diff Show More