Merge branch 'master' into protocol-compression-auto

Alexey Milovidov 2021-04-17 16:46:51 +03:00
commit 77e64b3ebd
137 changed files with 1461 additions and 628 deletions

View File

@ -290,6 +290,12 @@ if (COMPILER_GCC OR COMPILER_CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsized-deallocation")
endif ()
# falign-functions=32 prevents random performance regressions with code changes, thus providing more stable
# benchmarks.
if (COMPILER_GCC OR COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -falign-functions=32")
endif ()
# Compiler-specific coverage flags e.g. -fcoverage-mapping for gcc
option(WITH_COVERAGE "Profile the resulting binary/binaries" OFF)

View File

@ -12,7 +12,8 @@
///
/// NOTE: it should be used with caution.
#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
MemoryTracker::LockExceptionInThread lock_memory_tracker; \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
)
@ -56,7 +57,8 @@
#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
try \
{ \
MemoryTracker::LockExceptionInThread lock_memory_tracker; \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
} \
catch (...) \

View File

@ -96,14 +96,8 @@ if (USE_INTERNAL_ZLIB_LIBRARY)
add_subdirectory (${INTERNAL_ZLIB_NAME})
# We should use same defines when including zlib.h as used when zlib compiled
target_compile_definitions (zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
if (TARGET zlibstatic)
target_compile_definitions (zlibstatic PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
endif ()
if (ARCH_AMD64 OR ARCH_AARCH64)
target_compile_definitions (zlib PUBLIC X86_64 UNALIGNED_OK)
if (TARGET zlibstatic)
target_compile_definitions (zlibstatic PUBLIC X86_64 UNALIGNED_OK)
endif ()
endif ()
endif ()

View File

@ -34,9 +34,9 @@ if (OS_LINUX)
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache

contrib/zlib-ng vendored

@ -1 +1 @@
Subproject commit 6fd1846c8b8f59436fe2dd752d0f316ddbb64df6
Subproject commit 4039bb4623905e73c6e32a0c022f144bab87b2b3

View File

@ -367,6 +367,9 @@ function run_tests
# JSON functions
01666_blns
# Requires postgresql-client
01802_test_postgresql_protocol_with_row_policy
# Depends on AWS
01801_s3_cluster
)

View File

@ -21,14 +21,14 @@ function start()
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 \
--mysql_port 19004 --postgresql_port 19005 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server2/config.xml --daemon \
-- --path /var/lib/clickhouse2/ --logger.stderr /var/log/clickhouse-server/stderr2.log \
--logger.log /var/log/clickhouse-server/clickhouse-server2.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server2.err.log \
--tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
--mysql_port 29004 \
--mysql_port 29004 --postgresql_port 29005 \
--keeper_server.tcp_port 29181 --keeper_server.server_id 3
fi

View File

@ -28,7 +28,8 @@ RUN apt-get update -y \
tree \
unixodbc \
wget \
mysql-client=5.7*
mysql-client=5.7* \
postgresql-client
RUN pip3 install numpy scipy pandas

View File

@ -44,7 +44,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 \
--mysql_port 19004 --postgresql_port 19005 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2 \
--macros.replica r2 # It doesn't work :(
@ -52,7 +52,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
-- --path /var/lib/clickhouse2/ --logger.stderr /var/log/clickhouse-server/stderr2.log \
--logger.log /var/log/clickhouse-server/clickhouse-server2.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server2.err.log \
--tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
--mysql_port 29004 \
--mysql_port 29004 --postgresql_port 29005 \
--keeper_server.tcp_port 29181 --keeper_server.server_id 3 \
--macros.shard s2 # It doesn't work :(
@ -112,10 +112,13 @@ if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
fi
tar -chf /test_output/text_log_dump.tar /var/lib/clickhouse/data/system/text_log ||:
tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||:
tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||:
fi

View File

@ -62,7 +62,7 @@ $ cd ClickHouse
$ rm -rf build
$ mkdir build
$ cd build
$ cmake -DCMAKE_C_COMPILER=$(brew --prefix llvm)/bin/clang -DCMAKE_CXX_COMPILER==$(brew --prefix llvm)/bin/clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_JEMALLOC=OFF ..
$ cmake -DCMAKE_C_COMPILER=$(brew --prefix llvm)/bin/clang -DCMAKE_CXX_COMPILER=$(brew --prefix llvm)/bin/clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_JEMALLOC=OFF ..
$ cmake --build . --config RelWithDebInfo
$ cd ..
```

View File

@ -18,11 +18,17 @@ Engine parameters:
- `num_layers` – Parallelism layer. Physically, the table will be represented as `num_layers` of independent buffers. Recommended value: 16.
- `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` – Conditions for flushing data from the buffer.
Optional engine parameters:
- `flush_time`, `flush_rows`, `flush_bytes` – Conditions for flushing data from the buffer; such flushes happen only in the background (omitted or zero disables the corresponding `flush*` condition).
Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met.
- `min_time`, `max_time` – Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows` – Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes` – Condition for the number of bytes in the buffer.
Also, if at least one `flush*` condition is met, a flush is initiated in the background. This differs from `max*`, since `flush*` lets you configure background flushes separately, to avoid adding latency to `INSERT` (into `Buffer`) queries.
- `min_time`, `max_time`, `flush_time` – Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows`, `flush_rows` – Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes`, `flush_bytes` – Condition for the number of bytes in the buffer.
During the write operation, data is inserted to a `num_layers` number of random buffers. Or, if the data part to insert is large enough (greater than `max_rows` or `max_bytes`), it is written directly to the destination table, omitting the buffer.
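As an illustration of the optional `flush*` parameters described above, here is a hedged sketch of a `Buffer` table definition (the `merge_db.events` names are hypothetical; the parameter order follows the description in this section):
``` sql
-- Hypothetical buffer in front of merge_db.events: data is written to the destination
-- when the min*/max* conditions fire, and additionally flushed in the background
-- after 10 seconds, 10000 rows, or 10 MB (the new flush_time/flush_rows/flush_bytes parameters).
CREATE TABLE merge_db.events_buffer AS merge_db.events
ENGINE = Buffer(merge_db, events, 16,
                10, 100,              -- min_time, max_time (seconds)
                10000, 1000000,       -- min_rows, max_rows
                10000000, 100000000,  -- min_bytes, max_bytes
                10, 10000, 10000000); -- flush_time, flush_rows, flush_bytes
```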

View File

@ -1213,6 +1213,62 @@ SELECT arrayFill(x -> not isNull(x), [1, null, 3, 11, 12, null, null, 5, 6, 14,
Note that `arrayFill` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can't be omitted.
## arrayFold(func, arr1, …, init) {#array-fold}
Returns the result of [folding](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) the arrays and the value `init` using the function `func`.
That is, the result of the calculation `func(arr1[n], …, func(arr1[n - 1], …, func(…, func(arr1[2], …, func(arr1[1], …, init)))))`.
Note that `arrayFold` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can't be omitted.
**Arguments**
- `func` — The lambda function with `n+1` arguments (where `n` is the number of input arrays); the first `n` arguments are the current elements of the input arrays, and the last argument is the current value of the accumulator.
- `arr` — Any number of [arrays](../../sql-reference/data-types/array.md).
- `init` - Initial value of the accumulator.
**Returned value**
Final value of accumulator.
**Examples**
The following example shows how to calculate the product and the sum of the elements of an array:
``` sql
SELECT arrayFold(x, accum -> (accum.1 * x, accum.2 + x), [1, 2, 3, 4, 5], (toUInt64(1), toUInt64(0))) as res;
```
``` text
┌─res───────┐
│ (120, 15) │
└───────────┘
```
The following example shows how to reverse the elements of an array:
``` sql
SELECT arrayFold(x, acc -> arrayPushFront(acc, x), [1,2,3,4,5], emptyArrayUInt64()) as res;
```
``` text
┌─res─────────┐
│ [5,4,3,2,1] │
└─────────────┘
```
Folding can be used to access elements that have already been processed during the calculation, for example:
``` sql
SELECT arrayFold(x, acc -> (x, concat(acc.2, toString(acc.1), ',')), [1,2], (0,'')) as res
```
``` text
┌─res────────┐
│ (2,'0,1,') │
└────────────┘
```
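For completeness, a small additional sketch (not from the original documentation) showing that the accumulator does not have to be a tuple; it uses the same lambda style as the examples above, and the `toUInt64` cast keeps the accumulator type equal to the lambda's return type, as required:
``` sql
SELECT arrayFold(x, acc -> acc + x, [1, 2, 3, 4], toUInt64(0)) AS res; -- expected result: 10
```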
## arrayReverseFill(func, arr1, …) {#array-reverse-fill}
Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func` returns 0. The last element of `arr1` will not be replaced.

View File

@ -16,7 +16,7 @@ The following operations with [partitions](../../../engines/table-engines/merget
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) — Resets the value of a specified column in a partition.
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) — Resets the specified secondary index in a partition.
- [FREEZE PARTITION](#alter_freeze-partition) — Creates a backup of a partition.
- [FETCH PARTITION](#alter_fetch-partition) — Downloads a partition from another server.
- [FETCH PARTITION\|PART](#alter_fetch-partition) — Downloads a part or partition from another server.
- [MOVE PARTITION\|PART](#alter_move-partition) — Move partition/data part to another disk or volume.
<!-- -->
@ -198,29 +198,35 @@ ALTER TABLE table_name CLEAR INDEX index_name IN PARTITION partition_expr
The query works similar to `CLEAR COLUMN`, but it resets an index instead of a column data.
## FETCH PARTITION {#alter_fetch-partition}
## FETCH PARTITION|PART {#alter_fetch-partition}
``` sql
ALTER TABLE table_name FETCH PARTITION partition_expr FROM 'path-in-zookeeper'
ALTER TABLE table_name FETCH PARTITION|PART partition_expr FROM 'path-in-zookeeper'
```
Downloads a partition from another server. This query only works for the replicated tables.
The query does the following:
1. Downloads the partition from the specified shard. In path-in-zookeeper you must specify a path to the shard in ZooKeeper.
1. Downloads the partition or part from the specified shard. In `path-in-zookeeper` you must specify a path to the shard in ZooKeeper.
2. Then the query puts the downloaded data to the `detached` directory of the `table_name` table. Use the [ATTACH PARTITION\|PART](#alter_attach-partition) query to add the data to the table.
For example:
1. FETCH PARTITION
``` sql
ALTER TABLE users FETCH PARTITION 201902 FROM '/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PARTITION 201902;
```
2. FETCH PART
``` sql
ALTER TABLE users FETCH PART 201901_2_2_0 FROM '/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PART 201901_2_2_0;
```
Note that:
- The `ALTER ... FETCH PARTITION` query isn't replicated. It places the partition to the `detached` directory only on the local server.
- The `ALTER ... FETCH PARTITION|PART` query isn't replicated. It places the part or partition to the `detached` directory only on the local server.
- The `ALTER TABLE ... ATTACH` query is replicated. It adds the data to all replicas. The data is added to one of the replicas from the `detached` directory, and to the others - from neighboring replicas.
Before downloading, the system checks if the partition exists and the table structure matches. The most appropriate replica is selected automatically from the healthy replicas.

View File

@ -5,39 +5,78 @@ toc_title: ROW POLICY
# CREATE ROW POLICY {#create-row-policy-statement}
Creates [filters for rows](../../../operations/access-rights.md#row-policy-management), which a user can read from a table.
Creates a [row policy](../../../operations/access-rights.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table.
Syntax:
``` sql
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
[FOR SELECT] USING condition
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING condition]
[TO {role1 [, role2 ...] | ALL | ALL EXCEPT role1 [, role2 ...]}]
```
`ON CLUSTER` clause allows creating row policies on a cluster, see [Distributed DDL](../../../sql-reference/distributed-ddl.md).
## AS Clause {#create-row-policy-as}
## USING Clause {#create-row-policy-using}
Using this section you can create permissive or restrictive policies.
Permissive policy grants access to rows. Permissive policies which apply to the same table are combined together using the boolean `OR` operator. Policies are permissive by default.
Restrictive policy restricts access to rows. Restrictive policies which apply to the same table are combined together using the boolean `AND` operator.
Restrictive policies apply to rows that passed the permissive filters. If you set restrictive policies but no permissive policies, the user can't get any row from the table.
Allows specifying a condition to filter rows. A user will see a row if the condition evaluates to a non-zero value for that row.
## TO Clause {#create-row-policy-to}
In the section `TO` you can provide a mixed list of roles and users, for example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Keyword `ALL` means all the ClickHouse users including current user. Keywords `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
Keyword `ALL` means all the ClickHouse users, including the current user. Keyword `ALL EXCEPT` allows excluding some users from the list of all users, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
## Examples {#examples}
!!! note "Note"
If there are no row policies defined for a table, then any user can `SELECT` all the rows from the table.
Defining one or more row policies for the table makes access to the table dependent on those row policies, no matter whether
they are defined for the current user or not. For example, the following row policy
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO ALL EXCEPT mira`
forbids the users `mira` and `peter` from seeing rows with `b != 1`, and any user not mentioned (e.g. the user `paul`) will see no rows from `mydb.table1` at all! If that isn't desirable, you can fix it by adding one more row policy, for example:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
## AS Clause {#create-row-policy-as}
It's allowed to have more than one policy enabled on the same table for the same user at the same time,
so we need a way to combine the conditions from multiple policies.
By default, policies are combined using the boolean `OR` operator. For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
enables the user `peter` to see rows with either `b=1` or `c=2`.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive.
By default, policies are permissive, which means they are combined using the boolean `OR` operator.
Alternatively, a policy can be defined as restrictive. Restrictive policies are combined using the boolean `AND` operator.
Here is the formula:
```
row_is_visible = (one or more of the permissive policies' conditions are non-zero) AND (all of the restrictive policies' conditions are non-zero)
```
For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see rows only if both `b=1` AND `c=2`.
## Examples
`CREATE ROW POLICY filter1 ON mydb.mytable USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`
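To check which policies are defined and whether they are restrictive, one can query the `system.row_policies` system table; this is a hedged sketch and the exact column set may differ between versions:
``` sql
SELECT name, database, table, select_filter, is_restrictive
FROM system.row_policies;
```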

View File

@ -279,7 +279,7 @@ Allows executing [ALTER](../../sql-reference/statements/alter/index.md) queries
- `ALTER MATERIALIZE TTL`. Level: `TABLE`. Aliases: `MATERIALIZE TTL`
- `ALTER SETTINGS`. Level: `TABLE`. Aliases: `ALTER SETTING`, `ALTER MODIFY SETTING`, `MODIFY SETTING`
- `ALTER MOVE PARTITION`. Level: `TABLE`. Aliases: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `FETCH PARTITION`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `ALTER FETCH PART`, `FETCH PARTITION`, `FETCH PART`
- `ALTER FREEZE PARTITION`. Level: `TABLE`. Aliases: `FREEZE PARTITION`
- `ALTER VIEW` Level: `GROUP`
- `ALTER VIEW REFRESH`. Level: `VIEW`. Aliases: `ALTER LIVE VIEW REFRESH`, `REFRESH VIEW`
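Given the alias list above, the corresponding privilege can now be granted under either spelling; a hedged sketch (the user `john` and table `mydb.mytable` are hypothetical):
``` sql
GRANT ALTER FETCH PARTITION ON mydb.mytable TO john;
-- equivalent, using the new alias added in this change:
GRANT ALTER FETCH PART ON mydb.mytable TO john;
```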

View File

@ -1147,6 +1147,62 @@ SELECT arrayReverseFill(x -> not isNull(x), [1, null, 3, 11, 12, null, null, 5,
Функция `arrayReverseFill` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
## arrayFold(func, arr1, …, init) {#array-fold}
Возвращает результат [сворачивания](https://ru.wikipedia.org/wiki/%D0%A1%D0%B2%D1%91%D1%80%D1%82%D0%BA%D0%B0_%D1%81%D0%BF%D0%B8%D1%81%D0%BA%D0%B0) массивов и начального значения `init` с помощью функции `func`.
Т.е. результат вычисления `func(arr1[n], …, func(arr1[n - 1], …, func(…, func(arr1[2], …, func(arr1[1], …, init)))))`.
Функция `arrayFold` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
**Аргументы**
- `func` — лямбда-функция с `n+1` параметрами (где `n` — количество входных массивов), причём первые `n` параметров используются для текущих элементов входных массивов, а последний параметр — для текущего значения аккумулятора.
- `arr` — произвольное количество [массивов](../../sql-reference/data-types/array.md).
- `init` - начальное значение аккумулятора.
**Возвращаемое значение**
Итоговое значение аккумулятора.
**Примеры**
Следующий пример показывает, как вычислить произведение и сумму элементов массива:
``` sql
SELECT arrayFold(x, accum -> (accum.1 * x, accum.2 + x), [1, 2, 3, 4, 5], (toUInt64(1), toUInt64(0))) as res;
```
``` text
┌─res───────┐
│ (120, 15) │
└───────────┘
```
В этом примере показано, как обратить массив:
``` sql
SELECT arrayFold(x, acc -> arrayPushFront(acc, x), [1,2,3,4,5], emptyArrayUInt64()) as res;
```
``` text
┌─res─────────┐
│ [5,4,3,2,1] │
└─────────────┘
```
Свёртка может быть использована для доступа к уже пройденным в процессе вычисления элементам. Например:
``` sql
SELECT arrayFold(x, acc -> (x, concat(acc.2, toString(acc.1), ',')), [1,2], (0,'')) as res
```
``` text
┌─res────────┐
│ (2,'0,1,') │
└────────────┘
```
## arraySplit(func, arr1, …) {#array-split}
Разделяет массив `arr1` на несколько. Если `func` возвращает не 0, то массив разделяется, а элемент помещается в левую часть. Массив не разбивается по первому элементу.
@ -1183,6 +1239,7 @@ SELECT arrayReverseSplit((x, y) -> y, [1, 2, 3, 4, 5], [1, 0, 0, 1, 0]) AS res
Функция `arrayReverseSplit` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
## arrayExists(\[func,\] arr1, …) {#arrayexistsfunc-arr1}
Возвращает 1, если существует хотя бы один элемент массива `arr`, для которого функция func возвращает не 0. Иначе возвращает 0.

View File

@ -5,7 +5,7 @@ toc_title: "Политика доступа"
# CREATE ROW POLICY {#create-row-policy-statement}
Создает [фильтры для строк](../../../operations/access-rights.md#row-policy-management), которые пользователь может прочесть из таблицы.
Создает [политики доступа к строкам](../../../operations/access-rights.md#row-policy-management), т.е. фильтры, которые определяют, какие строки пользователь может читать из таблицы.
Синтаксис:
@ -13,33 +13,68 @@ toc_title: "Политика доступа"
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING condition]
[FOR SELECT] USING condition
[TO {role [,...] | ALL | ALL EXCEPT role [,...]}]
```
Секция `ON CLUSTER` позволяет создавать фильтры для строк на кластере, см. [Распределенные DDL запросы](../../../sql-reference/distributed-ddl.md).
Секция `ON CLUSTER` позволяет создавать политики на кластере, см. [Распределенные DDL запросы](../../../sql-reference/distributed-ddl.md).
## Секция AS {#create-row-policy-as}
## Секция USING {#create-row-policy-using}
С помощью данной секции можно создать политику разрешения или ограничения.
Политика разрешения предоставляет доступ к строкам. Разрешительные политики, которые применяются к одной таблице, объединяются с помощью логического оператора `OR`. Политики являются разрешительными по умолчанию.
Политика ограничения запрещает доступ к строкам. Ограничительные политики, которые применяются к одной таблице, объединяются логическим оператором `AND`.
Ограничительные политики применяются к строкам, прошедшим фильтр разрешительной политики. Если вы не зададите разрешительные политики, пользователь не сможет обращаться ни к каким строкам из таблицы.
Секция `USING` указывает условие для фильтрации строк. Пользователь может видеть строку, если это условие, вычисленное для строки, дает ненулевой результат.
## Секция TO {#create-row-policy-to}
В секции `TO` вы можете перечислить как роли, так и пользователей. Например, `CREATE ROW POLICY ... TO accountant, john@localhost`.
В секции `TO` перечисляются пользователи и роли, для которых должна действовать политика. Например, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Ключевым словом `ALL` обозначаются все пользователи, включая текущего. Ключевые слова `ALL EXCEPT` позволяют исключить пользователей из списка всех пользователей. Например, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
!!! note "Note"
Если для таблицы не задано ни одной политики доступа к строкам, то любой пользователь может выполнить `SELECT` и получить все строки таблицы.
Если определить хотя бы одну политику для таблицы, то доступ к строкам будет управляться этими политиками, причем для всех пользователей
(даже для тех, для кого политики не определялись). Например, следующая политика
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
запретит пользователям `mira` и `peter` видеть строки с `b != 1`, и еще запретит всем остальным пользователям (например, пользователю `paul`)
видеть какие-либо строки вообще из таблицы `mydb.table1`! Если это нежелательно, такое поведение можно исправить, определив дополнительную политику:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
## Секция AS {#create-row-policy-as}
Может быть одновременно активно более одной политики для одной и той же таблицы и одного и того же пользователя.
Поэтому нам нужен способ комбинировать политики. По умолчанию политики комбинируются с использованием логического оператора `OR`.
Например, политики:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
разрешат пользователю с именем `peter` видеть строки, для которых будет верно `b=1` или `c=2`.
Секция `AS` указывает, как политики должны комбинироваться с другими политиками. Политики могут быть или разрешительными (`PERMISSIVE`), или ограничительными (`RESTRICTIVE`). По умолчанию политики создаются разрешительными (`PERMISSIVE`); такие политики комбинируются с использованием логического оператора `OR`.
Ограничительные (`RESTRICTIVE`) политики комбинируются с использованием логического оператора `AND`.
Используется следующая формула:
`строка_видима = (одна или больше permissive-политик дала ненулевой результат проверки условия) И (все restrictive-политики дали ненулевой результат проверки условия)`
Например, политики
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
разрешат пользователю с именем `peter` видеть только те строки, для которых будет одновременно `b=1` и `c=2`.
## Примеры
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter1 ON mydb.mytable USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`
<!--hide-->

View File

@ -7,7 +7,20 @@
-->
<yandex>
<logger>
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
<!-- Possible levels [1]:
- none (turns off logging)
- fatal
- critical
- error
- warning
- notice
- information
- debug
- trace
[1]: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105-L114
-->
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
@ -76,7 +89,7 @@
<!-- Compatibility with PostgreSQL protocol.
ClickHouse will pretend to be PostgreSQL for applications connecting to this port.
-->
<!-- <postgresql_port>9005</postgresql_port> -->
<postgresql_port>9005</postgresql_port>
<!-- HTTP API with TLS (HTTPS).
You have to configure certificate to enable this interface.

View File

@ -62,7 +62,7 @@ enum class AccessType
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_TABLE, "", GROUP, ALTER) \

View File

@ -150,7 +150,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker;
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
try
{

View File

@ -24,8 +24,8 @@ namespace
///
/// - when it is explicitly blocked with LockExceptionInThread
///
/// - to avoid std::terminate(), when stack unwinding is currently in progress
/// in this thread.
/// - when there are uncaught exception objects in the current thread
/// (to avoid std::terminate())
///
/// NOTE: that since C++11 destructor marked with noexcept by default, and
/// this means that any throw from destructor (that is not marked with

View File

@ -146,6 +146,9 @@
M(StorageBufferPassedTimeMaxThreshold, "") \
M(StorageBufferPassedRowsMaxThreshold, "") \
M(StorageBufferPassedBytesMaxThreshold, "") \
M(StorageBufferPassedTimeFlushThreshold, "") \
M(StorageBufferPassedRowsFlushThreshold, "") \
M(StorageBufferPassedBytesFlushThreshold, "") \
M(StorageBufferLayerLockReadersWaitMilliseconds, "Time for waiting for Buffer layer during reading") \
M(StorageBufferLayerLockWritersWaitMilliseconds, "Time for waiting free Buffer layer to write to (can be used to tune Buffer layers)") \
\
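These counters are exposed through the standard `system.events` table; a quick sketch (not part of the change itself) of how one could check whether the new flush thresholds are being hit:
``` sql
SELECT event, value
FROM system.events
WHERE event LIKE 'StorageBufferPassed%Threshold'
ORDER BY event;
```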

View File

@ -832,10 +832,13 @@ class NoPasswordAuth : public AuthenticationMethod
{
public:
void authenticate(
const String & /* user_name */,
ContextPtr /* context */,
Messaging::MessageTransport & /* mt */,
const Poco::Net::SocketAddress & /* address */) override {}
const String & user_name,
ContextPtr context,
Messaging::MessageTransport & mt,
const Poco::Net::SocketAddress & address) override
{
setPassword(user_name, "", context, mt, address);
}
Authentication::Type getType() const override
{

View File

@ -252,8 +252,6 @@ class IColumn;
* Almost all limits apply to each stream individually. \
*/ \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
M(UInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
@ -464,6 +462,8 @@ class IColumn;
\
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.
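For context, a hedged sketch of how the relocated `limit` and `offset` settings behave from SQL: they apply an implicit LIMIT/OFFSET to the final result of a SELECT (the interaction with an explicit LIMIT clause is not shown here):
``` sql
SET limit = 5;
SET offset = 10;
SELECT number FROM numbers(100); -- expected to return the numbers 10..14 under the semantics described above
```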

View File

@ -311,7 +311,7 @@ void PushingToViewsBlockOutputStream::writeSuffix()
UInt64 milliseconds = main_watch.elapsedMilliseconds();
if (views.size() > 1)
{
LOG_TRACE(log, "Pushing from {} to {} views took {} ms.",
LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.",
storage->getStorageID().getNameForLogs(), views.size(),
milliseconds);
}

View File

@ -49,6 +49,16 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, progress);
}
inline void doNothing(const Block &) {}

View File

@ -16,6 +16,9 @@ class Block;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,

View File

@ -51,6 +51,14 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
key_to_fetched_index.reserve(requested_keys.size());
auto fetched_columns_from_storage = request.makeAttributesResultColumns();
for (size_t attribute_index = 0; attribute_index < request.attributesSize(); ++attribute_index)
{
if (!request.shouldFillResultColumnWithIndex(attribute_index))
continue;
auto & fetched_column_from_storage = fetched_columns_from_storage[attribute_index];
fetched_column_from_storage->reserve(requested_keys.size());
}
size_t fetched_key_index = 0;

View File

@ -51,7 +51,6 @@ SRCS(
HierarchyDictionariesUtils.cpp
IPAddressDictionary.cpp
LibraryDictionarySource.cpp
LibraryDictionarySourceExternal.cpp
MongoDBDictionarySource.cpp
MySQLDictionarySource.cpp
PolygonDictionary.cpp

View File

@ -1,9 +1,17 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <IO/S3Common.h>
#include "DiskS3.h"
#include "Disks/DiskCacheWrapper.h"
#include "Disks/DiskFactory.h"
@ -122,7 +130,7 @@ void registerDiskS3(DiskFactory & factory)
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
client_configuration.httpRequestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;
@ -196,3 +204,10 @@ void registerDiskS3(DiskFactory & factory)
}
}
#else
void registerDiskS3(DiskFactory &) {}
#endif

View File

@ -1,5 +1,7 @@
configure_file(config_functions.h.in ${ConfigIncludePath}/config_functions.h)
add_subdirectory(divide)
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_functions .)
@ -25,7 +27,7 @@ target_link_libraries(clickhouse_functions
PRIVATE
${ZLIB_LIBRARIES}
boost::filesystem
libdivide
divide_impl
)
if (OPENSSL_CRYPTO_LIBRARY)

View File

@ -0,0 +1,187 @@
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
extern const int TYPE_MISMATCH;
}
/** arrayFold(x1,...,xn,accum -> expression, array1,...,arrayn, init_accum) - apply the expression to each element of the array (or set of parallel arrays).
*/
class FunctionArrayFold : public IFunction
{
public:
static constexpr auto name = "arrayFold";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayFold>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
void getLambdaArgumentTypes(DataTypes & arguments) const override
{
if (arguments.size() < 3)
throw Exception("Function " + getName() + " needs lambda function, at least one array argument and one accumulator argument.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
DataTypes nested_types(arguments.size() - 1);
for (size_t i = 0; i < nested_types.size() - 1; ++i)
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(&*arguments[i + 1]);
if (!array_type)
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found "
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
}
nested_types[nested_types.size() - 1] = arguments[arguments.size() - 1];
const DataTypeFunction * function_type = checkAndGetDataType<DataTypeFunction>(arguments[0].get());
if (!function_type || function_type->getArgumentTypes().size() != nested_types.size())
throw Exception("First argument for this overload of " + getName() + " must be a function with "
+ toString(nested_types.size()) + " arguments. Found "
+ arguments[0]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
arguments[0] = std::make_shared<DataTypeFunction>(nested_types);
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 2)
throw Exception("Function " + getName() + " needs at least 2 arguments; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto * data_type_function = checkAndGetDataType<DataTypeFunction>(arguments[0].type.get());
if (!data_type_function)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto const accumulator_type = arguments.back().type;
auto const lambda_type = data_type_function->getReturnType();
if (! accumulator_type->equals(*lambda_type))
throw Exception("Return type of lambda function must be the same as the accumulator type. "
"Inferred type of lambda " + lambda_type->getName() + ", "
+ "inferred type of accumulator " + accumulator_type->getName() + ".",
ErrorCodes::TYPE_MISMATCH);
return DataTypePtr(accumulator_type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const auto & column_with_type_and_name = arguments[0];
if (!column_with_type_and_name.column)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * column_function = typeid_cast<const ColumnFunction *>(column_with_type_and_name.column.get());
if (!column_function)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ColumnPtr offsets_column;
ColumnPtr column_first_array_ptr;
const ColumnArray * column_first_array = nullptr;
ColumnsWithTypeAndName arrays;
arrays.reserve(arguments.size() - 1);
for (size_t i = 1; i < arguments.size() - 1; ++i)
{
const auto & array_with_type_and_name = arguments[i];
ColumnPtr column_array_ptr = array_with_type_and_name.column;
const auto * column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
const DataTypePtr & array_type_ptr = array_with_type_and_name.type;
const auto * array_type = checkAndGetDataType<DataTypeArray>(array_type_ptr.get());
if (!column_array)
{
const ColumnConst * column_const_array = checkAndGetColumnConst<ColumnArray>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
}
if (!array_type)
throw Exception("Expected array type, found " + array_type_ptr->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!offsets_column)
{
offsets_column = column_array->getOffsetsPtr();
}
else
{
/// The first condition is optimization: do not compare data if the pointers are equal.
if (column_array->getOffsetsPtr() != offsets_column
&& column_array->getOffsets() != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
if (i == 1)
{
column_first_array_ptr = column_array_ptr;
column_first_array = column_array;
}
arrays.emplace_back(ColumnWithTypeAndName(column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_with_type_and_name.name));
}
arrays.emplace_back(arguments.back());
MutableColumnPtr result = arguments.back().column->convertToFullColumnIfConst()->cloneEmpty();
size_t arr_cursor = 0;
for (size_t irow = 0; irow < column_first_array->size(); ++irow) // for each row of result
{
// Make accumulator column for this row. We initialize it
// with the starting value given as the last argument.
ColumnWithTypeAndName accumulator_column = arguments.back();
ColumnPtr acc(accumulator_column.column->cut(irow, 1));
auto accumulator = ColumnWithTypeAndName(acc,
accumulator_column.type,
accumulator_column.name);
ColumnPtr res(acc);
size_t const arr_next = column_first_array->getOffsets()[irow]; // when we do folding
for (size_t iter = 0; arr_cursor < arr_next; ++iter, ++arr_cursor)
{
// Make slice of input arrays and accumulator for lambda
ColumnsWithTypeAndName iter_arrays;
iter_arrays.reserve(arrays.size() + 1);
for (size_t icolumn = 0; icolumn < arrays.size() - 1; ++icolumn)
{
auto const & arr = arrays[icolumn];
iter_arrays.emplace_back(ColumnWithTypeAndName(arr.column->cut(arr_cursor, 1),
arr.type,
arr.name));
}
iter_arrays.emplace_back(accumulator);
// Calculate function on arguments
auto replicated_column_function_ptr = IColumn::mutate(column_function->replicate(ColumnArray::Offsets(column_first_array->getOffsets().size(), 1)));
auto * replicated_column_function = typeid_cast<ColumnFunction *>(replicated_column_function_ptr.get());
replicated_column_function->appendArguments(iter_arrays);
auto lambda_result = replicated_column_function->reduce().column;
if (lambda_result->lowCardinality())
lambda_result = lambda_result->convertToFullColumnIfLowCardinality();
res = lambda_result->cut(0, 1);
accumulator.column = res;
}
result->insert((*res)[0]);
}
return result;
}
};
void registerFunctionArrayFold(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayFold>();
}
}

View File

@ -0,0 +1,22 @@
# A library for integer division by constant with CPU dispatching.
if (ARCH_AMD64)
add_library(divide_impl_sse2 divideImpl.cpp)
target_compile_options(divide_impl_sse2 PRIVATE -msse2 -DNAMESPACE=SSE2)
target_link_libraries(divide_impl_sse2 libdivide)
add_library(divide_impl_avx2 divideImpl.cpp)
target_compile_options(divide_impl_avx2 PRIVATE -mavx2 -DNAMESPACE=AVX2)
target_link_libraries(divide_impl_avx2 libdivide)
set(IMPLEMENTATIONS divide_impl_sse2 divide_impl_avx2)
else ()
add_library(divide_impl_generic divideImpl.cpp)
target_compile_options(divide_impl_generic PRIVATE -DNAMESPACE=Generic)
target_link_libraries(divide_impl_generic libdivide)
set(IMPLEMENTATIONS divide_impl_generic)
endif ()
add_library(divide_impl divide.cpp)
target_link_libraries(divide_impl ${IMPLEMENTATIONS} clickhouse_common_io)

View File

@ -0,0 +1,57 @@
#include "divide.h"
#include <Common/CpuId.h>
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
namespace SSE2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
namespace AVX2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#else
namespace Generic
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#endif
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
if (DB::Cpu::CpuFlagsCache::have_AVX2)
AVX2::divideImpl(a_pos, b, c_pos, size);
else if (DB::Cpu::CpuFlagsCache::have_SSE2)
SSE2::divideImpl(a_pos, b, c_pos, size);
#else
Generic::divideImpl(a_pos, b, c_pos, size);
#endif
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);

View File

@ -0,0 +1,6 @@
#pragma once
#include <cstddef>
template <typename A, typename B, typename ResultType>
extern void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);

View File

@ -0,0 +1,79 @@
/// This translation unit should be compiled multiple times
/// with different values of NAMESPACE and machine flags (sse2, avx2).
#if !defined(NAMESPACE)
#if defined(ARCADIA_BUILD)
#define NAMESPACE Generic
#else
#error "NAMESPACE macro must be defined"
#endif
#endif
#if defined(__AVX2__)
#define REG_SIZE 32
#define LIBDIVIDE_AVX2
#elif defined(__SSE2__)
#define REG_SIZE 16
#define LIBDIVIDE_SSE2
#endif
#include <libdivide.h>
namespace NAMESPACE
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_simd_register = REG_SIZE / sizeof(A);
const A * a_end_simd = a_pos + size / values_per_simd_register * values_per_simd_register;
while (a_pos < a_end_simd)
{
#if defined(__AVX2__)
_mm256_storeu_si256(reinterpret_cast<__m256i *>(c_pos),
_mm256_loadu_si256(reinterpret_cast<const __m256i *>(a_pos)) / divider);
#else
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
#endif
a_pos += values_per_simd_register;
c_pos += values_per_simd_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);
}

View File

@ -1,11 +1,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#if defined(__SSE2__)
# define LIBDIVIDE_SSE2 1
#endif
#include <libdivide.h>
#include "divide/divide.h"
namespace DB
@ -70,34 +66,11 @@ struct DivideIntegralByConstantImpl
if (unlikely(static_cast<A>(b) == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_sse_register = 16 / sizeof(A);
const A * a_end_sse = a_pos + size / values_per_sse_register * values_per_sse_register;
while (a_pos < a_end_sse)
{
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
a_pos += values_per_sse_register;
c_pos += values_per_sse_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
divideImpl(a_pos, b, c_pos, size);
}
};
/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
/** Specializations are specified for dividing numbers of the type UInt64, UInt32, Int64, Int32 by the numbers of the same sign.
* Can be expanded to all possible combinations, but more code is needed.
*/

View File

@ -78,12 +78,11 @@ struct ModuloByConstantImpl
if (b < 0)
b = -b;
libdivide::divider<A> divider(b);
/// Here we failed to make the SSE variant from libdivide give an advantage.
if (b & (b - 1))
{
libdivide::divider<A> divider(b);
for (size_t i = 0; i < size; ++i)
dst[i] = src[i] - (src[i] / divider) * b; /// NOTE: perhaps, the division semantics with the remainder of negative numbers is not preserved.
}

View File

@ -4,6 +4,7 @@ namespace DB
class FunctionFactory;
void registerFunctionArrayMap(FunctionFactory & factory);
void registerFunctionArrayFold(FunctionFactory & factory);
void registerFunctionArrayFilter(FunctionFactory & factory);
void registerFunctionArrayCount(FunctionFactory & factory);
void registerFunctionArrayExists(FunctionFactory & factory);
@ -22,6 +23,7 @@ void registerFunctionArrayDifference(FunctionFactory & factory);
void registerFunctionsHigherOrder(FunctionFactory & factory)
{
registerFunctionArrayMap(factory);
registerFunctionArrayFold(factory);
registerFunctionArrayFilter(factory);
registerFunctionArrayCount(factory);
registerFunctionArrayExists(factory);

View File

@ -144,6 +144,7 @@ SRCS(
array/arrayFirst.cpp
array/arrayFirstIndex.cpp
array/arrayFlatten.cpp
array/arrayFold.cpp
array/arrayIntersect.cpp
array/arrayJoin.cpp
array/arrayMap.cpp
@ -229,6 +230,8 @@ SRCS(
defaultValueOfTypeName.cpp
demange.cpp
divide.cpp
divide/divide.cpp
divide/divideImpl.cpp
dumpColumnStructure.cpp
e.cpp
empty.cpp

View File

@ -50,7 +50,7 @@ BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int comp
BrotliWriteBuffer::~BrotliWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}

View File

@ -50,7 +50,7 @@ LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
lzma_end(&lstr);

View File

@ -81,8 +81,8 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
: per_request_configuration(clientConfiguration.perRequestConfiguration)
, timeouts(ConnectionTimeouts(
Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout.
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000), /// send timeout.
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000), /// send timeout.
Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000) /// receive timeout.
))
, remote_host_filter(clientConfiguration.remote_host_filter)
, s3_max_redirects(clientConfiguration.s3_max_redirects)

View File

@ -1,13 +1,17 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3
#include <Common/RemoteHostFilter.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareIOStream.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/standard/StandardHttpResponse.h>
#include <aws/core/client/ClientConfiguration.h> // Y_IGNORE
#include <aws/core/http/HttpClient.h> // Y_IGNORE
#include <aws/core/http/HttpRequest.h> // Y_IGNORE
#include <aws/core/http/standard/StandardHttpResponse.h> // Y_IGNORE
namespace Aws::Http::Standard
{
@ -94,3 +98,5 @@ private:
};
}
#endif

View File

@ -5,8 +5,8 @@
#if USE_AWS_S3
#include <common/types.h>
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/Aws.h> // Y_IGNORE
#include <aws/core/client/ClientConfiguration.h> // Y_IGNORE
#include <IO/S3/PocoHTTPClient.h>
#include <Poco/URI.h>

View File

@ -79,7 +79,7 @@ WriteBufferFromFile::~WriteBufferFromFile()
return;
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();

View File

@ -98,7 +98,7 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
}
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}

View File

@ -43,7 +43,7 @@ WriteBufferFromOStream::WriteBufferFromOStream(
WriteBufferFromOStream::~WriteBufferFromOStream()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}

View File

@ -73,7 +73,7 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}

View File

@ -87,7 +87,7 @@ void WriteBufferFromS3::allocateBuffer()
void WriteBufferFromS3::finalize()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}

View File

@ -95,7 +95,7 @@ public:
~WriteBufferFromVector() override
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalize();
}
};

View File

@ -138,7 +138,7 @@ void WriteBufferValidUTF8::finish()
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}

View File

@ -49,7 +49,7 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();

View File

@ -31,7 +31,7 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();

View File

@ -834,7 +834,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
LOG_TRACE(log,
LOG_DEBUG(log,
"Written part in {} sec., {} rows, {} uncompressed, {} compressed,"
" {} uncompressed bytes per row, {} compressed bytes per row, compression rate: {}"
" ({} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
@ -947,7 +947,7 @@ void Aggregator::writeToTemporaryFileImpl(
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
data_variants.aggregator = nullptr;
LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
LOG_DEBUG(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
}
@ -1481,7 +1481,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
}
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log,
LOG_DEBUG(log,
"Converted aggregated data to blocks. {} rows, {} in {} sec. ({} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
@ -2109,7 +2109,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
size_t rows = block.rows();
size_t bytes = block.bytes();
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
LOG_DEBUG(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
ReadableSize(bytes / elapsed_seconds));

View File

@ -13,28 +13,7 @@ namespace DB
void ApplyWithSubqueryVisitor::visit(ASTPtr & ast, const Data & data)
{
if (auto * node_select = ast->as<ASTSelectQuery>())
{
std::optional<Data> new_data;
if (auto with = node_select->with())
{
for (auto & child : with->children)
{
visit(child, new_data ? *new_data : data);
if (auto * ast_with_elem = child->as<ASTWithElement>())
{
if (!new_data)
new_data = data;
new_data->subqueries[ast_with_elem->name] = ast_with_elem->subquery;
}
}
}
for (auto & child : node_select->children)
{
if (child != node_select->with())
visit(child, new_data ? *new_data : data);
}
}
visit(*node_select, data);
else
{
for (auto & child : ast->children)
@ -46,6 +25,36 @@ void ApplyWithSubqueryVisitor::visit(ASTPtr & ast, const Data & data)
}
}
void ApplyWithSubqueryVisitor::visit(ASTSelectQuery & ast, const Data & data)
{
std::optional<Data> new_data;
if (auto with = ast.with())
{
for (auto & child : with->children)
{
visit(child, new_data ? *new_data : data);
if (auto * ast_with_elem = child->as<ASTWithElement>())
{
if (!new_data)
new_data = data;
new_data->subqueries[ast_with_elem->name] = ast_with_elem->subquery;
}
}
}
for (auto & child : ast.children)
{
if (child != ast.with())
visit(child, new_data ? *new_data : data);
}
}
void ApplyWithSubqueryVisitor::visit(ASTSelectWithUnionQuery & ast, const Data & data)
{
for (auto & child : ast.children)
visit(child, data);
}
void ApplyWithSubqueryVisitor::visit(ASTTableExpression & table, const Data & data)
{
if (table.database_and_table_name)

View File

@ -7,6 +7,8 @@
namespace DB
{
class ASTFunction;
class ASTSelectQuery;
class ASTSelectWithUnionQuery;
struct ASTTableExpression;
class ApplyWithSubqueryVisitor
@ -18,9 +20,13 @@ public:
};
static void visit(ASTPtr & ast) { visit(ast, {}); }
static void visit(ASTSelectQuery & select) { visit(select, {}); }
static void visit(ASTSelectWithUnionQuery & select) { visit(select, {}); }
private:
static void visit(ASTPtr & ast, const Data & data);
static void visit(ASTSelectQuery & ast, const Data & data);
static void visit(ASTSelectWithUnionQuery & ast, const Data & data);
static void visit(ASTTableExpression & table, const Data & data);
static void visit(ASTFunction & func, const Data & data);
};

View File

@ -16,26 +16,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
void addAndTerm(ASTPtr & ast, const ASTPtr & term)
{
if (!ast)
ast = term;
else
ast = makeASTFunction("and", ast, term);
}
/// If this is an inner join and the expression related to less than 2 tables, then move it to WHERE
bool canMoveToWhere(std::pair<size_t, size_t> table_numbers, ASTTableJoin::Kind kind)
{
return kind == ASTTableJoin::Kind::Inner &&
(table_numbers.first == table_numbers.second || table_numbers.first == 0 || table_numbers.second == 0);
}
}
void CollectJoinOnKeysMatcher::Data::addJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast,
const std::pair<size_t, size_t> & table_no)
{
@ -49,8 +29,7 @@ void CollectJoinOnKeysMatcher::Data::addJoinKeys(const ASTPtr & left_ast, const
else
throw Exception("Cannot detect left and right JOIN keys. JOIN ON section is ambiguous.",
ErrorCodes::AMBIGUOUS_COLUMN_NAME);
if (table_no.first != table_no.second && table_no.first > 0 && table_no.second > 0)
has_some = true;
has_some = true;
}
void CollectJoinOnKeysMatcher::Data::addAsofJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast,
@ -99,45 +78,22 @@ void CollectJoinOnKeysMatcher::visit(const ASTFunction & func, const ASTPtr & as
{
ASTPtr left = func.arguments->children.at(0);
ASTPtr right = func.arguments->children.at(1);
auto table_numbers = getTableNumbers(left, right, data);
if (canMoveToWhere(table_numbers, data.kind))
{
addAndTerm(data.new_where_conditions, ast);
}
else
{
if (data.kind == ASTTableJoin::Kind::Inner)
{
addAndTerm(data.new_on_expression, ast);
}
data.addJoinKeys(left, right, table_numbers);
}
auto table_numbers = getTableNumbers(ast, left, right, data);
data.addJoinKeys(left, right, table_numbers);
}
else if (inequality != ASOF::Inequality::None && !data.is_asof)
else if (inequality != ASOF::Inequality::None)
{
ASTPtr left = func.arguments->children.at(0);
ASTPtr right = func.arguments->children.at(1);
auto table_numbers = getTableNumbers(left, right, data);
if (canMoveToWhere(table_numbers, data.kind))
{
addAndTerm(data.new_where_conditions, ast);
}
else
{
if (!data.is_asof)
throw Exception("JOIN ON inequalities are not supported. Unexpected '" + queryToString(ast) + "'",
ErrorCodes::NOT_IMPLEMENTED);
}
}
else if (inequality != ASOF::Inequality::None && data.is_asof)
{
ErrorCodes::NOT_IMPLEMENTED);
if (data.asof_left_key || data.asof_right_key)
throw Exception("ASOF JOIN expects exactly one inequality in ON section. Unexpected '" + queryToString(ast) + "'",
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
ASTPtr left = func.arguments->children.at(0);
ASTPtr right = func.arguments->children.at(1);
auto table_numbers = getTableNumbers(left, right, data);
auto table_numbers = getTableNumbers(ast, left, right, data);
data.addAsofJoinKeys(left, right, table_numbers, inequality);
}
@ -162,8 +118,7 @@ void CollectJoinOnKeysMatcher::getIdentifiers(const ASTPtr & ast, std::vector<co
getIdentifiers(child, out);
}
std::pair<size_t, size_t> CollectJoinOnKeysMatcher::getTableNumbers(const ASTPtr & left_ast, const ASTPtr & right_ast,
std::pair<size_t, size_t> CollectJoinOnKeysMatcher::getTableNumbers(const ASTPtr & expr, const ASTPtr & left_ast, const ASTPtr & right_ast,
Data & data)
{
std::vector<const ASTIdentifier *> left_identifiers;
@ -172,13 +127,23 @@ std::pair<size_t, size_t> CollectJoinOnKeysMatcher::getTableNumbers(const ASTPtr
getIdentifiers(left_ast, left_identifiers);
getIdentifiers(right_ast, right_identifiers);
size_t left_idents_table = 0;
size_t right_idents_table = 0;
if (left_identifiers.empty() || right_identifiers.empty())
{
throw Exception("Not equi-join ON expression: " + queryToString(expr) + ". No columns in one of equality side.",
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
}
if (!left_identifiers.empty())
left_idents_table = getTableForIdentifiers(left_identifiers, data);
if (!right_identifiers.empty())
right_idents_table = getTableForIdentifiers(right_identifiers, data);
size_t left_idents_table = getTableForIdentifiers(left_identifiers, data);
size_t right_idents_table = getTableForIdentifiers(right_identifiers, data);
if (left_idents_table && left_idents_table == right_idents_table)
{
auto left_name = queryToString(*left_identifiers[0]);
auto right_name = queryToString(*right_identifiers[0]);
throw Exception("In expression " + queryToString(expr) + " columns " + left_name + " and " + right_name
+ " are from the same table but from different arguments of equal function", ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
}
return std::make_pair(left_idents_table, right_idents_table);
}
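
To make the restored behaviour concrete (illustrative table and column names only): JOIN ON once again accepts only equi-join conditions that reference both sides, and single-table predicates are no longer moved to WHERE automatically, roughly:

    -- accepted: equality between columns of the two joined tables; per-table filters stay in WHERE explicitly
    SELECT * FROM t1 INNER JOIN t2 ON t1.key = t2.key WHERE t1.flag = 1;
    -- rejected with INVALID_JOIN_ON_EXPRESSION: both sides of the equality come from the same table
    -- SELECT * FROM t1 INNER JOIN t2 ON t1.a = t1.b;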

View File

@ -5,7 +5,6 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/Aliases.h>
#include <Parsers/ASTTablesInSelectQuery.h>
namespace DB
@ -31,11 +30,8 @@ public:
const TableWithColumnNamesAndTypes & right_table;
const Aliases & aliases;
const bool is_asof{false};
ASTTableJoin::Kind kind;
ASTPtr asof_left_key{};
ASTPtr asof_right_key{};
ASTPtr new_on_expression{};
ASTPtr new_where_conditions{};
bool has_some{false};
void addJoinKeys(const ASTPtr & left_ast, const ASTPtr & right_ast, const std::pair<size_t, size_t> & table_no);
@ -61,7 +57,7 @@ private:
static void visit(const ASTFunction & func, const ASTPtr & ast, Data & data);
static void getIdentifiers(const ASTPtr & ast, std::vector<const ASTIdentifier *> & out);
static std::pair<size_t, size_t> getTableNumbers(const ASTPtr & left_ast, const ASTPtr & right_ast, Data & data);
static std::pair<size_t, size_t> getTableNumbers(const ASTPtr & expr, const ASTPtr & left_ast, const ASTPtr & right_ast, Data & data);
static const ASTIdentifier * unrollAliases(const ASTIdentifier * identifier, const Aliases & aliases);
static size_t getTableForIdentifiers(std::vector<const ASTIdentifier *> & identifiers, const Data & data);
};

View File

@ -372,7 +372,20 @@ void DDLWorker::scheduleTasks(bool reinitialized)
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={},"
"last_current_task={},"
"last_skipped_entry_name={}",
initialized, size_before_filtering, queue_nodes.size(),
queue_nodes.empty() ? "none" : queue_nodes.front(), queue_nodes.empty() ? "none" : queue_nodes.back(),
first_failed_task_name ? *first_failed_task_name : "none", current_tasks.size(),
current_tasks.empty() ? "none" : current_tasks.back()->entry_name,
last_skipped_entry_name ? *last_skipped_entry_name : "none");
if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();

View File

@ -59,6 +59,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <common/logger_useful.h>
@ -890,6 +891,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.select && create.isView())
{
// Expand CTE before filling default database
ApplyWithSubqueryVisitor().visit(*create.select);
AddDefaultDatabaseVisitor visitor(current_database);
visitor.visit(*create.select);
}
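
A minimal sketch of the case this visit is meant to handle, with invented table and alias names: the CTE in a view definition should be expanded here first, otherwise AddDefaultDatabaseVisitor could qualify the WITH alias as if it were a table in the current database.

    CREATE VIEW recent_ids AS
    WITH top AS (SELECT id FROM events ORDER BY ts DESC LIMIT 10)
    SELECT * FROM top;   -- `top` must resolve to the CTE, not to `current_database.top`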

View File

@ -323,7 +323,7 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{

View File

@ -403,13 +403,13 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(TableJoin & analyzed_join, const ASTSelectQuery & select_query,
const TablesWithColumns & tables, const Aliases & aliases, ASTPtr & new_where_conditions)
const TablesWithColumns & tables, const Aliases & aliases)
{
const ASTTablesInSelectQueryElement * node = select_query.join();
if (!node || tables.size() < 2)
return;
auto & table_join = node->table_join->as<ASTTableJoin &>();
const auto & table_join = node->table_join->as<ASTTableJoin &>();
if (table_join.using_expression_list)
{
@ -428,33 +428,16 @@ void collectJoinedColumns(TableJoin & analyzed_join, const ASTSelectQuery & sele
{
bool is_asof = (table_join.strictness == ASTTableJoin::Strictness::Asof);
CollectJoinOnKeysVisitor::Data data{analyzed_join, tables[0], tables[1], aliases, is_asof, table_join.kind};
CollectJoinOnKeysVisitor::Data data{analyzed_join, tables[0], tables[1], aliases, is_asof};
CollectJoinOnKeysVisitor(data).visit(table_join.on_expression);
if (!data.has_some)
throw Exception("Cannot get JOIN keys from JOIN ON section: " + queryToString(table_join.on_expression),
ErrorCodes::INVALID_JOIN_ON_EXPRESSION);
if (is_asof)
{
data.asofToJoinKeys();
}
else if (data.new_on_expression)
{
table_join.on_expression = data.new_on_expression;
new_where_conditions = data.new_where_conditions;
}
}
}
/// Move joined key related to only one table to WHERE clause
void moveJoinedKeyToWhere(ASTSelectQuery * select_query, ASTPtr & new_where_conditions)
{
if (select_query->where())
select_query->setExpression(ASTSelectQuery::Expression::WHERE,
makeASTFunction("and", new_where_conditions, select_query->where()));
else
select_query->setExpression(ASTSelectQuery::Expression::WHERE, new_where_conditions->clone());
}
std::vector<const ASTFunction *> getAggregates(ASTPtr & query, const ASTSelectQuery & select_query)
{
@ -840,11 +823,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
setJoinStrictness(*select_query, settings.join_default_strictness, settings.any_join_distinct_right_table_keys,
result.analyzed_join->table_join);
ASTPtr new_where_condition = nullptr;
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases, new_where_condition);
if (new_where_condition)
moveJoinedKeyToWhere(select_query, new_where_condition);
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
/// rewrite filters for select query, must go after getArrayJoinedColumns
if (settings.optimize_respect_aliases && result.metadata_snapshot)

View File

@ -245,7 +245,7 @@ void ASTAlterCommand::formatImpl(
else if (type == ASTAlterCommand::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH "
<< "PARTITION " << (settings.hilite ? hilite_none : "");
<< (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << DB::quote << from;

View File

@ -61,6 +61,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_drop_detached_partition("DROP DETACHED PARTITION");
ParserKeyword s_drop_detached_part("DROP DETACHED PART");
ParserKeyword s_fetch_partition("FETCH PARTITION");
ParserKeyword s_fetch_part("FETCH PART");
ParserKeyword s_replace_partition("REPLACE PARTITION");
ParserKeyword s_freeze("FREEZE");
ParserKeyword s_unfreeze("UNFREEZE");
@ -428,6 +429,21 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_fetch_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
ASTPtr ast_from;
if (!parser_string_literal.parse(pos, ast_from, expected))
return false;
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->part = true;
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_freeze.ignore(pos, expected))
{
if (s_partition.ignore(pos, expected))
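
For illustration, mirroring the integration test further down in this diff (table and ZooKeeper path names are examples only), the new branch makes the following pair of statements parse, fetching a single named part instead of a whole partition:

    ALTER TABLE simple2 FETCH PART '20200828_0_0_0' FROM '/clickhouse/tables/0/simple';
    ALTER TABLE simple2 ATTACH PART '20200828_0_0_0';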

View File

@ -190,7 +190,7 @@ Chunk IRowInputFormat::generate()
if (num_errors && (params.allow_errors_num > 0 || params.allow_errors_ratio > 0))
{
Poco::Logger * log = &Poco::Logger::get("IRowInputFormat");
LOG_TRACE(log, "Skipped {} rows with errors while reading the input stream", num_errors);
LOG_DEBUG(log, "Skipped {} rows with errors while reading the input stream", num_errors);
}
readSuffix();

View File

@ -21,16 +21,13 @@ void MarkdownRowOutputFormat::writePrefix()
}
writeCString("\n|", out);
String left_alignment = ":-|";
String central_alignment = ":-:|";
String right_alignment = "-:|";
for (size_t i = 0; i < columns; ++i)
{
if (isInteger(types[i]))
if (types[i]->shouldAlignRightInPrettyFormats())
writeString(right_alignment, out);
else if (isString(types[i]))
writeString(left_alignment, out);
else
writeString(central_alignment, out);
writeString(left_alignment, out);
}
writeChar('\n', out);
}
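
Roughly, for an illustrative query, the alignment row emitted by writePrefix() now uses -:| for types that align right in pretty formats and :-| for everything else:

    SELECT 123 AS num, 'abc' AS str FORMAT Markdown;
    -- second output line (alignment row), approximately: |-:|:-|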

View File

@ -214,8 +214,8 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
{
output.push(std::move(to_push_chunk));
output.finish();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {})", src_rows, res_rows,
formatReadableSizeWithBinarySuffix(src_bytes));
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {})",
src_rows, res_rows, formatReadableSizeWithBinarySuffix(src_bytes));
return Status::Finished;
}
if (input.isFinished())

View File

@ -541,7 +541,7 @@ void AggregatingTransform::initGenerate()
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = variants.sizeWithoutOverflowRow();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
src_rows, rows, ReadableSize(src_bytes),
elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds));
@ -599,7 +599,7 @@ void AggregatingTransform::initGenerate()
pipe = Pipe::unitePipes(std::move(pipes));
}
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
LOG_DEBUG(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);

View File

@ -52,7 +52,7 @@ Chunk MergingAggregatedTransform::generate()
if (!generate_started)
{
generate_started = true;
LOG_TRACE(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
LOG_DEBUG(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();

View File

@ -196,7 +196,7 @@ void WriteBufferFromHTTPServerResponse::finalize()
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalize();
}

View File

@ -548,7 +548,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
LOG_TRACE(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
LOG_DEBUG(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
@ -645,7 +645,7 @@ struct StorageDistributedDirectoryMonitor::Batch
Stopwatch watch;
LOG_TRACE(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
LOG_DEBUG(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
@ -891,7 +891,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
if (!total_rows || !header)
{
LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
LOG_DEBUG(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);

View File

@ -28,6 +28,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/formatReadable.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
@ -585,6 +586,8 @@ void StorageKafka::threadFunc(size_t idx)
bool StorageKafka::streamToViews()
{
Stopwatch watch;
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
@ -637,7 +640,11 @@ bool StorageKafka::streamToViews()
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
// It will be cancelled on underlying layer (kafka buffer)
std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);
size_t rows = 0;
copyData(*in, *block_io.out, [&rows](const Block & block)
{
rows += block.rows();
}, &stub);
bool some_stream_is_stalled = false;
for (auto & stream : streams)
@ -646,6 +653,10 @@ bool StorageKafka::streamToViews()
stream->as<KafkaBlockInputStream>()->commit();
}
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
return some_stream_is_stalled;
}

View File

@ -2563,7 +2563,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}). Parts cleaning are processing significantly slower than inserts",
"Too many parts ({}). Merges are processing significantly slower than inserts",
parts_count_in_partition);
}
@ -2909,7 +2909,12 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
void MergeTreeData::fetchPartition(const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, const String & /*from*/, ContextPtr /*query_context*/)
void MergeTreeData::fetchPartition(
const ASTPtr & /*partition*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const String & /*from*/,
bool /*fetch_part*/,
ContextPtr /*query_context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FETCH PARTITION is not supported by storage {}", getName());
}
@ -2972,7 +2977,7 @@ Pipe MergeTreeData::alterPartition(
break;
case PartitionCommand::FETCH_PARTITION:
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context);
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, command.part, query_context);
break;
case PartitionCommand::FREEZE_PARTITION:

View File

@ -970,7 +970,12 @@ protected:
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;
/// Makes sense only for replicated tables
virtual void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context);
virtual void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context);
void writePartLog(
PartLogElement::Type type,

View File

@ -1054,7 +1054,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
false);
/// Let's estimate total number of rows for progress bar.
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
@ -1576,7 +1576,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
settings.preferred_block_size_bytes,
false);
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
for (size_t i = 0; i < num_streams_for_lonely_parts; ++i)
{

View File

@ -182,7 +182,7 @@ bool MergeTreePartsMover::selectPartsForMove(
if (!parts_to_move.empty())
{
LOG_TRACE(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
LOG_DEBUG(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
return true;
}
else

View File

@ -47,7 +47,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

View File

@ -47,7 +47,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
LOG_DEBUG(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

View File

@ -29,10 +29,10 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
{
/// Print column name but don't pollute logs in case of many columns.
if (columns_to_read.size() == 1)
LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
data_part->getMarksCount(), data_part->name, data_part->rows_count, columns_to_read.front());
else
LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
data_part->getMarksCount(), data_part->name, data_part->rows_count);
}

View File

@ -342,6 +342,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (!num_nodes_to_delete)
return;
auto last_outdated_block = timed_blocks.end() - 1;
LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
first_outdated_block->node, first_outdated_block->ctime,
last_outdated_block->node, last_outdated_block->ctime);
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
@ -372,9 +381,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
first_outdated_block++;
}
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (num_nodes_to_delete)
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
}

View File

@ -82,6 +82,7 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.type = FETCH_PARTITION;
res.partition = command_ast->partition;
res.from_zookeeper_path = command_ast->from;
res.part = command_ast->part;
return res;
}
else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION)
@ -140,7 +141,10 @@ std::string PartitionCommand::typeToString() const
else
return "DROP DETACHED PARTITION";
case PartitionCommand::Type::FETCH_PARTITION:
return "FETCH PARTITION";
if (part)
return "FETCH PART";
else
return "FETCH PARTITION";
case PartitionCommand::Type::FREEZE_ALL_PARTITIONS:
return "FREEZE ALL";
case PartitionCommand::Type::FREEZE_PARTITION:

View File

@ -40,6 +40,9 @@ namespace ProfileEvents
extern const Event StorageBufferPassedTimeMaxThreshold;
extern const Event StorageBufferPassedRowsMaxThreshold;
extern const Event StorageBufferPassedBytesMaxThreshold;
extern const Event StorageBufferPassedTimeFlushThreshold;
extern const Event StorageBufferPassedRowsFlushThreshold;
extern const Event StorageBufferPassedBytesFlushThreshold;
extern const Event StorageBufferLayerLockReadersWaitMilliseconds;
extern const Event StorageBufferLayerLockWritersWaitMilliseconds;
}
@ -103,6 +106,7 @@ StorageBuffer::StorageBuffer(
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
@ -110,6 +114,7 @@ StorageBuffer::StorageBuffer(
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, flush_thresholds(flush_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
@ -542,7 +547,7 @@ public:
{
if (storage.destination_id)
{
LOG_TRACE(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
LOG_DEBUG(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
storage.writeBlockToDestination(block, destination);
}
return;
@ -602,7 +607,7 @@ private:
{
buffer.data = sorted_block.cloneEmpty();
}
else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
else if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
@ -713,7 +718,7 @@ bool StorageBuffer::supportsPrewhere() const
return false;
}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
bool StorageBuffer::checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
time_t time_passed = 0;
if (buffer.first_write_time)
@ -722,11 +727,11 @@ bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time,
size_t rows = buffer.data.rows() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed);
return checkThresholdsImpl(direct, rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
@ -752,6 +757,27 @@ bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_p
return true;
}
if (!direct)
{
if (flush_thresholds.time && time_passed > flush_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeFlushThreshold);
return true;
}
if (flush_thresholds.rows && rows > flush_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsFlushThreshold);
return true;
}
if (flush_thresholds.bytes && bytes > flush_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesFlushThreshold);
return true;
}
}
return false;
}
@ -785,7 +811,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (check_thresholds)
{
if (!checkThresholdsImpl(rows, bytes, time_passed))
if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed))
return;
}
else
@ -804,7 +830,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (!destination_id)
{
LOG_TRACE(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
return;
}
@ -841,7 +867,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
}
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_TRACE(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
}
@ -1040,16 +1066,17 @@ void registerStorageBuffer(StorageFactory & factory)
*
* db, table - in which table to put data from buffer.
* num_buckets - level of parallelism.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer,
* flush_time, flush_rows, flush_bytes - additional conditions that trigger only background flushes.
*/
factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
if (engine_args.size() < 9 || engine_args.size() > 12)
throw Exception("Storage Buffer requires from 9 to 12 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// Table and database name arguments accept expressions, evaluate them.
@ -1058,7 +1085,7 @@ void registerStorageBuffer(StorageFactory & factory)
// After we evaluated all expressions, check that all arguments are
// literals.
for (size_t i = 0; i < 9; i++)
for (size_t i = 0; i < engine_args.size(); i++)
{
if (!typeid_cast<ASTLiteral *>(engine_args[i].get()))
{
@ -1068,17 +1095,29 @@ void registerStorageBuffer(StorageFactory & factory)
}
}
String destination_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
size_t i = 0;
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[2]->as<ASTLiteral &>().value);
String destination_database = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[3]->as<ASTLiteral &>().value);
Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[4]->as<ASTLiteral &>().value);
UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[5]->as<ASTLiteral &>().value);
UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[6]->as<ASTLiteral &>().value);
UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
StorageBuffer::Thresholds min;
StorageBuffer::Thresholds max;
StorageBuffer::Thresholds flush;
min.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
/// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
StorageID destination_id = StorageID::createEmpty();
@ -1094,8 +1133,7 @@ void registerStorageBuffer(StorageFactory & factory)
args.constraints,
args.getContext(),
num_buckets,
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
min, max, flush,
destination_id,
static_cast<bool>(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns));
},
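
A sketch of the extended engine definition with invented names and threshold values; the first nine arguments keep their old meaning and the optional trailing three are the new flush thresholds:

    CREATE TABLE hits_buffer AS hits ENGINE = Buffer(
        currentDatabase(), hits, 16,                   -- destination database, table, num_buckets
        10, 100, 10000, 1000000, 10000000, 100000000,  -- min/max time, rows, bytes
        60, 500000, 10000000);                         -- flush_time, flush_rows, flush_bytes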

View File

@ -35,6 +35,10 @@ namespace DB
* Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows,
* and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table.
*
* There are also separate flush thresholds; they are checked only for non-direct (background) flushes.
* This may be useful if you do not want to add extra latency for INSERT queries:
* for example, with max_rows=1e6 and flush_rows=500e3, the buffer is flushed in the background only, after every 500e3 rows.
*
* When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
* The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost.
*/
@ -45,12 +49,11 @@ friend class BufferSource;
friend class BufferBlockOutputStream;
public:
/// Thresholds.
struct Thresholds
{
time_t time; /// The number of seconds from the insertion of the first row into the block.
size_t rows; /// The number of rows in the block.
size_t bytes; /// The number of (uncompressed) bytes in the block.
time_t time = 0; /// The number of seconds from the insertion of the first row into the block.
size_t rows = 0; /// The number of rows in the block.
size_t bytes = 0; /// The number of (uncompressed) bytes in the block.
};
std::string getName() const override { return "Buffer"; }
@ -135,6 +138,7 @@ private:
const Thresholds min_thresholds;
const Thresholds max_thresholds;
const Thresholds flush_thresholds;
StorageID destination_id;
bool allow_materialized;
@ -153,8 +157,8 @@ private:
/// are exceeded. If reset_block_structure is set - clears inner block
/// structure inside buffer (useful in OPTIMIZE and ALTER).
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const;
/// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
void writeBlockToDestination(const Block & block, StoragePtr table);
@ -177,6 +181,7 @@ protected:
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id,
bool allow_materialized_);
};

View File

@ -469,7 +469,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
if (settings.optimize_skip_unused_shards)
if (getClusterQueriedNodes(settings, cluster) > 1 && settings.optimize_skip_unused_shards)
{
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
if (optimized_cluster)
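
As a hedged illustration with placeholder names, the guard means shard pruning is attempted only when the setting is enabled and the query would otherwise hit more than one node:

    SELECT count() FROM dist_hits WHERE shard_key = 42
    SETTINGS optimize_skip_unused_shards = 1;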

View File

@ -130,6 +130,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int INTERSERVER_SCHEME_DOESNT_MATCH;
extern const int DUPLICATE_DATA_PART;
}
namespace ActionLocks
@ -5356,11 +5357,11 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
}
}
void StorageReplicatedMergeTree::fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from_,
bool fetch_part,
ContextPtr query_context)
{
Macros::MacroExpansionInfo info;
@ -5373,40 +5374,54 @@ void StorageReplicatedMergeTree::fetchPartition(
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
String partition_id = getPartitionIDFromQuery(partition, query_context);
zkutil::ZooKeeperPtr zookeeper;
if (auxiliary_zookeeper_name != default_zookeeper_name)
{
zookeeper = getContext()->getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch partition {} from shard {} (auxiliary zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
}
else
{
zookeeper = getZooKeeper();
LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
}
if (from.back() == '/')
from.resize(from.size() - 1);
if (fetch_part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
auto part_path = findReplicaHavingPart(part_name, from, zookeeper);
if (part_path.empty())
throw Exception(ErrorCodes::NO_REPLICA_HAS_PART, "Part {} does not exist on any replica", part_name);
/** Let's check that there is no such part in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a part may appear a little later.
*/
if (checkIfDetachedPartExists(part_name))
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Detached part " + part_name + " already exists.");
LOG_INFO(log, "Will fetch part {} from shard {} (zookeeper '{}')", part_name, from_, auxiliary_zookeeper_name);
try
{
/// part name, metadata, part_path, true, 0, zookeeper
if (!fetchPart(part_name, metadata_snapshot, part_path, true, 0, zookeeper))
throw Exception(ErrorCodes::UNFINISHED, "Failed to fetch part {} from {}", part_name, from_);
}
catch (const DB::Exception & e)
{
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
&& e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
throw;
LOG_INFO(log, e.displayText());
}
return;
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
LOG_INFO(log, "Will fetch partition {} from shard {} (zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
*/
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
}
}
if (checkIfDetachedPartitionExists(partition_id))
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
zkutil::Strings replicas;
zkutil::Strings active_replicas;
@ -6913,4 +6928,46 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica;
}
String StorageReplicatedMergeTree::findReplicaHavingPart(
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_)
{
Strings replicas = zookeeper_->getChildren(zookeeper_path_ + "/replicas");
/// Select replicas in uniformly random order.
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
for (const String & replica : replicas)
{
if (zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/parts/" + part_name)
&& zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/is_active"))
return zookeeper_path_ + "/replicas/" + replica;
}
return {};
}
bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
if (dir_it.name() == part_name)
return true;
return false;
}
bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & partition_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version) && part_info.partition_id == partition_name)
return true;
}
}
return false;
}
}

View File

@ -522,8 +522,11 @@ private:
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
bool checkReplicaHavePart(const String & replica, const String & part_name);
bool checkIfDetachedPartExists(const String & part_name);
bool checkIfDetachedPartitionExists(const String & partition_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
@ -626,7 +629,12 @@ private:
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context) override;
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed

View File

@ -26,6 +26,8 @@
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<raft_logs_level>trace</raft_logs_level>
<force_sync>false</force_sync>
<!-- we want all logs for complex problems investigation -->
<reserved_log_items>1000000000000000</reserved_log_items>
</coordination_settings>
<raft_configuration>

View File

@ -8,6 +8,8 @@
<session_timeout_ms>30000</session_timeout_ms>
<force_sync>false</force_sync>
<startup_timeout>60000</startup_timeout>
<!-- we want all logs for complex problems investigation -->
<reserved_log_items>1000000000000000</reserved_log_items>
</coordination_settings>
<raft_configuration>

View File

@ -156,6 +156,7 @@
"extractURLParameterNames"
"extractURLParameters"
"FETCH PARTITION"
"FETCH PART"
"FINAL"
"FIRST"
"firstSignificantSubdomain"

View File

@ -435,7 +435,7 @@ class ClickhouseIntegrationTestsRunner:
time.sleep(5)
logging.info("Finally all tests done, going to compress test dir")
test_logs = os.path.join(str(self.path()), "./test_dir.tar")
test_logs = os.path.join(str(self.path()), "./test_dir.tar.gz")
self._compress_logs("{}/tests/integration".format(repo_path), test_logs)
logging.info("Compression finished")
@ -500,7 +500,7 @@ class ClickhouseIntegrationTestsRunner:
break
logging.info("Finally all tests done, going to compress test dir")
test_logs = os.path.join(str(self.path()), "./test_dir.tar")
test_logs = os.path.join(str(self.path()), "./test_dir.tar.gz")
self._compress_logs("{}/tests/integration".format(repo_path), test_logs)
logging.info("Compression finished")

View File

@ -1,5 +1,3 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@ -18,23 +16,33 @@ def start_cluster():
cluster.shutdown()
def test_fetch_part_from_allowed_zookeeper(start_cluster):
@pytest.mark.parametrize(
('part', 'date', 'part_name'),
[
('PARTITION', '2020-08-27', '2020-08-27'),
('PART', '2020-08-28', '20200828_0_0_0'),
]
)
def test_fetch_part_from_allowed_zookeeper(start_cluster, part, date, part_name):
node.query(
"CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
"CREATE TABLE IF NOT EXISTS simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query("INSERT INTO simple VALUES ('2020-08-27', 1)")
node.query("""INSERT INTO simple VALUES ('{date}', 1)""".format(date=date))
node.query(
"CREATE TABLE simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
"CREATE TABLE IF NOT EXISTS simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';"
)
node.query("ALTER TABLE simple2 ATTACH PARTITION '2020-08-27';")
"""ALTER TABLE simple2 FETCH {part} '{part_name}' FROM 'zookeeper2:/clickhouse/tables/0/simple';""".format(
part=part, part_name=part_name))
node.query("""ALTER TABLE simple2 ATTACH {part} '{part_name}';""".format(part=part, part_name=part_name))
with pytest.raises(QueryRuntimeException):
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper:/clickhouse/tables/0/simple';"
)
"""ALTER TABLE simple2 FETCH {part} '{part_name}' FROM 'zookeeper:/clickhouse/tables/0/simple';""".format(
part=part, part_name=part_name))
assert node.query("SELECT id FROM simple2").strip() == "1"
assert node.query("""SELECT id FROM simple2 where date = '{date}'""".format(date=date)).strip() == "1"

View File

@ -68,6 +68,16 @@ def create_table(cluster, table_name, additional_settings=None):
node.query(create_table_statement)
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == expected:
return
timeout -= 1
time.sleep(1)
assert(len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == expected)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
@ -75,8 +85,9 @@ def drop_table(cluster):
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
try:
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
wait_for_delete_s3_objects(cluster, 0)
finally:
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
@ -151,7 +162,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD)
def test_alter_table_columns(cluster):
@ -167,32 +178,20 @@ def test_alter_table_columns(cluster):
# To ensure parts have merged
node.query("OPTIMIZE TABLE s3_test")
# Wait for merges, mutations and old parts deletion
time.sleep(3)
assert node.query("SELECT sum(col1) FROM s3_test FORMAT Values") == "(8192)"
assert node.query("SELECT sum(col1) FROM s3_test WHERE id > 0 FORMAT Values") == "(4096)"
assert len(list(minio.list_objects(cluster.minio_bucket,
'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN)
node.query("ALTER TABLE s3_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2})
# Wait for old parts deletion
time.sleep(3)
assert node.query("SELECT distinct(col1) FROM s3_test FORMAT Values") == "('1')"
# and file with mutation
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == (
FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)
node.query("ALTER TABLE s3_test DROP COLUMN col1", settings={"mutations_sync": 2})
# Wait for old parts deletion
time.sleep(3)
# and 2 files with mutations
assert len(
list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2)
def test_attach_detach_partition(cluster):
@ -320,9 +319,7 @@ def test_move_replace_partition_to_another_table(cluster):
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
# Wait for outdated partitions deletion.
time.sleep(3)
assert len(list(
minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4)
node.query("DROP TABLE s3_clone NO DELAY")
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
@ -338,7 +335,8 @@ def test_move_replace_partition_to_another_table(cluster):
node.query("DROP TABLE s3_test NO DELAY")
# Backup data should remain in S3.
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE * 4
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)
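
The hunks above replace fixed time.sleep(3) waits with the wait_for_delete_s3_objects helper, which polls the bucket until the expected number of objects remains under the data/ prefix rather than hoping that background merges and mutation cleanup finish within three seconds. Below is a minimal standalone sketch of the same poll-until-converged pattern; the function is renamed and simplified for illustration, it assumes the MinIO Python SDK, and the endpoint, credentials and bucket name are placeholders (the real test obtains an equivalent client from cluster.minio_client):

import time
from minio import Minio

def wait_for_object_count(client, bucket, prefix, expected, timeout=30):
    # Poll once per second until exactly `expected` objects remain under
    # `prefix`; if the timeout expires, fail with a regular assertion.
    while timeout > 0:
        if len(list(client.list_objects(bucket, prefix))) == expected:
            return
        timeout -= 1
        time.sleep(1)
    assert len(list(client.list_objects(bucket, prefix))) == expected

# Placeholder connection details for illustration only.
client = Minio("localhost:9001", access_key="minio", secret_key="minio123", secure=False)
wait_for_object_count(client, "root", "data/", expected=0)

Polling keeps the object-count assertions stable when part deletion takes longer than a fixed sleep, which is the kind of flakiness the sleep-based version was prone to.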

View File

@ -7,7 +7,6 @@ fun:tolower
# Suppress some failures in contrib so that we can enable MSan in CI.
# Ideally, we should report these upstream.
src:*/contrib/zlib-ng/*
# Hyperscan
fun:roseRunProgram

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT arrayFold(x, acc -> acc + 1, range(100000), toUInt64(0))</query> <!-- count -->
<query>SELECT arrayFold(x, acc -> acc + x, range(100000), toUInt64(0))</query> <!-- sum -->
</test>
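
As exercised by the two queries above, arrayFold takes a lambda over (element, accumulator), the array to fold, and an initial accumulator value, so acc + 1 counts the elements and acc + x sums them. A small illustration of the same calls, assuming a local server reachable through the clickhouse-driver Python package (connection details are placeholders):

from clickhouse_driver import Client

client = Client("localhost")  # placeholder: default native-protocol port on a local server

# Fold [1, 2, 3] with acc + x starting from toUInt64(0): 0+1=1, then 1+2=3, then 3+3=6.
print(client.execute("SELECT arrayFold(x, acc -> acc + x, [1, 2, 3], toUInt64(0))"))      # [(6,)]

# The counting variant ignores the element and adds 1 per step.
print(client.execute("SELECT arrayFold(x, acc -> acc + 1, range(100000), toUInt64(0))"))  # [(100000,)]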

View File

@ -55,14 +55,14 @@
INSERT INTO simple_key_direct_dictionary_source_table
SELECT number, number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<fill_query>
INSERT INTO complex_key_direct_dictionary_source_table
SELECT number, toString(number), number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<substitutions>
@ -79,47 +79,51 @@
<substitution>
<name>elements_count</name>
<values>
<value>25000</value>
<value>50000</value>
<value>75000</value>
<value>100000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_direct_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_direct_dictionary', (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictHas('default.complex_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
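
The rewritten queries draw each lookup key at random from the dictionary's key range with rand64() % toUInt64({elements_count}) instead of walking sequential numbers, and the {column_name} and {elements_count} placeholders are expanded by the performance-test harness from the substitutions block. As a rough illustration, one possible expansion of the first templated query, assuming a local server with this test's dictionary already created and the clickhouse-driver Python package (picking 'value_int' and 50000 purely as example substitutions):

from clickhouse_driver import Client

client = Client("localhost")  # placeholder connection; the perf harness runs the queries itself

# Hypothetical expansion: {column_name} -> 'value_int', {elements_count} -> 50000.
# FORMAT Null from the template is omitted; the native client simply returns the rows.
query = """
WITH rand64() % toUInt64(50000) AS key
SELECT dictGet('default.simple_key_direct_dictionary', 'value_int', key)
FROM system.numbers
LIMIT 50000
"""
client.execute(query)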

View File

@ -1,8 +1,4 @@
<test max_ignored_relative_change="0.3">
<preconditions>
<table_exists>please_fix_me</table_exists>
</preconditions>
<create_query>
CREATE TABLE simple_key_flat_dictionary_source_table
(
@ -50,25 +46,30 @@
<substitution>
<name>elements_count</name>
<values>
<value>2500000</value>
<value>5000000</value>
<value>7500000</value>
<value>10000000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_flat_dictionary', number)
SELECT * FROM simple_key_flat_dictionary
FORMAT Null;
</query>
<query>
WITH rand64() % toUInt64(75000000) as key
SELECT dictHas('default.simple_key_flat_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
LIMIT 75000000
FORMAT Null;
</query>

View File

@ -81,35 +81,37 @@
<substitution>
<name>elements_count</name>
<values>
<value>2500000</value>
<value>5000000</value>
<value>7500000</value>
<value>10000000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_hashed_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_hashed_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, (number, toString(number)))
WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_hashed_dictionary', (number, toString(number)))
WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
SELECT dictHas('default.complex_key_hashed_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;

View File

@ -0,0 +1,5 @@
<test>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(intDiv(number, 1000000000))</query>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(divide(number, 1000000000))</query>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(toUInt32(divide(number, 1000000000)))</query>
</test>
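
These three queries contrast integer division, floating-point division, and floating-point division followed by a cast back to an integer type: intDiv produces an integer quotient, while divide (the / operator) returns a Float64. A small illustration, assuming a local server and the clickhouse-driver Python package (connection details are placeholders):

from clickhouse_driver import Client

client = Client("localhost")  # placeholder connection details

# intDiv truncates to an integer quotient; divide yields a Float64.
print(client.execute("SELECT intDiv(7, 2), divide(7, 2)"))   # [(3, 3.5)]

# The third benchmark query additionally casts the Float64 result to UInt32.
print(client.execute("SELECT toUInt32(divide(6, 2))"))       # [(3,)]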

View File

@ -109,7 +109,7 @@ SELECT
t2_00826.a,
t2_00826.b
FROM t1_00826
ALL INNER JOIN t2_00826 ON (((a = t2_00826.a) AND (a = t2_00826.a)) AND (a = t2_00826.a)) AND (b = t2_00826.b)
ALL INNER JOIN t2_00826 ON (a = t2_00826.a) AND (a = t2_00826.a) AND (a = t2_00826.a) AND (b = t2_00826.b)
WHERE (a = t2_00826.a) AND ((a = t2_00826.a) AND ((a = t2_00826.a) AND (b = t2_00826.b)))
--- cross split conjunction ---
SELECT

Some files were not shown because too many files have changed in this diff.