Merge branch 'master' into support-apple-m1

This commit is contained in:
Maksim Kita 2021-04-16 23:33:12 +03:00 committed by GitHub
commit 4be441c6d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
67 changed files with 1117 additions and 231 deletions

View File

@ -37,9 +37,9 @@ if (OS_LINUX)
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache

View File

@ -18,11 +18,17 @@ Engine parameters:
- `num_layers` Parallelism layer. Physically, the table will be represented as `num_layers` of independent buffers. Recommended value: 16.
- `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` Conditions for flushing data from the buffer.
Optional engine parameters:
- `flush_time`, `flush_rows`, `flush_bytes` Conditions for flushing data from the buffer, that will happen only in background (ommited or zero means no `flush*` parameters).
Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met.
- `min_time`, `max_time` Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes` Condition for the number of bytes in the buffer.
Also if at least one `flush*` condition are met flush initiated in background, this is different from `max*`, since `flush*` allows you to configure background flushes separately to avoid adding latency for `INSERT` (into `Buffer`) queries.
- `min_time`, `max_time`, `flush_time` Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows`, `flush_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes`, `flush_bytes` Condition for the number of bytes in the buffer.
During the write operation, data is inserted to a `num_layers` number of random buffers. Or, if the data part to insert is large enough (greater than `max_rows` or `max_bytes`), it is written directly to the destination table, omitting the buffer.

View File

@ -1213,6 +1213,62 @@ SELECT arrayFill(x -> not isNull(x), [1, null, 3, 11, 12, null, null, 5, 6, 14,
Note that the `arrayFill` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it cant be omitted.
## arrayFold(func, arr1, …, init) {#array-fold}
Returns an result of [folding](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) arrays and value `init` using function `func`.
I.e. result of calculation `func(arr1[n], …, func(arr1[n - 1], …, func(…, func(arr1[2], …, func(arr1[1], …, init)))))`.
Note that the `arrayMap` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it cant be omitted.
**Arguments**
- `func` — The lambda function with `n+1` arguments (where `n` is number of input arrays), first `n` arguments are for
current elements of input arrays, and last argument is for current value of accumulator.
- `arr` — Any number of [arrays](../../sql-reference/data-types/array.md).
- `init` - Initial value of accumulator.
**Returned value**
Final value of accumulator.
**Examples**
The following example shows how to acquire product and sum of elements of array:
``` sql
SELECT arrayMap(x, accum -> (accum.1 * x, accum.2 + x), [1, 2, 3], (0, 1)) as res;
```
``` text
┌─res───────┐
│ (120, 15) │
└───────────┘
```
The following example shows how to reverse elements of array:
``` sql
SELECT arrayFold(x, acc -> arrayPushFront(acc, x), [1,2,3,4,5], emptyArrayUInt64()) as res;
```
``` text
┌─res─────────┐
│ [5,4,3,2,1] │
└─────────────┘
```
Folding may be used to access of already passed elements due to function calculation, for example:
``` sql
SELECT arrayFold(x, acc -> (x, concat(acc.2, toString(acc.1), ',')), [1,2], (0,''))
```
``` text
┌─res────────┐
│ (2,'0,1,') │
└────────────┘
```
## arrayReverseFill(func, arr1, …) {#array-reverse-fill}
Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func` returns 0. The last element of `arr1` will not be replaced.

View File

@ -16,7 +16,7 @@ The following operations with [partitions](../../../engines/table-engines/merget
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) — Resets the value of a specified column in a partition.
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) — Resets the specified secondary index in a partition.
- [FREEZE PARTITION](#alter_freeze-partition) — Creates a backup of a partition.
- [FETCH PARTITION](#alter_fetch-partition) — Downloads a partition from another server.
- [FETCH PARTITION\|PART](#alter_fetch-partition) — Downloads a part or partition from another server.
- [MOVE PARTITION\|PART](#alter_move-partition) — Move partition/data part to another disk or volume.
<!-- -->
@ -198,29 +198,35 @@ ALTER TABLE table_name CLEAR INDEX index_name IN PARTITION partition_expr
The query works similar to `CLEAR COLUMN`, but it resets an index instead of a column data.
## FETCH PARTITION {#alter_fetch-partition}
## FETCH PARTITION|PART {#alter_fetch-partition}
``` sql
ALTER TABLE table_name FETCH PARTITION partition_expr FROM 'path-in-zookeeper'
ALTER TABLE table_name FETCH PARTITION|PART partition_expr FROM 'path-in-zookeeper'
```
Downloads a partition from another server. This query only works for the replicated tables.
The query does the following:
1. Downloads the partition from the specified shard. In path-in-zookeeper you must specify a path to the shard in ZooKeeper.
1. Downloads the partition|part from the specified shard. In path-in-zookeeper you must specify a path to the shard in ZooKeeper.
2. Then the query puts the downloaded data to the `detached` directory of the `table_name` table. Use the [ATTACH PARTITION\|PART](#alter_attach-partition) query to add the data to the table.
For example:
1. FETCH PARTITION
``` sql
ALTER TABLE users FETCH PARTITION 201902 FROM '/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PARTITION 201902;
```
2. FETCH PART
``` sql
ALTER TABLE users FETCH PART 201901_2_2_0 FROM '/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PART 201901_2_2_0;
```
Note that:
- The `ALTER ... FETCH PARTITION` query isnt replicated. It places the partition to the `detached` directory only on the local server.
- The `ALTER ... FETCH PARTITION|PART` query isnt replicated. It places the part or partition to the `detached` directory only on the local server.
- The `ALTER TABLE ... ATTACH` query is replicated. It adds the data to all replicas. The data is added to one of the replicas from the `detached` directory, and to the others - from neighboring replicas.
Before downloading, the system checks if the partition exists and the table structure matches. The most appropriate replica is selected automatically from the healthy replicas.

View File

@ -5,39 +5,78 @@ toc_title: ROW POLICY
# CREATE ROW POLICY {#create-row-policy-statement}
Creates [filters for rows](../../../operations/access-rights.md#row-policy-management), which a user can read from a table.
Creates a [row policy](../../../operations/access-rights.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table.
Syntax:
``` sql
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
[FOR SELECT] USING condition
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING condition]
[TO {role1 [, role2 ...] | ALL | ALL EXCEPT role1 [, role2 ...]}]
```
`ON CLUSTER` clause allows creating row policies on a cluster, see [Distributed DDL](../../../sql-reference/distributed-ddl.md).
## AS Clause {#create-row-policy-as}
## USING Clause {#create-row-policy-using}
Using this section you can create permissive or restrictive policies.
Permissive policy grants access to rows. Permissive policies which apply to the same table are combined together using the boolean `OR` operator. Policies are permissive by default.
Restrictive policy restricts access to rows. Restrictive policies which apply to the same table are combined together using the boolean `AND` operator.
Restrictive policies apply to rows that passed the permissive filters. If you set restrictive policies but no permissive policies, the user cant get any row from the table.
Allows to specify a condition to filter rows. An user will see a row if the condition is calculated to non-zero for the row.
## TO Clause {#create-row-policy-to}
In the section `TO` you can provide a mixed list of roles and users, for example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Keyword `ALL` means all the ClickHouse users including current user. Keywords `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
Keyword `ALL` means all the ClickHouse users including current user. Keyword `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
## Examples {#examples}
!!! note "Note"
If there are no row policies defined for a table then any user can `SELECT` all the row from the table.
Defining one or more row policies for the table makes the access to the table depending on the row policies no matter if
those row policies are defined for the current user or not. For example, the following row policy
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO ALL EXCEPT mira`
forbids the users `mira` and `peter` to see the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all! If that isn't desirable you can fix it by adding one more row policy, for example:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
## AS Clause {#create-row-policy-as}
It's allowed to have more than one policy enabled on the same table for the same user at the one time.
So we need a way to combine the conditions from multiple policies.
By default policies are combined using the boolean `OR` operator. For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
enables the user `peter` to see rows with either `b=1` or `c=2`.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive.
By default policies are permissive, which means they are combined using the boolean `OR` operator.
A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator.
Here is the formula:
```
row_is_visible = (one or more of the permissive policies' conditions are non-zero) AND (all of the restrictive policies's conditions are non-zero)`
```
For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see rows only if both `b=1` AND `c=2`.
## Examples
`CREATE ROW POLICY filter1 ON mydb.mytable USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`

View File

@ -279,7 +279,7 @@ Allows executing [ALTER](../../sql-reference/statements/alter/index.md) queries
- `ALTER MATERIALIZE TTL`. Level: `TABLE`. Aliases: `MATERIALIZE TTL`
- `ALTER SETTINGS`. Level: `TABLE`. Aliases: `ALTER SETTING`, `ALTER MODIFY SETTING`, `MODIFY SETTING`
- `ALTER MOVE PARTITION`. Level: `TABLE`. Aliases: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `FETCH PARTITION`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `ALTER FETCH PART`, `FETCH PARTITION`, `FETCH PART`
- `ALTER FREEZE PARTITION`. Level: `TABLE`. Aliases: `FREEZE PARTITION`
- `ALTER VIEW` Level: `GROUP`
- `ALTER VIEW REFRESH`. Level: `VIEW`. Aliases: `ALTER LIVE VIEW REFRESH`, `REFRESH VIEW`

View File

@ -1147,6 +1147,62 @@ SELECT arrayReverseFill(x -> not isNull(x), [1, null, 3, 11, 12, null, null, 5,
Функция `arrayReverseFill` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
## arrayFold(func, arr1, …, init) {#array-fold}
Возвращает результат [сворачивания](https://ru.wikipedia.org/wiki/%D0%A1%D0%B2%D1%91%D1%80%D1%82%D0%BA%D0%B0_%D1%81%D0%BF%D0%B8%D1%81%D0%BA%D0%B0) массивов и начального значения `init` с помощью функции `func`.
Т.е. результат вычисления `func(arr1[n], …, func(arr1[n - 1], …, func(…, func(arr1[2], …, func(arr1[1], …, init)))))`.
Функция `arrayFold` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
**Аргументы**
- `func` — лямбда-функция с `n+1` параметром (где `n` это количество входных массивов), причём первые `n` параметров
используются для текущих элементов входных массивов, а последний элемент для текущего значения аккумулятора.
- `arr` — произвольное количество [массивов](../../sql-reference/data-types/array.md).
- `init` - начальное значение аккумулятора.
**Возвращаемое значение**
Итоговое значение аккумулятора.
**Примеры**
Следующий пример показывает, как вычислить произведение и сумму элементов массива:
``` sql
SELECT arrayMap(x, accum -> (accum.1 * x, accum.2 + x), [1, 2, 3], (0, 1)) as res;
```
``` text
┌─res───────┐
│ (120, 15) │
└───────────┘
```
В этом примере показано, как обратить массив:
``` sql
SELECT arrayFold(x, acc -> arrayPushFront(acc, x), [1,2,3,4,5], emptyArrayUInt64()) as res;
```
``` text
┌─res─────────┐
│ [5,4,3,2,1] │
└─────────────┘
```
Свёртка может быть использована для доступа к уже пройденным в процессе вычисления элементам. Например:
``` sql
SELECT arrayFold(x, acc -> (x, concat(acc.2, toString(acc.1), ',')), [1,2], (0,''))
```
``` text
┌─res────────┐
│ (2,'0,1,') │
└────────────┘
```
## arraySplit(func, arr1, …) {#array-split}
Разделяет массив `arr1` на несколько. Если `func` возвращает не 0, то массив разделяется, а элемент помещается в левую часть. Массив не разбивается по первому элементу.
@ -1183,6 +1239,7 @@ SELECT arrayReverseSplit((x, y) -> y, [1, 2, 3, 4, 5], [1, 0, 0, 1, 0]) AS res
Функция `arrayReverseSplit` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
## arrayExists(\[func,\] arr1, …) {#arrayexistsfunc-arr1}
Возвращает 1, если существует хотя бы один элемент массива `arr`, для которого функция func возвращает не 0. Иначе возвращает 0.

View File

@ -5,7 +5,7 @@ toc_title: "Политика доступа"
# CREATE ROW POLICY {#create-row-policy-statement}
Создает [фильтры для строк](../../../operations/access-rights.md#row-policy-management), которые пользователь может прочесть из таблицы.
Создает [политики доступа к строкам](../../../operations/access-rights.md#row-policy-management), т.е. фильтры, которые определяют, какие строки пользователь может читать из таблицы.
Синтаксис:
@ -13,33 +13,68 @@ toc_title: "Политика доступа"
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING condition]
[FOR SELECT] USING condition
[TO {role [,...] | ALL | ALL EXCEPT role [,...]}]
```
Секция `ON CLUSTER` позволяет создавать фильтры для строк на кластере, см. [Распределенные DDL запросы](../../../sql-reference/distributed-ddl.md).
Секция `ON CLUSTER` позволяет создавать политики на кластере, см. [Распределенные DDL запросы](../../../sql-reference/distributed-ddl.md).
## Секция AS {#create-row-policy-as}
## USING Clause {#create-row-policy-using}
С помощью данной секции можно создать политику разрешения или ограничения.
Политика разрешения предоставляет доступ к строкам. Разрешительные политики, которые применяются к одной таблице, объединяются с помощью логического оператора `OR`. Политики являются разрешительными по умолчанию.
Политика ограничения запрещает доступ к строкам. Ограничительные политики, которые применяются к одной таблице, объединяются логическим оператором `AND`.
Ограничительные политики применяются к строкам, прошедшим фильтр разрешительной политики. Если вы не зададите разрешительные политики, пользователь не сможет обращаться ни к каким строкам из таблицы.
Секция `USING` указывает условие для фильтрации строк. Пользователь может видеть строку, если это условие, вычисленное для строки, дает ненулевой результат.
## Секция TO {#create-row-policy-to}
В секции `TO` вы можете перечислить как роли, так и пользователей. Например, `CREATE ROW POLICY ... TO accountant, john@localhost`.
В секции `TO` перечисляются пользователи и роли, для которых должна действовать политика. Например, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Ключевым словом `ALL` обозначаются все пользователи, включая текущего. Ключевые слова `ALL EXCEPT` позволяют исключить пользователей из списка всех пользователей. Например, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
!!! note "Note"
Если для таблицы не задано ни одной политики доступа к строкам, то любой пользователь может выполнить `SELECT` и получить все строки таблицы.
Если определить хотя бы одну политику для таблицы, до доступ к строкам будет управляться этими политиками, причем для всех пользователей
(даже для тех, для кого политики не определялись). Например, следующая политика
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
запретит пользователям `mira` и `peter` видеть строки с `b != 1`, и еще запретит всем остальным пользователям (например, пользователю `paul`)
видеть какие-либо строки вообще из таблицы `mydb.table1`! Если это нежелательно, такое поведение можно исправить, определив дополнительную политику:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
## Секция AS {#create-row-policy-as}
Может быть одновременно активно более одной политики для одной и той же таблицы и одного и того же пользователя.
Поэтому нам нужен способ комбинировать политики. По умолчанию политики комбинируются с использованием логического оператора `OR`.
Например, политики:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
разрешат пользователю с именем `peter` видеть строки, для которых будет верно `b=1` или `c=2`.
Секция `AS` указывает, как политики должны комбинироваться с другими политиками. Политики могут быть или разрешительными (`PERMISSIVE`), или ограничительными (`RESTRICTIVE`). По умолчанию политики создаются разрешительными (`PERMISSIVE`); такие политики комбинируются с использованием логического оператора `OR`.
Ограничительные (`RESTRICTIVE`) политики комбинируются с использованием логического оператора `AND`.
Используется следующая формула:
`строка_видима = (одна или больше permissive-политик дала ненулевой результат проверки условия) И (все restrictive-политики дали ненулевой результат проверки условия)`
Например, политики
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
разрешат пользователю с именем `peter` видеть только те строки, для которых будет одновременно `b=1` и `c=2`.
## Примеры
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter1 ON mydb.mytable USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`
<!--hide-->

View File

@ -62,7 +62,7 @@ enum class AccessType
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_TABLE, "", GROUP, ALTER) \

View File

@ -146,6 +146,9 @@
M(StorageBufferPassedTimeMaxThreshold, "") \
M(StorageBufferPassedRowsMaxThreshold, "") \
M(StorageBufferPassedBytesMaxThreshold, "") \
M(StorageBufferPassedTimeFlushThreshold, "") \
M(StorageBufferPassedRowsFlushThreshold, "") \
M(StorageBufferPassedBytesFlushThreshold, "") \
M(StorageBufferLayerLockReadersWaitMilliseconds, "Time for waiting for Buffer layer during reading") \
M(StorageBufferLayerLockWritersWaitMilliseconds, "Time for waiting free Buffer layer to write to (can be used to tune Buffer layers)") \
\

View File

@ -252,8 +252,6 @@ class IColumn;
* Almost all limits apply to each stream individually. \
*/ \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
M(UInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
@ -464,6 +462,8 @@ class IColumn;
\
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -51,6 +51,14 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
key_to_fetched_index.reserve(requested_keys.size());
auto fetched_columns_from_storage = request.makeAttributesResultColumns();
for (size_t attribute_index = 0; attribute_index < request.attributesSize(); ++attribute_index)
{
if (!request.shouldFillResultColumnWithIndex(attribute_index))
continue;
auto & fetched_column_from_storage = fetched_columns_from_storage[attribute_index];
fetched_column_from_storage->reserve(requested_keys.size());
}
size_t fetched_key_index = 0;

View File

@ -1,5 +1,7 @@
configure_file(config_functions.h.in ${ConfigIncludePath}/config_functions.h)
add_subdirectory(divide)
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_functions .)
@ -25,7 +27,7 @@ target_link_libraries(clickhouse_functions
PRIVATE
${ZLIB_LIBRARIES}
boost::filesystem
libdivide
divide_impl
)
if (OPENSSL_CRYPTO_LIBRARY)

View File

@ -0,0 +1,187 @@
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
extern const int TYPE_MISMATCH;
}
/** arrayFold(x1,...,xn,accum -> expression, array1,...,arrayn, init_accum) - apply the expression to each element of the array (or set of parallel arrays).
*/
class FunctionArrayFold : public IFunction
{
public:
static constexpr auto name = "arrayFold";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayFold>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
void getLambdaArgumentTypes(DataTypes & arguments) const override
{
if (arguments.size() < 3)
throw Exception("Function " + getName() + " needs lambda function, at least one array argument and one accumulator argument.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
DataTypes nested_types(arguments.size() - 1);
for (size_t i = 0; i < nested_types.size() - 1; ++i)
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(&*arguments[i + 1]);
if (!array_type)
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found "
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
}
nested_types[nested_types.size() - 1] = arguments[arguments.size() - 1];
const DataTypeFunction * function_type = checkAndGetDataType<DataTypeFunction>(arguments[0].get());
if (!function_type || function_type->getArgumentTypes().size() != nested_types.size())
throw Exception("First argument for this overload of " + getName() + " must be a function with "
+ toString(nested_types.size()) + " arguments. Found "
+ arguments[0]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
arguments[0] = std::make_shared<DataTypeFunction>(nested_types);
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 2)
throw Exception("Function " + getName() + " needs at least 2 arguments; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto * data_type_function = checkAndGetDataType<DataTypeFunction>(arguments[0].type.get());
if (!data_type_function)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto const accumulator_type = arguments.back().type;
auto const lambda_type = data_type_function->getReturnType();
if (! accumulator_type->equals(*lambda_type))
throw Exception("Return type of lambda function must be the same as the accumulator type. "
"Inferred type of lambda " + lambda_type->getName() + ", "
+ "inferred type of accumulator " + accumulator_type->getName() + ".",
ErrorCodes::TYPE_MISMATCH);
return DataTypePtr(accumulator_type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const auto & column_with_type_and_name = arguments[0];
if (!column_with_type_and_name.column)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * column_function = typeid_cast<const ColumnFunction *>(column_with_type_and_name.column.get());
if (!column_function)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ColumnPtr offsets_column;
ColumnPtr column_first_array_ptr;
const ColumnArray * column_first_array = nullptr;
ColumnsWithTypeAndName arrays;
arrays.reserve(arguments.size() - 1);
for (size_t i = 1; i < arguments.size() - 1; ++i)
{
const auto & array_with_type_and_name = arguments[i];
ColumnPtr column_array_ptr = array_with_type_and_name.column;
const auto * column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
const DataTypePtr & array_type_ptr = array_with_type_and_name.type;
const auto * array_type = checkAndGetDataType<DataTypeArray>(array_type_ptr.get());
if (!column_array)
{
const ColumnConst * column_const_array = checkAndGetColumnConst<ColumnArray>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
}
if (!array_type)
throw Exception("Expected array type, found " + array_type_ptr->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!offsets_column)
{
offsets_column = column_array->getOffsetsPtr();
}
else
{
/// The first condition is optimization: do not compare data if the pointers are equal.
if (column_array->getOffsetsPtr() != offsets_column
&& column_array->getOffsets() != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
if (i == 1)
{
column_first_array_ptr = column_array_ptr;
column_first_array = column_array;
}
arrays.emplace_back(ColumnWithTypeAndName(column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_with_type_and_name.name));
}
arrays.emplace_back(arguments.back());
MutableColumnPtr result = arguments.back().column->convertToFullColumnIfConst()->cloneEmpty();
size_t arr_cursor = 0;
for (size_t irow = 0; irow < column_first_array->size(); ++irow) // for each row of result
{
// Make accumulator column for this row. We initialize it
// with the starting value given as the last argument.
ColumnWithTypeAndName accumulator_column = arguments.back();
ColumnPtr acc(accumulator_column.column->cut(irow, 1));
auto accumulator = ColumnWithTypeAndName(acc,
accumulator_column.type,
accumulator_column.name);
ColumnPtr res(acc);
size_t const arr_next = column_first_array->getOffsets()[irow]; // when we do folding
for (size_t iter = 0; arr_cursor < arr_next; ++iter, ++arr_cursor)
{
// Make slice of input arrays and accumulator for lambda
ColumnsWithTypeAndName iter_arrays;
iter_arrays.reserve(arrays.size() + 1);
for (size_t icolumn = 0; icolumn < arrays.size() - 1; ++icolumn)
{
auto const & arr = arrays[icolumn];
iter_arrays.emplace_back(ColumnWithTypeAndName(arr.column->cut(arr_cursor, 1),
arr.type,
arr.name));
}
iter_arrays.emplace_back(accumulator);
// Calculate function on arguments
auto replicated_column_function_ptr = IColumn::mutate(column_function->replicate(ColumnArray::Offsets(column_first_array->getOffsets().size(), 1)));
auto * replicated_column_function = typeid_cast<ColumnFunction *>(replicated_column_function_ptr.get());
replicated_column_function->appendArguments(iter_arrays);
auto lambda_result = replicated_column_function->reduce().column;
if (lambda_result->lowCardinality())
lambda_result = lambda_result->convertToFullColumnIfLowCardinality();
res = lambda_result->cut(0, 1);
accumulator.column = res;
}
result->insert((*res)[0]);
}
return result;
}
};
void registerFunctionArrayFold(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayFold>();
}
}

View File

@ -0,0 +1,22 @@
# A library for integer division by constant with CPU dispatching.
if (ARCH_AMD64)
add_library(divide_impl_sse2 divideImpl.cpp)
target_compile_options(divide_impl_sse2 PRIVATE -msse2 -DNAMESPACE=SSE2)
target_link_libraries(divide_impl_sse2 libdivide)
add_library(divide_impl_avx2 divideImpl.cpp)
target_compile_options(divide_impl_avx2 PRIVATE -mavx2 -DNAMESPACE=AVX2)
target_link_libraries(divide_impl_avx2 libdivide)
set(IMPLEMENTATIONS divide_impl_sse2 divide_impl_avx2)
else ()
add_library(divide_impl_generic divideImpl.cpp)
target_compile_options(divide_impl_generic PRIVATE -DNAMESPACE=Generic)
target_link_libraries(divide_impl_generic libdivide)
set(IMPLEMENTATIONS divide_impl_generic)
endif ()
add_library(divide_impl divide.cpp)
target_link_libraries(divide_impl ${IMPLEMENTATIONS} clickhouse_common_io)

View File

@ -0,0 +1,57 @@
#include "divide.h"
#include <Common/CpuId.h>
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
namespace SSE2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
namespace AVX2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#else
namespace Generic
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#endif
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
if (DB::Cpu::CpuFlagsCache::have_AVX2)
AVX2::divideImpl(a_pos, b, c_pos, size);
else if (DB::Cpu::CpuFlagsCache::have_SSE2)
SSE2::divideImpl(a_pos, b, c_pos, size);
#else
Generic::divideImpl(a_pos, b, c_pos, size);
#endif
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);

View File

@ -0,0 +1,6 @@
#pragma once
#include <cstddef>
template <typename A, typename B, typename ResultType>
extern void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);

View File

@ -0,0 +1,79 @@
/// This translation unit should be compiled multiple times
/// with different values of NAMESPACE and machine flags (sse2, avx2).
#if !defined(NAMESPACE)
#if defined(ARCADIA_BUILD)
#define NAMESPACE Generic
#else
#error "NAMESPACE macro must be defined"
#endif
#endif
#if defined(__AVX2__)
#define REG_SIZE 32
#define LIBDIVIDE_AVX2
#elif defined(__SSE2__)
#define REG_SIZE 16
#define LIBDIVIDE_SSE2
#endif
#include <libdivide.h>
namespace NAMESPACE
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_simd_register = REG_SIZE / sizeof(A);
const A * a_end_simd = a_pos + size / values_per_simd_register * values_per_simd_register;
while (a_pos < a_end_simd)
{
#if defined(__AVX2__)
_mm256_storeu_si256(reinterpret_cast<__m256i *>(c_pos),
_mm256_loadu_si256(reinterpret_cast<const __m256i *>(a_pos)) / divider);
#else
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
#endif
a_pos += values_per_simd_register;
c_pos += values_per_simd_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);
}

View File

@ -1,11 +1,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#if defined(__SSE2__)
# define LIBDIVIDE_SSE2 1
#endif
#include <libdivide.h>
#include "divide/divide.h"
namespace DB
@ -70,34 +66,11 @@ struct DivideIntegralByConstantImpl
if (unlikely(static_cast<A>(b) == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_sse_register = 16 / sizeof(A);
const A * a_end_sse = a_pos + size / values_per_sse_register * values_per_sse_register;
while (a_pos < a_end_sse)
{
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
a_pos += values_per_sse_register;
c_pos += values_per_sse_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
divideImpl(a_pos, b, c_pos, size);
}
};
/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
/** Specializations are specified for dividing numbers of the type UInt64, UInt32, Int64, Int32 by the numbers of the same sign.
* Can be expanded to all possible combinations, but more code is needed.
*/

View File

@ -4,6 +4,7 @@ namespace DB
class FunctionFactory;
void registerFunctionArrayMap(FunctionFactory & factory);
void registerFunctionArrayFold(FunctionFactory & factory);
void registerFunctionArrayFilter(FunctionFactory & factory);
void registerFunctionArrayCount(FunctionFactory & factory);
void registerFunctionArrayExists(FunctionFactory & factory);
@ -22,6 +23,7 @@ void registerFunctionArrayDifference(FunctionFactory & factory);
void registerFunctionsHigherOrder(FunctionFactory & factory)
{
registerFunctionArrayMap(factory);
registerFunctionArrayFold(factory);
registerFunctionArrayFilter(factory);
registerFunctionArrayCount(factory);
registerFunctionArrayExists(factory);

View File

@ -144,6 +144,7 @@ SRCS(
array/arrayFirst.cpp
array/arrayFirstIndex.cpp
array/arrayFlatten.cpp
array/arrayFold.cpp
array/arrayIntersect.cpp
array/arrayJoin.cpp
array/arrayMap.cpp
@ -229,6 +230,8 @@ SRCS(
defaultValueOfTypeName.cpp
demange.cpp
divide.cpp
divide/divide.cpp
divide/divideImpl.cpp
dumpColumnStructure.cpp
e.cpp
empty.cpp

View File

@ -834,7 +834,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
LOG_TRACE(log,
LOG_DEBUG(log,
"Written part in {} sec., {} rows, {} uncompressed, {} compressed,"
" {} uncompressed bytes per row, {} compressed bytes per row, compression rate: {}"
" ({} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
@ -947,7 +947,7 @@ void Aggregator::writeToTemporaryFileImpl(
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
data_variants.aggregator = nullptr;
LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
LOG_DEBUG(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
}
@ -1481,7 +1481,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
}
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log,
LOG_DEBUG(log,
"Converted aggregated data to blocks. {} rows, {} in {} sec. ({} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
@ -2109,7 +2109,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
size_t rows = block.rows();
size_t bytes = block.bytes();
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
LOG_DEBUG(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
ReadableSize(bytes / elapsed_seconds));

View File

@ -372,7 +372,20 @@ void DDLWorker::scheduleTasks(bool reinitialized)
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={},"
"last_current_task={},"
"last_skipped_entry_name={}",
initialized, size_before_filtering, queue_nodes.size(),
queue_nodes.empty() ? "none" : queue_nodes.front(), queue_nodes.empty() ? "none" : queue_nodes.back(),
first_failed_task_name ? *first_failed_task_name : "none", current_tasks.size(),
current_tasks.empty() ? "none" : current_tasks.back()->entry_name,
last_skipped_entry_name ? *last_skipped_entry_name : "none");
if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();

View File

@ -245,7 +245,7 @@ void ASTAlterCommand::formatImpl(
else if (type == ASTAlterCommand::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH "
<< "PARTITION " << (settings.hilite ? hilite_none : "");
<< (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << DB::quote << from;

View File

@ -61,6 +61,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_drop_detached_partition("DROP DETACHED PARTITION");
ParserKeyword s_drop_detached_part("DROP DETACHED PART");
ParserKeyword s_fetch_partition("FETCH PARTITION");
ParserKeyword s_fetch_part("FETCH PART");
ParserKeyword s_replace_partition("REPLACE PARTITION");
ParserKeyword s_freeze("FREEZE");
ParserKeyword s_unfreeze("UNFREEZE");
@ -428,6 +429,21 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_fetch_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
ASTPtr ast_from;
if (!parser_string_literal.parse(pos, ast_from, expected))
return false;
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->part = true;
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_freeze.ignore(pos, expected))
{
if (s_partition.ignore(pos, expected))

View File

@ -190,7 +190,7 @@ Chunk IRowInputFormat::generate()
if (num_errors && (params.allow_errors_num > 0 || params.allow_errors_ratio > 0))
{
Poco::Logger * log = &Poco::Logger::get("IRowInputFormat");
LOG_TRACE(log, "Skipped {} rows with errors while reading the input stream", num_errors);
LOG_DEBUG(log, "Skipped {} rows with errors while reading the input stream", num_errors);
}
readSuffix();

View File

@ -21,16 +21,13 @@ void MarkdownRowOutputFormat::writePrefix()
}
writeCString("\n|", out);
String left_alignment = ":-|";
String central_alignment = ":-:|";
String right_alignment = "-:|";
for (size_t i = 0; i < columns; ++i)
{
if (isInteger(types[i]))
if (types[i]->shouldAlignRightInPrettyFormats())
writeString(right_alignment, out);
else if (isString(types[i]))
writeString(left_alignment, out);
else
writeString(central_alignment, out);
writeString(left_alignment, out);
}
writeChar('\n', out);
}

View File

@ -214,8 +214,8 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
{
output.push(std::move(to_push_chunk));
output.finish();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {})", src_rows, res_rows,
formatReadableSizeWithBinarySuffix(src_bytes));
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {})",
src_rows, res_rows, formatReadableSizeWithBinarySuffix(src_bytes));
return Status::Finished;
}
if (input.isFinished())

View File

@ -541,7 +541,7 @@ void AggregatingTransform::initGenerate()
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = variants.sizeWithoutOverflowRow();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
src_rows, rows, ReadableSize(src_bytes),
elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds));
@ -599,7 +599,7 @@ void AggregatingTransform::initGenerate()
pipe = Pipe::unitePipes(std::move(pipes));
}
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
LOG_DEBUG(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);

View File

@ -52,7 +52,7 @@ Chunk MergingAggregatedTransform::generate()
if (!generate_started)
{
generate_started = true;
LOG_TRACE(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
LOG_DEBUG(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();

View File

@ -535,7 +535,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
LOG_TRACE(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
LOG_DEBUG(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
@ -631,7 +631,7 @@ struct StorageDistributedDirectoryMonitor::Batch
Stopwatch watch;
LOG_TRACE(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
LOG_DEBUG(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
@ -876,7 +876,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
if (!total_rows || !header)
{
LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
LOG_DEBUG(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);

View File

@ -2563,7 +2563,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}). Parts cleaning are processing significantly slower than inserts",
"Too many parts ({}). Merges are processing significantly slower than inserts",
parts_count_in_partition);
}
@ -2909,7 +2909,12 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
void MergeTreeData::fetchPartition(const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, const String & /*from*/, ContextPtr /*query_context*/)
void MergeTreeData::fetchPartition(
const ASTPtr & /*partition*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const String & /*from*/,
bool /*fetch_part*/,
ContextPtr /*query_context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FETCH PARTITION is not supported by storage {}", getName());
}
@ -2972,7 +2977,7 @@ Pipe MergeTreeData::alterPartition(
break;
case PartitionCommand::FETCH_PARTITION:
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context);
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, command.part, query_context);
break;
case PartitionCommand::FREEZE_PARTITION:

View File

@ -970,7 +970,12 @@ protected:
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;
/// Makes sense only for replicated tables
virtual void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context);
virtual void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context);
void writePartLog(
PartLogElement::Type type,

View File

@ -1054,7 +1054,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
false);
/// Let's estimate total number of rows for progress bar.
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
@ -1576,7 +1576,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
settings.preferred_block_size_bytes,
false);
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
for (size_t i = 0; i < num_streams_for_lonely_parts; ++i)
{

View File

@ -182,7 +182,7 @@ bool MergeTreePartsMover::selectPartsForMove(
if (!parts_to_move.empty())
{
LOG_TRACE(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
LOG_DEBUG(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
return true;
}
else

View File

@ -47,7 +47,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

View File

@ -47,7 +47,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
LOG_DEBUG(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

View File

@ -29,10 +29,10 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
{
/// Print column name but don't pollute logs in case of many columns.
if (columns_to_read.size() == 1)
LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
data_part->getMarksCount(), data_part->name, data_part->rows_count, columns_to_read.front());
else
LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
data_part->getMarksCount(), data_part->name, data_part->rows_count);
}

View File

@ -342,6 +342,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (!num_nodes_to_delete)
return;
auto last_outdated_block = timed_blocks.end() - 1;
LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
first_outdated_block->node, first_outdated_block->ctime,
last_outdated_block->node, last_outdated_block->ctime);
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
@ -372,9 +381,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
first_outdated_block++;
}
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (num_nodes_to_delete)
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
}

View File

@ -82,6 +82,7 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.type = FETCH_PARTITION;
res.partition = command_ast->partition;
res.from_zookeeper_path = command_ast->from;
res.part = command_ast->part;
return res;
}
else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION)
@ -140,7 +141,10 @@ std::string PartitionCommand::typeToString() const
else
return "DROP DETACHED PARTITION";
case PartitionCommand::Type::FETCH_PARTITION:
return "FETCH PARTITION";
if (part)
return "FETCH PART";
else
return "FETCH PARTITION";
case PartitionCommand::Type::FREEZE_ALL_PARTITIONS:
return "FREEZE ALL";
case PartitionCommand::Type::FREEZE_PARTITION:

View File

@ -40,6 +40,9 @@ namespace ProfileEvents
extern const Event StorageBufferPassedTimeMaxThreshold;
extern const Event StorageBufferPassedRowsMaxThreshold;
extern const Event StorageBufferPassedBytesMaxThreshold;
extern const Event StorageBufferPassedTimeFlushThreshold;
extern const Event StorageBufferPassedRowsFlushThreshold;
extern const Event StorageBufferPassedBytesFlushThreshold;
extern const Event StorageBufferLayerLockReadersWaitMilliseconds;
extern const Event StorageBufferLayerLockWritersWaitMilliseconds;
}
@ -103,6 +106,7 @@ StorageBuffer::StorageBuffer(
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
@ -110,6 +114,7 @@ StorageBuffer::StorageBuffer(
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, flush_thresholds(flush_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
@ -542,7 +547,7 @@ public:
{
if (storage.destination_id)
{
LOG_TRACE(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
LOG_DEBUG(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
storage.writeBlockToDestination(block, destination);
}
return;
@ -602,7 +607,7 @@ private:
{
buffer.data = sorted_block.cloneEmpty();
}
else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
else if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
@ -713,7 +718,7 @@ bool StorageBuffer::supportsPrewhere() const
return false;
}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
bool StorageBuffer::checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
time_t time_passed = 0;
if (buffer.first_write_time)
@ -722,11 +727,11 @@ bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time,
size_t rows = buffer.data.rows() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed);
return checkThresholdsImpl(direct, rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
@ -752,6 +757,27 @@ bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_p
return true;
}
if (!direct)
{
if (flush_thresholds.time && time_passed > flush_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeFlushThreshold);
return true;
}
if (flush_thresholds.rows && rows > flush_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsFlushThreshold);
return true;
}
if (flush_thresholds.bytes && bytes > flush_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesFlushThreshold);
return true;
}
}
return false;
}
@ -785,7 +811,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (check_thresholds)
{
if (!checkThresholdsImpl(rows, bytes, time_passed))
if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed))
return;
}
else
@ -804,7 +830,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (!destination_id)
{
LOG_TRACE(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
return;
}
@ -841,7 +867,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
}
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_TRACE(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
}
@ -1040,16 +1066,17 @@ void registerStorageBuffer(StorageFactory & factory)
*
* db, table - in which table to put data from buffer.
* num_buckets - level of parallelism.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer,
* flush_time, flush_rows, flush_bytes - conditions for flushing.
*/
factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
if (engine_args.size() < 9 || engine_args.size() > 12)
throw Exception("Storage Buffer requires from 9 to 12 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// Table and database name arguments accept expressions, evaluate them.
@ -1058,7 +1085,7 @@ void registerStorageBuffer(StorageFactory & factory)
// After we evaluated all expressions, check that all arguments are
// literals.
for (size_t i = 0; i < 9; i++)
for (size_t i = 0; i < engine_args.size(); i++)
{
if (!typeid_cast<ASTLiteral *>(engine_args[i].get()))
{
@ -1068,17 +1095,29 @@ void registerStorageBuffer(StorageFactory & factory)
}
}
String destination_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
size_t i = 0;
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[2]->as<ASTLiteral &>().value);
String destination_database = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[3]->as<ASTLiteral &>().value);
Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[4]->as<ASTLiteral &>().value);
UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[5]->as<ASTLiteral &>().value);
UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[6]->as<ASTLiteral &>().value);
UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
StorageBuffer::Thresholds min;
StorageBuffer::Thresholds max;
StorageBuffer::Thresholds flush;
min.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
/// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
StorageID destination_id = StorageID::createEmpty();
@ -1094,8 +1133,7 @@ void registerStorageBuffer(StorageFactory & factory)
args.constraints,
args.getContext(),
num_buckets,
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
min, max, flush,
destination_id,
static_cast<bool>(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns));
},

View File

@ -35,6 +35,10 @@ namespace DB
* Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows,
* and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table.
*
* There are also separate thresholds for flush, those thresholds are checked only for non-direct flush.
* This maybe useful if you do not want to add extra latency for INSERT queries,
* so you can set max_rows=1e6 and flush_rows=500e3, then each 500e3 rows buffer will be flushed in background only.
*
* When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
* The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost.
*/
@ -45,12 +49,11 @@ friend class BufferSource;
friend class BufferBlockOutputStream;
public:
/// Thresholds.
struct Thresholds
{
time_t time; /// The number of seconds from the insertion of the first row into the block.
size_t rows; /// The number of rows in the block.
size_t bytes; /// The number of (uncompressed) bytes in the block.
time_t time = 0; /// The number of seconds from the insertion of the first row into the block.
size_t rows = 0; /// The number of rows in the block.
size_t bytes = 0; /// The number of (uncompressed) bytes in the block.
};
std::string getName() const override { return "Buffer"; }
@ -135,6 +138,7 @@ private:
const Thresholds min_thresholds;
const Thresholds max_thresholds;
const Thresholds flush_thresholds;
StorageID destination_id;
bool allow_materialized;
@ -153,8 +157,8 @@ private:
/// are exceeded. If reset_block_structure is set - clears inner block
/// structure inside buffer (useful in OPTIMIZE and ALTER).
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const;
/// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
void writeBlockToDestination(const Block & block, StoragePtr table);
@ -177,6 +181,7 @@ protected:
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id,
bool allow_materialized_);
};

View File

@ -130,6 +130,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int INTERSERVER_SCHEME_DOESNT_MATCH;
extern const int DUPLICATE_DATA_PART;
}
namespace ActionLocks
@ -5356,11 +5357,11 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
}
}
void StorageReplicatedMergeTree::fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from_,
bool fetch_part,
ContextPtr query_context)
{
Macros::MacroExpansionInfo info;
@ -5373,40 +5374,54 @@ void StorageReplicatedMergeTree::fetchPartition(
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
String partition_id = getPartitionIDFromQuery(partition, query_context);
zkutil::ZooKeeperPtr zookeeper;
if (auxiliary_zookeeper_name != default_zookeeper_name)
{
zookeeper = getContext()->getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch partition {} from shard {} (auxiliary zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
}
else
{
zookeeper = getZooKeeper();
LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
}
if (from.back() == '/')
from.resize(from.size() - 1);
if (fetch_part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
auto part_path = findReplicaHavingPart(part_name, from, zookeeper);
if (part_path.empty())
throw Exception(ErrorCodes::NO_REPLICA_HAS_PART, "Part {} does not exist on any replica", part_name);
/** Let's check that there is no such part in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a part may appear a little later.
*/
if (checkIfDetachedPartExists(part_name))
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Detached part " + part_name + " already exists.");
LOG_INFO(log, "Will fetch part {} from shard {} (zookeeper '{}')", part_name, from_, auxiliary_zookeeper_name);
try
{
/// part name , metadata, part_path , true, 0, zookeeper
if (!fetchPart(part_name, metadata_snapshot, part_path, true, 0, zookeeper))
throw Exception(ErrorCodes::UNFINISHED, "Failed to fetch part {} from {}", part_name, from_);
}
catch (const DB::Exception & e)
{
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
&& e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
throw;
LOG_INFO(log, e.displayText());
}
return;
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
LOG_INFO(log, "Will fetch partition {} from shard {} (zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
*/
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
}
}
if (checkIfDetachedPartitionExists(partition_id))
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
zkutil::Strings replicas;
zkutil::Strings active_replicas;
@ -6913,4 +6928,46 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica;
}
String StorageReplicatedMergeTree::findReplicaHavingPart(
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_)
{
Strings replicas = zookeeper_->getChildren(zookeeper_path_ + "/replicas");
/// Select replicas in uniformly random order.
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
for (const String & replica : replicas)
{
if (zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/parts/" + part_name)
&& zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/is_active"))
return zookeeper_path_ + "/replicas/" + replica;
}
return {};
}
bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
if (dir_it.name() == part_name)
return true;
return false;
}
bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & partition_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version) && part_info.partition_id == partition_name)
return true;
}
}
return false;
}
}

View File

@ -522,8 +522,11 @@ private:
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
bool checkReplicaHavePart(const String & replica, const String & part_name);
bool checkIfDetachedPartExists(const String & part_name);
bool checkIfDetachedPartitionExists(const String & partition_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
@ -626,7 +629,12 @@ private:
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context) override;
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed

View File

@ -156,6 +156,7 @@
"extractURLParameterNames"
"extractURLParameters"
"FETCH PARTITION"
"FETCH PART"
"FINAL"
"FIRST"
"firstSignificantSubdomain"

View File

@ -1,5 +1,3 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@ -18,23 +16,33 @@ def start_cluster():
cluster.shutdown()
def test_fetch_part_from_allowed_zookeeper(start_cluster):
@pytest.mark.parametrize(
('part', 'date', 'part_name'),
[
('PARTITION', '2020-08-27', '2020-08-27'),
('PART', '2020-08-28', '20200828_0_0_0'),
]
)
def test_fetch_part_from_allowed_zookeeper(start_cluster, part, date, part_name):
node.query(
"CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
"CREATE TABLE IF NOT EXISTS simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query("INSERT INTO simple VALUES ('2020-08-27', 1)")
node.query("""INSERT INTO simple VALUES ('{date}', 1)""".format(date=date))
node.query(
"CREATE TABLE simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
"CREATE TABLE IF NOT EXISTS simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';"
)
node.query("ALTER TABLE simple2 ATTACH PARTITION '2020-08-27';")
"""ALTER TABLE simple2 FETCH {part} '{part_name}' FROM 'zookeeper2:/clickhouse/tables/0/simple';""".format(
part=part, part_name=part_name))
node.query("""ALTER TABLE simple2 ATTACH {part} '{part_name}';""".format(part=part, part_name=part_name))
with pytest.raises(QueryRuntimeException):
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper:/clickhouse/tables/0/simple';"
)
"""ALTER TABLE simple2 FETCH {part} '{part_name}' FROM 'zookeeper:/clickhouse/tables/0/simple';""".format(
part=part, part_name=part_name))
assert node.query("SELECT id FROM simple2").strip() == "1"
assert node.query("""SELECT id FROM simple2 where date = '{date}'""".format(date=date)).strip() == "1"

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT arrayFold(x, acc -> acc + 1, range(100000), toUInt64(0))</query> <!-- count -->
<query>SELECT arrayFold(x, acc -> acc + x, range(100000), toUInt64(0))</query> <!-- sum -->
</test>

View File

@ -55,14 +55,14 @@
INSERT INTO simple_key_direct_dictionary_source_table
SELECT number, number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<fill_query>
INSERT INTO complex_key_direct_dictionary_source_table
SELECT number, toString(number), number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<substitutions>
@ -79,47 +79,51 @@
<substitution>
<name>elements_count</name>
<values>
<value>25000</value>
<value>50000</value>
<value>75000</value>
<value>100000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_direct_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_direct_dictionary', (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictHas('default.complex_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;

View File

@ -1,8 +1,4 @@
<test max_ignored_relative_change="0.3">
<preconditions>
<table_exists>please_fix_me</table_exists>
</preconditions>
<create_query>
CREATE TABLE simple_key_flat_dictionary_source_table
(
@ -50,25 +46,30 @@
<substitution>
<name>elements_count</name>
<values>
<value>2500000</value>
<value>5000000</value>
<value>7500000</value>
<value>10000000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_flat_dictionary', number)
SELECT * FROM simple_key_flat_dictionary
FORMAT Null;
</query>
<query>
WITH rand64() % toUInt64(75000000) as key
SELECT dictHas('default.simple_key_flat_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
LIMIT 75000000
FORMAT Null;
</query>

View File

@ -81,35 +81,37 @@
<substitution>
<name>elements_count</name>
<values>
<value>2500000</value>
<value>5000000</value>
<value>7500000</value>
<value>10000000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_hashed_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_hashed_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, (number, toString(number)))
WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_hashed_dictionary', (number, toString(number)))
WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
SELECT dictHas('default.complex_key_hashed_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;

View File

@ -0,0 +1,5 @@
<test>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(intDiv(number, 1000000000))</query>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(divide(number, 1000000000))</query>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(toUInt32(divide(number, 1000000000)))</query>
</test>

View File

@ -21,15 +21,12 @@ ORDER BY (engine_id)
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
sleep 1
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"
sleep 1
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 3, 'hello')"
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 3 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
@ -39,9 +36,8 @@ done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 4 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
@ -53,12 +49,10 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'h
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 5 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"

View File

@ -1,5 +1,5 @@
| id | name | array |
|-:|:-|:-:|
| 1 | name1 | [1,2,3] |
| 2 | name2 | [4,5,6] |
| 3 | name3 | [7,8,9] |
| id | name | array | nullable | low_cardinality | decimal |
|-:|:-|:-|:-|:-|-:|
| 1 | name1 | [1,2,3] | Some long string | name1 | 1.110000 |
| 2 | name2 | [4,5,60000] | \N | Another long string | 222.222222 |
| 30000 | One more long string | [7,8,9] | name3 | name3 | 3.330000 |

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS makrdown;
CREATE TABLE markdown (id UInt32, name String, array Array(Int8)) ENGINE = Memory;
INSERT INTO markdown VALUES (1, 'name1', [1,2,3]), (2, 'name2', [4,5,6]), (3, 'name3', [7,8,9]);
CREATE TABLE markdown (id UInt32, name String, array Array(Int32), nullable Nullable(String), low_cardinality LowCardinality(String), decimal Decimal32(6)) ENGINE = Memory;
INSERT INTO markdown VALUES (1, 'name1', [1,2,3], 'Some long string', 'name1', 1.11), (2, 'name2', [4,5,60000], Null, 'Another long string', 222.222222), (30000, 'One more long string', [7,8,9], 'name3', 'name3', 3.33);
SELECT * FROM markdown FORMAT Markdown;
DROP TABLE IF EXISTS markdown

View File

@ -28,7 +28,7 @@ ALTER TTL ['ALTER MODIFY TTL','MODIFY TTL'] TABLE ALTER TABLE
ALTER MATERIALIZE TTL ['MATERIALIZE TTL'] TABLE ALTER TABLE
ALTER SETTINGS ['ALTER SETTING','ALTER MODIFY SETTING','MODIFY SETTING'] TABLE ALTER TABLE
ALTER MOVE PARTITION ['ALTER MOVE PART','MOVE PARTITION','MOVE PART'] TABLE ALTER TABLE
ALTER FETCH PARTITION ['FETCH PARTITION'] TABLE ALTER TABLE
ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE
ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE
ALTER TABLE [] \N ALTER
ALTER VIEW REFRESH ['ALTER LIVE VIEW REFRESH','REFRESH VIEW'] VIEW ALTER VIEW

View File

@ -8,21 +8,11 @@ set -e
function thread()
{
db_engine=`$CLICKHOUSE_CLIENT -q "SELECT engine FROM system.databases WHERE name='$CLICKHOUSE_DATABASE'"`
if [[ $db_engine == "Atomic" ]]; then
# Ignore "Replica already exists" exception
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1 NO DELAY;
CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time|already exists'
done
else
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1;
CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time'
done
fi
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1 SYNC;
CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time'
done
}

View File

@ -0,0 +1,8 @@
23
3
101
269
[1,2,3,4]
[4,3,2,1]
([4,3,2,1],[1,2,3,4])
([1,3,5],[2,4,6])

View File

@ -0,0 +1,8 @@
SELECT arrayFold(x,acc -> acc + x * 2, [1,2,3,4], toInt64(3));
SELECT arrayFold(x,acc -> acc + x * 2, emptyArrayInt64(), toInt64(3));
SELECT arrayFold(x,y,acc -> acc + x * 2 + y * 3, [1,2,3,4], [5,6,7,8], toInt64(3));
SELECT arrayFold(x,y,z,acc -> acc + x * 2 + y * 3 + z * 4, [1,2,3,4], [5,6,7,8], [9,10,11,12], toInt64(3));
SELECT arrayFold(x,acc -> arrayPushBack(acc,x), [1,2,3,4], emptyArrayInt64());
SELECT arrayFold(x,acc -> arrayPushFront(acc,x), [1,2,3,4], emptyArrayInt64());
SELECT arrayFold(x,acc -> (arrayPushFront(acc.1,x), arrayPushBack(acc.2,x)), [1,2,3,4], (emptyArrayInt64(), emptyArrayInt64()));
SELECT arrayFold(x,acc -> x % 2 ? (arrayPushBack(acc.1,x), acc.2): (acc.1, arrayPushBack(acc.2,x)), [1,2,3,4,5,6], (emptyArrayInt64(), emptyArrayInt64()));

View File

@ -0,0 +1,22 @@
drop table if exists data_01811;
drop table if exists buffer_01811;
create table data_01811 (key Int) Engine=Memory();
/* Buffer with flush_rows=1000 */
create table buffer_01811 (key Int) Engine=Buffer(currentDatabase(), data_01811,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6,
/* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0
);
insert into buffer_01811 select * from numbers(10);
insert into buffer_01811 select * from numbers(10);
-- wait for background buffer flush
select sleep(3) format Null;
select count() from data_01811;
drop table buffer_01811;
drop table data_01811;

View File

@ -0,0 +1,80 @@
0
0
1
3
6
10
15
21
28
36
0
1
3
6
10
15
21
28
36
45
[]
[0]
[1,0]
[2,1,0]
[3,2,1,0]
[4,3,2,1,0]
[5,4,3,2,1,0]
[6,5,4,3,2,1,0]
[7,6,5,4,3,2,1,0]
[8,7,6,5,4,3,2,1,0]
[]
[0]
[1,0]
[1,0,2]
[3,1,0,2]
[3,1,0,2,4]
[5,3,1,0,2,4]
[5,3,1,0,2,4,6]
[7,5,3,1,0,2,4,6]
[7,5,3,1,0,2,4,6,8]
(0,0)
(0,0)
(1,-1)
(3,-3)
(6,-6)
(10,-10)
(15,-15)
(21,-21)
(28,-28)
(36,-36)
(0,0)
(0,0)
(1,-1)
(3,-3)
(6,-6)
(10,-10)
(15,-15)
(21,-21)
(28,-28)
(36,-36)
[(0,0)]
[(0,1),(0,0)]
[(1,2),(0,1),(0,0)]
[(2,3),(1,2),(0,1),(0,0)]
[(3,4),(2,3),(1,2),(0,1),(0,0)]
[(4,5),(3,4),(2,3),(1,2),(0,1),(0,0)]
[(5,6),(4,5),(3,4),(2,3),(1,2),(0,1),(0,0)]
[(6,7),(5,6),(4,5),(3,4),(2,3),(1,2),(0,1),(0,0)]
[(7,8),(6,7),(5,6),(4,5),(3,4),(2,3),(1,2),(0,1),(0,0)]
[(8,9),(7,8),(6,7),(5,6),(4,5),(3,4),(2,3),(1,2),(0,1),(0,0)]
[]
['0']
['0','1']
['0','1','2']
['0','1','2','3']
['0','1','2','3','4']
['0','1','2','3','4','5']
['0','1','2','3','4','5','6']
['0','1','2','3','4','5','6','7']
['0','1','2','3','4','5','6','7','8']

View File

@ -0,0 +1,8 @@
SELECT arrayFold(x,acc -> acc+x, range(number), toInt64(0)) FROM system.numbers LIMIT 10;
SELECT arrayFold(x,acc -> acc+x, range(number), number) FROM system.numbers LIMIT 10;
SELECT arrayFold(x,acc -> arrayPushFront(acc, x), range(number), emptyArrayUInt64()) FROM system.numbers LIMIT 10;
SELECT arrayFold(x,acc -> x % 2 ? arrayPushFront(acc, x) : arrayPushBack(acc, x), range(number), emptyArrayUInt64()) FROM system.numbers LIMIT 10;
SELECT arrayFold(x,acc -> (acc.1+x, acc.2-x), range(number), (toInt64(0), toInt64(0))) FROM system.numbers LIMIT 10;
SELECT arrayFold(x,acc -> (acc.1+x.1, acc.2-x.2), arrayZip(range(number), range(number)), (toInt64(0), toInt64(0))) FROM system.numbers LIMIT 10;
SELECT arrayFold(x,acc -> arrayPushFront(acc, (x, x+1)), range(number), [(toUInt64(0),toUInt64(0))]) FROM system.numbers LIMIT 10;
SELECT arrayFold(x, acc -> concat(acc, arrayMap(z -> toString(x), [number])) , range(number), CAST([] as Array(String))) FROM system.numbers LIMIT 10;

View File

@ -0,0 +1,12 @@
SELECT arrayFold([]); -- { serverError 42 }
SELECT arrayFold([1,2,3]); -- { serverError 42 }
SELECT arrayFold([1,2,3], [4,5,6]); -- { serverError 43 }
SELECT arrayFold(1234); -- { serverError 42 }
SELECT arrayFold(x, acc -> acc + x, 10, 20); -- { serverError 43 }
SELECT arrayFold(x, acc -> acc + x, 10, [20, 30, 40]); -- { serverError 43 }
SELECT arrayFold(x -> x * 2, [1,2,3,4], toInt64(3)); -- { serverError 43 }
SELECT arrayFold(x,acc -> acc+x, number, toInt64(0)) FROM system.numbers LIMIT 10; -- { serverError 43 }
SELECT arrayFold(x,y,acc -> acc + x * 2 + y * 3, [1,2,3,4], [5,6,7], toInt64(3)); -- { serverError 190 }
SELECT arrayFold(x,acc -> acc + x * 2 + y * 3, [1,2,3,4], [5,6,7,8], toInt64(3)); -- { serverError 47 }
SELECT arrayFold(x,acc -> acc + x * 2, [1,2,3,4], [5,6,7,8], toInt64(3)); -- { serverError 43 }
SELECT arrayFold(x,acc -> concat(acc,', ', x), [1, 2, 3, 4], '0') -- { serverError 44 }

View File

@ -0,0 +1,42 @@
drop table if exists data_01817;
drop table if exists buffer_01817;
create table data_01817 (key Int) Engine=Null();
-- w/ flush_*
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6,
/* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0
);
drop table buffer_01817;
-- w/o flush_*
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6
);
drop table buffer_01817;
-- not enough args
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0 /* max_bytes= 4e6 */
); -- { serverError 42 }
-- too much args
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6,
/* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0,
0
); -- { serverError 42 }
drop table data_01817;

View File

@ -89,7 +89,7 @@ def grant_option_check(grant_option_target, grant_target, user_name, table_type,
@Examples("privilege", [
("ALTER MOVE PARTITION",), ("ALTER MOVE PART",), ("MOVE PARTITION",), ("MOVE PART",),
("ALTER DELETE",), ("DELETE",),
("ALTER FETCH PARTITION",), ("FETCH PARTITION",),
("ALTER FETCH PARTITION",), ("ALTER FETCH PART",), ("FETCH PARTITION",),
("ALTER FREEZE PARTITION",), ("FREEZE PARTITION",),
("ALTER UPDATE",), ("UPDATE",),
("ALTER ADD COLUMN",), ("ADD COLUMN",),