Merge branch 'master' into merging_external_source_cassandra

This commit is contained in:
Alexander Tokmakov 2020-05-20 23:44:53 +03:00
commit 31b6f5f0d2
182 changed files with 7644 additions and 1066 deletions

View File

@ -385,9 +385,6 @@ if (OS_LINUX AND NOT ENABLE_JEMALLOC)
endif ()
if (USE_OPENCL)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUSE_OPENCL=1")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_OPENCL=1")
if (OS_DARWIN)
set(OPENCL_LINKER_FLAGS "-framework OpenCL")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OPENCL_LINKER_FLAGS}")

View File

@ -1,13 +1,19 @@
# TODO: enable by default
if(0)
option(ENABLE_OPENCL "Enable OpenCL support" ${ENABLE_LIBRARIES})
endif()
if(ENABLE_OPENCL)
# Intel OpenCl driver: sudo apt install intel-opencl-icd
# TODO It's possible to add it as submodules: https://github.com/intel/compute-runtime/releases
# @sa https://github.com/intel/compute-runtime/releases
# OpenCL applications should link with ICD loader
# sudo apt install opencl-headers ocl-icd-libopencl1
# sudo ln -s /usr/lib/x86_64-linux-gnu/libOpenCL.so.1.0.0 /usr/lib/libOpenCL.so
# TODO: add https://github.com/OCL-dev/ocl-icd as submodule instead
find_package(OpenCL REQUIRED)
find_package(OpenCL)
if(OpenCL_FOUND)
set(USE_OPENCL 1)
endif()

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
hdfs1:
image: sequenceiq/hadoop-docker:2.7.0

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
kafka_zookeeper:

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
minio1:

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
mongo1:
image: mongo:3.6

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
mysql1:
image: mysql:5.7

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
networks:
default:
driver: bridge

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
postgres1:
image: postgres

View File

@ -1,4 +1,4 @@
version: '2.2'
version: '2.3'
services:
redis1:
image: redis

View File

@ -1,25 +1,47 @@
version: '2.2'
version: '2.3'
services:
zoo1:
image: zookeeper:3.4.12
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
ZOO_MY_ID: 1
JVMFLAGS: -Dzookeeper.forceSync=no
volumes:
- type: ${ZK_FS:-tmpfs}
source: ${ZK_DATA1:-}
target: /data
- type: ${ZK_FS:-tmpfs}
source: ${ZK_DATA_LOG1:-}
target: /datalog
zoo2:
image: zookeeper:3.4.12
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
ZOO_MY_ID: 2
JVMFLAGS: -Dzookeeper.forceSync=no
volumes:
- type: ${ZK_FS:-tmpfs}
source: ${ZK_DATA2:-}
target: /data
- type: ${ZK_FS:-tmpfs}
source: ${ZK_DATA_LOG2:-}
target: /datalog
zoo3:
image: zookeeper:3.4.12
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
ZOO_MY_ID: 3
JVMFLAGS: -Dzookeeper.forceSync=no
volumes:
- type: ${ZK_FS:-tmpfs}
source: ${ZK_DATA3:-}
target: /data
- type: ${ZK_FS:-tmpfs}
source: ${ZK_DATA_LOG3:-}
target: /datalog

View File

@ -27,7 +27,7 @@ function configure
kill -0 $left_pid
disown $left_pid
set +m
while ! clickhouse-client --port 9001 --query "select 1" ; do kill -0 $left_pid ; echo . ; sleep 1 ; done
while ! clickhouse-client --port 9001 --query "select 1" && kill -0 $left_pid ; do echo . ; sleep 1 ; done
echo server for setup started
clickhouse-client --port 9001 --query "create database test" ||:
@ -71,9 +71,9 @@ function restart
set +m
while ! clickhouse-client --port 9001 --query "select 1" ; do kill -0 $left_pid ; echo . ; sleep 1 ; done
while ! clickhouse-client --port 9001 --query "select 1" && kill -0 $left_pid ; do echo . ; sleep 1 ; done
echo left ok
while ! clickhouse-client --port 9002 --query "select 1" ; do kill -0 $right_pid ; echo . ; sleep 1 ; done
while ! clickhouse-client --port 9002 --query "select 1" && kill -0 $right_pid ; do echo . ; sleep 1 ; done
echo right ok
clickhouse-client --port 9001 --query "select * from system.tables where database != 'system'"
@ -263,7 +263,7 @@ done
wait
unset IFS
parallel --verbose --null < analyze-commands.txt
parallel --null < analyze-commands.txt
}
# Analyze results
@ -314,6 +314,25 @@ create table queries_old_format engine File(TSVWithNamesAndTypes, 'queries.rep')
from queries
;
-- save all test runs as JSON for the new comparison page
create table all_query_funs_json engine File(JSON, 'report/all-query-runs.json') as
select test, query, versions_runs[1] runs_left, versions_runs[2] runs_right
from (
select
test, query,
groupArrayInsertAt(runs, version) versions_runs
from (
select
replaceAll(_file, '-queries.tsv', '') test,
query, version,
groupArray(time) runs
from file('*-queries.tsv', TSV, 'query text, run int, version UInt32, time float')
group by test, query, version
)
group by test, query
)
;
create table changed_perf_tsv engine File(TSV, 'report/changed-perf.tsv') as
select left, right, diff, stat_threshold, changed_fail, test, query from queries where changed_show
order by abs(diff) desc;
@ -542,7 +561,7 @@ case "$stage" in
# to collect the logs. Prefer not to restart, because addresses might change
# and we won't be able to process trace_log data. Start in a subshell, so that
# it doesn't interfere with the watchdog through `wait`.
( time get_profiles || restart || get_profiles ||: )
( get_profiles || restart || get_profiles ||: )
# Kill the whole process group, because somehow when the subshell is killed,
# the sleep inside remains alive and orphaned.

View File

@ -1,10 +1,10 @@
## system.table\_name {#system-tables_table-name}
## system.table_name {#system-tables_table-name}
Description.
Columns:
- `column_name` ([data\_type\_name](path/to/data_type.md)) — Description.
- `column_name` ([data_type_name](path/to/data_type.md)) — Description.
**Example**

View File

@ -5,7 +5,7 @@ toc_title: How to Build ClickHouse on Mac OS X
# How to Build ClickHouse on Mac OS X {#how-to-build-clickhouse-on-mac-os-x}
Build should work on Mac OS X 10.15 (Catalina)
Build should work on Mac OS X 10.15 (Catalina).
## Install Homebrew {#install-homebrew}

View File

@ -7,7 +7,7 @@ Building of ClickHouse is supported on Linux, FreeBSD and Mac OS X.
# If You Use Windows {#if-you-use-windows}
If you use Windows, you need to create a virtual machine with Ubuntu. To start working with a virtual machine please install VirtualBox. You can download Ubuntu from the website: https://www.ubuntu.com/\#download. Please create a virtual machine from the downloaded image (you should reserve at least 4GB of RAM for it). To run a command-line terminal in Ubuntu, please locate a program containing the word “terminal” in its name (gnome-terminal, konsole etc.) or just press Ctrl+Alt+T.
If you use Windows, you need to create a virtual machine with Ubuntu. To start working with a virtual machine please install VirtualBox. You can download Ubuntu from the website: https://www.ubuntu.com/#download. Please create a virtual machine from the downloaded image (you should reserve at least 4GB of RAM for it). To run a command-line terminal in Ubuntu, please locate a program containing the word “terminal” in its name (gnome-terminal, konsole etc.) or just press Ctrl+Alt+T.
# If You Use a 32-bit System {#if-you-use-a-32-bit-system}

View File

@ -3,4 +3,4 @@ toc_folder_title: Engines
toc_priority: 25
---
{## [Original article](https://clickhouse.tech/docs/en/engines/) ##}

View File

@ -4,3 +4,4 @@ toc_priority: 76
---
{## [Original article](https://clickhouse.tech/docs/en/faq) ##}

View File

@ -50,7 +50,7 @@ sudo rpm --import https://repo.clickhouse.tech/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.tech/rpm/stable/x86_64
```
If you want to use the most recent version, replace `stable` with `testing` (this is recommended for your testing environments). The `prestable` tag is sometimes available too.
If you want to use the most recent version, replace `stable` with `testing` (this is recommended for your testing environments). `prestable` is sometimes also available.
Then run these commands to install packages:

View File

@ -8,27 +8,27 @@ toc_title: Playground
[ClickHouse Playground](https://play.clickhouse.tech) allows people to experiment with ClickHouse by running queries instantly, without setting up their server or cluster.
Several example datasets are available in the Playground as well as sample queries that show ClickHouse features. There's also a selection of ClickHouse LTS releases to experiment with.
ClickHouse Playground gives the experience of m2.small [Managed Service for ClickHouse](https://cloud.yandex.com/services/managed-clickhouse) instance hosted in [Yandex.Cloud](https://cloud.yandex.com/). More information about [cloud providers](../commercial/cloud.md).
ClickHouse Playground gives the experience of m2.small [Managed Service for ClickHouse](https://cloud.yandex.com/services/managed-clickhouse) instance (4 vCPU, 32 GB RAM) hosted in [Yandex.Cloud](https://cloud.yandex.com/). More information about [cloud providers](../commercial/cloud.md).
You can make queries to playground using any HTTP client, for example [curl](https://curl.haxx.se) or [wget](https://www.gnu.org/software/wget/), or set up a connection using [JDBC](../interfaces/jdbc.md) or [ODBC](../interfaces/odbc.md) drivers. More information about software products that support ClickHouse is available [here](../interfaces/index.md).
## Credentials
| Parameter | Value |
|:------------------|:----------------------------------------|
|:--------------------|:----------------------------------------|
| HTTPS endpoint | `https://play-api.clickhouse.tech:8443` |
| Native endpoint | `play-api.clickhouse.tech:9440` |
| Native TCP endpoint | `play-api.clickhouse.tech:9440` |
| User | `playground` |
| Password | `clickhouse` |
!!! note "Note"
Note that all endpoints require a secure TLS connection.
There are additional endpoints with specific ClickHouse releases to experiment with their differences (ports and user/password are the same as above):
* 20.3 LTS: `play-api-v20-3.clickhouse.tech`
* 19.14 LTS: `play-api-v19-14.clickhouse.tech`
!!! note "Note"
All these endpoints require a secure TLS connection.
## Limitations
The queries are executed as a read-only user. It implies some limitations:

View File

@ -11,7 +11,7 @@ toc_title: Adopters
| Company | Industry | Usecase | Cluster Size | (Un)Compressed Data Size<abbr title="of single replica"><sup>\*</sup></abbr> | Reference |
|---------------------------------------------------------------------|---------------------------------|-----------------------|------------------------------------------------------------|------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [2gis](https://2gis.ru){.favicon} | Maps | Monitoring | — | — | [Talk in Russian, July 2019](https://youtu.be/58sPkXfq6nw) |
| [Aloha&nbsp;Browser](https://alohabrowser.com/){.favicon} | Mobile App | Browser backend | — | — | [Slides in Russian, May 2019](https://github.com/yandex/clickhouse-presentations/blob/master/meetup22/aloha.pdf) |
| [Aloha&nbsp;Browser](https://alohabrowser.com/){.favicon} | Mobile App | Browser backend | — | — | [Slides in Russian, May 2019](https://presentations.clickhouse.tech/meetup22/aloha.pdf) |
| [Amadeus](https://amadeus.com/){.favicon} | Travel | Analytics | — | — | [Press Release, April 2018](https://www.altinity.com/blog/2018/4/5/amadeus-technologies-launches-investment-and-insights-tool-based-on-machine-learning-and-strategy-algorithms) |
| [Appsflyer](https://www.appsflyer.com){.favicon} | Mobile analytics | Main product | — | — | [Talk in Russian, July 2019](https://www.youtube.com/watch?v=M3wbRlcpBbY) |
| [ArenaData](https://arenadata.tech/){.favicon} | Data Platform | Main product | — | — | [Slides in Russian, December 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup38/indexes.pdf) |

View File

@ -27,4 +27,4 @@ Under the same conditions, ClickHouse can handle several hundred queries per sec
We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 Kb in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly.
[Original article](https://clickhouse.tech/docs/en/introduction/performance/) <!--hide-->
{## [Original article](https://clickhouse.tech/docs/en/introduction/performance/) ##}

View File

@ -733,7 +733,7 @@ Example
<mysql_port>9004</mysql_port>
```
## tmp\_path {#server-settings-tmp_path}
## tmp_path {#tmp-path}
Path to temporary data for processing large queries.
@ -746,16 +746,17 @@ Path to temporary data for processing large queries.
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
```
## tmp\_policy {#server-settings-tmp-policy}
## tmp_policy {#tmp-policy}
Policy from [`storage_configuration`](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) to store temporary files.
If not set [`tmp_path`](#server-settings-tmp_path) is used, otherwise it is ignored.
Policy from [storage_configuration](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) to store temporary files.
If not set, [tmp_path](#tmp-path) is used, otherwise it is ignored.
!!! note "Note"
- `move_factor` is ignored
- `keep_free_space_bytes` is ignored
- `max_data_part_size_bytes` is ignored
- you must have exactly one volume in that policy
- `move_factor` is ignored.
- `keep_free_space_bytes` is ignored.
- `max_data_part_size_bytes` is ignored.
- You must have exactly one volume in that policy.
## uncompressed\_cache\_size {#server-settings-uncompressed_cache_size}

View File

@ -1026,27 +1026,32 @@ Possible values:
Default value: 0.
## optimize\_skip\_unused\_shards {#settings-optimize_skip_unused_shards}
## optimize_skip_unused_shards {#optimize-skip-unused-shards}
Enables or disables skipping of unused shards for SELECT queries that have sharding key condition in PREWHERE/WHERE (assumes that the data is distributed by sharding key, otherwise do nothing).
Default value: 0
## force\_optimize\_skip\_unused\_shards {#settings-force_optimize_skip_unused_shards}
Enables or disables query execution if [`optimize_skip_unused_shards`](#settings-optimize_skip_unused_shards) enabled and skipping of unused shards is not possible. If the skipping is not possible and the setting is enabled exception will be thrown.
Enables or disables skipping of unused shards for [SELECT](../../sql-reference/statements/select/index.md) queries that have sharding key condition in `WHERE/PREWHERE` (assuming that the data is distributed by sharding key, otherwise does nothing).
Possible values:
- 0 - Disabled (do not throws)
- 1 - Disable query execution only if the table has sharding key
- 2 - Disable query execution regardless sharding key is defined for the table
- 0 — Disabled.
- 1 — Enabled.
Default value: 0
## force_optimize_skip_unused_shards {#force-optimize-skip-unused-shards}
Enables or disables query execution if [optimize_skip_unused_shards](#optimize-skip-unused-shards) is enabled and skipping of unused shards is not possible. If the skipping is not possible and the setting is enabled, an exception will be thrown.
Possible values:
- 0 — Disabled. ClickHouse doesn't throw an exception.
- 1 — Enabled. Query execution is disabled only if the table has a sharding key.
- 2 — Enabled. Query execution is disabled regardless of whether a sharding key is defined for the table.
Default value: 0
## force\_optimize\_skip\_unused\_shards\_no\_nested {#settings-force_optimize_skip_unused_shards_no_nested}
Reset [`optimize_skip_unused_shards`](#settings-force_optimize_skip_unused_shards) for nested `Distributed` table
Reset [`optimize_skip_unused_shards`](#optimize-skip-unused-shards) for nested `Distributed` table
Possible values:
@ -1250,7 +1255,9 @@ Default value: Empty
## background\_pool\_size {#background_pool_size}
Sets the number of threads performing background operations in table engines (for example, merges in [MergeTree engine](../../engines/table-engines/mergetree-family/index.md) tables). This setting is applied at ClickHouse server start and cant be changed in a user session. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
Sets the number of threads performing background operations in table engines (for example, merges in [MergeTree engine](../../engines/table-engines/mergetree-family/index.md) tables). This setting is applied from the `default` profile at ClickHouse server start and can't be changed in a user session. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
Before changing it, please also take a look at related [MergeTree settings](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-merge_tree), such as `number_of_free_entries_in_pool_to_lower_max_size_of_merge` and `number_of_free_entries_in_pool_to_execute_mutation`.
Possible values:

View File

@ -536,26 +536,26 @@ Contains logging entries. Logging level which goes to this table can be limited
Columns:
- `event_date` (`Date`) - Date of the entry.
- `event_time` (`DateTime`) - Time of the entry.
- `microseconds` (`UInt32`) - Microseconds of the entry.
- `event_date` (Date) — Date of the entry.
- `event_time` (DateTime) — Time of the entry.
- `microseconds` (UInt32) — Microseconds of the entry.
- `thread_name` (String) — Name of the thread from which the logging was done.
- `thread_id` (UInt64) — OS thread ID.
- `level` (`Enum8`) - Entry level.
- `'Fatal' = 1`
- `'Critical' = 2`
- `'Error' = 3`
- `'Warning' = 4`
- `'Notice' = 5`
- `'Information' = 6`
- `'Debug' = 7`
- `'Trace' = 8`
- `query_id` (`String`) - ID of the query.
- `logger_name` (`LowCardinality(String)`) - Name of the logger (i.e. `DDLWorker`)
- `message` (`String`) - The message itself.
- `revision` (`UInt32`) - ClickHouse revision.
- `source_file` (`LowCardinality(String)`) - Source file from which the logging was done.
- `source_line` (`UInt64`) - Source line from which the logging was done.
- `level` (`Enum8`) — Entry level. Possible values:
- `1` or `'Fatal'`.
- `2` or `'Critical'`.
- `3` or `'Error'`.
- `4` or `'Warning'`.
- `5` or `'Notice'`.
- `6` or `'Information'`.
- `7` or `'Debug'`.
- `8` or `'Trace'`.
- `query_id` (String) — ID of the query.
- `logger_name` (LowCardinality(String)) — Name of the logger (i.e. `DDLWorker`).
- `message` (String) — The message itself.
- `revision` (UInt32) — ClickHouse revision.
- `source_file` (LowCardinality(String)) — Source file from which the logging was done.
- `source_line` (UInt64) — Source line from which the logging was done.
## system.query\_log {#system_tables-query_log}

View File

@ -1543,20 +1543,32 @@ It represents an unbiased estimate of the variance of a random variable if passe
Returns `Float64`. When `n <= 1`, returns `+∞`.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varSampStable` function. It works slower but provides a lower computational error.
## varPop(x) {#varpopx}
Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅` is the average value of `x`.
In other words, dispersion for a set of values. Returns `Float64`.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varPopStable` function. It works slower but provides a lower computational error.
## stddevSamp(x) {#stddevsampx}
The result is equal to the square root of `varSamp(x)`.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `stddevSampStable` function. It works slower but provides a lower computational error.
## stddevPop(x) {#stddevpopx}
The result is equal to the square root of `varPop(x)`.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `stddevPopStable` function. It works slower but provides a lower computational error.
## topK(N)(x) {#topknx}
Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves).
@ -1641,14 +1653,23 @@ Calculates the value of `Σ((x - x̅)(y - y̅)) / (n - 1)`.
Returns Float64. When `n <= 1`, returns +∞.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `covarSampStable` function. It works slower but provides a lower computational error.
## covarPop(x, y) {#covarpopx-y}
Calculates the value of `Σ((x - x̅)(y - y̅)) / n`.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `covarPopStable` function. It works slower but provides a lower computational error.
## corr(x, y) {#corrx-y}
Calculates the Pearson correlation coefficient: `Σ((x - x̅)(y - y̅)) / sqrt(Σ((x - x̅)^2) * Σ((y - y̅)^2))`.
!!! note "Note"
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `corrStable` function. It works slower but provides a lower computational error.
## categoricalInformationValue {#categoricalinformationvalue}
Calculates the value of `(P(tag = 1) - P(tag = 0))(log(P(tag = 1)) - log(P(tag = 0)))` for each category.

View File

@ -53,16 +53,16 @@ An exception is thrown when dividing by zero or when dividing a minimal negative
Differs from intDiv in that it returns zero when dividing by zero or when dividing a minimal negative number by minus one.
## modulo(a, b), a % b operator {#moduloa-b-a-b-operator}
## modulo(a, b), a % b operator {#modulo}
Calculates the remainder after division.
If arguments are floating-point numbers, they are pre-converted to integers by dropping the decimal portion.
The remainder is taken in the same sense as in C++. Truncated division is used for negative numbers.
An exception is thrown when dividing by zero or when dividing a minimal negative number by minus one.
## moduloOrZero(a, b) {#moduloorzeroa-b}
## moduloOrZero(a, b) {#modulo-or-zero}
Differs from modulo in that it returns zero when the divisor is zero.
Differs from [modulo](#modulo) in that it returns zero when the divisor is zero.
## negate(a), -a operator {#negatea-a-operator}

View File

@ -201,17 +201,17 @@ All changes on replicated tables are broadcasting to ZooKeeper so will be applie
The following operations with [partitions](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) are available:
- [DETACH PARTITION](#alter_detach-partition) Moves a partition to the `detached` directory and forget it.
- [DROP PARTITION](#alter_drop-partition) Deletes a partition.
- [ATTACH PART\|PARTITION](#alter_attach-partition) Adds a part or partition from the `detached` directory to the table.
- [ATTACH PARTITION FROM](#alter_attach-partition-from) Copies the data partition from one table to another and adds.
- [REPLACE PARTITION](#alter_replace-partition) - Copies the data partition from one table to another and replaces.
- [MOVE PARTITION TO TABLE](#alter_move_to_table-partition)(#alter_move_to_table-partition) - Move the data partition from one table to another.
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) - Resets the value of a specified column in a partition.
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) - Resets the specified secondary index in a partition.
- [FREEZE PARTITION](#alter_freeze-partition) Creates a backup of a partition.
- [FETCH PARTITION](#alter_fetch-partition) Downloads a partition from another server.
- [MOVE PARTITION\|PART](#alter_move-partition) Move partition/data part to another disk or volume.
- [DETACH PARTITION](#alter_detach-partition) Moves a partition to the `detached` directory and forgets it.
- [DROP PARTITION](#alter_drop-partition) Deletes a partition.
- [ATTACH PART\|PARTITION](#alter_attach-partition) Adds a part or partition from the `detached` directory to the table.
- [ATTACH PARTITION FROM](#alter_attach-partition-from) Copies the data partition from one table to another and adds.
- [REPLACE PARTITION](#alter_replace-partition) Copies the data partition from one table to another and replaces.
- [MOVE PARTITION TO TABLE](#alter_move_to_table-partition) — Moves the data partition from one table to another.
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) Resets the value of a specified column in a partition.
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) Resets the specified secondary index in a partition.
- [FREEZE PARTITION](#alter_freeze-partition) Creates a backup of a partition.
- [FETCH PARTITION](#alter_fetch-partition) Downloads a partition from another server.
- [MOVE PARTITION\|PART](#alter_move-partition) Moves partition/data part to another disk or volume.
<!-- -->
@ -307,13 +307,13 @@ For the query to run successfully, the following conditions must be met:
ALTER TABLE table_source MOVE PARTITION partition_expr TO TABLE table_dest
```
This query move the data partition from the `table_source` to `table_dest` with deleting the data from `table_source`.
This query moves the data partition from the `table_source` to `table_dest` with deleting the data from `table_source`.
For the query to run successfully, the following conditions must be met:
- Both tables must have the same structure.
- Both tables must have the same partition key.
- Both tables must be the same engine family. (replicated or non-replicated)
- Both tables must be the same engine family (replicated or non-replicated).
- Both tables must have the same storage policy.
#### CLEAR COLUMN IN PARTITION {#alter_clear-column-partition}

View File

@ -5,13 +5,12 @@ toc_title: Roadmap
# Roadmap {#roadmap}
## Q1 2020 {#q1-2020}
- Role-based access control
## Q2 2020 {#q2-2020}
- Integration with external authentication services
## Q3 2020 {#q3-2020}
- Resource pools for more precise distribution of cluster capacity between users
{## [Original article](https://clickhouse.tech/docs/en/roadmap/) ##}

View File

@ -25,6 +25,7 @@ toc_title: Integrations
- Message queues
- [Kafka](https://kafka.apache.org)
- [clickhouse\_sinker](https://github.com/housepower/clickhouse_sinker) (uses [Go client](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- Stream processing
- [Flink](https://flink.apache.org)
- [flink-clickhouse-sink](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -27,6 +27,7 @@ toc_title: "\u06CC\u06A9\u067E\u0627\u0631\u0686\u06AF\u06CC"
- صف پیام
- [کافکا](https://kafka.apache.org)
- [در حال بارگذاری](https://github.com/housepower/clickhouse_sinker) (استفاده [برو کارگیر](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- پردازش جریان
- [لرزش](https://flink.apache.org)
- [سینک فلینک-کلیک](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -27,6 +27,7 @@ toc_title: "Int\xE9gration"
- Files d'attente de messages
- [Kafka](https://kafka.apache.org)
- [clickhouse\_sinker](https://github.com/housepower/clickhouse_sinker) (utiliser [Allez client](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- Traitement de flux
- [Flink](https://flink.apache.org)
- [flink-clickhouse-évier](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -27,6 +27,7 @@ toc_title: "\u7D71\u5408"
- メッセージキュ
- [カフカ](https://kafka.apache.org)
- [clickhouse\_sinker](https://github.com/housepower/clickhouse_sinker) (用途 [Goクライアント](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- ストリーム処理
- [フリンク](https://flink.apache.org)
- [フリンク-クリックハウス-シンク](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -113,7 +113,7 @@ ClickHouse может слить куски данных таким образо
Если название вложенной таблицы заканчивается на `Map` и она содержит не менее двух столбцов, удовлетворяющих критериям:
- первый столбец - числовой `(*Int*, Date, DateTime)`, назовем его условно `key`,
- первый столбец - числовой `(*Int*, Date, DateTime)` или строковый `(String, FixedString)`, назовем его условно `key`,
- остальные столбцы - арифметические `(*Int*, Float32/64)`, условно `(values...)`,
то вложенная таблица воспринимается как отображение `key => (values...)` и при слиянии её строк выполняется слияние элементов двух множеств по `key` со сложением соответствующих `(values...)`.

View File

@ -38,7 +38,7 @@ sudo rpm --import https://repo.yandex.ru/clickhouse/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.yandex.ru/clickhouse/rpm/stable/x86_64
```
Для использования наиболее свежих версий нужно заменить `stable` на `testing` (рекомендуется для тестовых окружений).
Для использования наиболее свежих версий нужно заменить `stable` на `testing` (рекомендуется для тестовых окружений). Также иногда доступен `prestable`.
Для, собственно, установки пакетов необходимо выполнить следующие команды:

View File

@ -45,6 +45,7 @@
- [ClickHouse.Net](https://github.com/ilyabreev/ClickHouse.Net)
- Elixir
- [clickhousex](https://github.com/appodeal/clickhousex/)
- [pillar](https://github.com/sofakingworld/pillar)
- Nim
- [nim-clickhouse](https://github.com/leonardoce/nim-clickhouse)

View File

@ -20,6 +20,7 @@
- Очереди сообщений
- [Kafka](https://kafka.apache.org)
- [clickhouse\_sinker](https://github.com/housepower/clickhouse_sinker) (использует [Go client](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- Потоковая обработка
- [Flink](https://flink.apache.org)
- [flink-clickhouse-sink](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -686,7 +686,7 @@ TCP порт для защищённого обмена данными с кли
<mysql_port>9004</mysql_port>
```
## tmp\_path {#tmp-path}
## tmp_path {#tmp-path}
Путь ко временным данным для обработки больших запросов.
@ -698,6 +698,17 @@ TCP порт для защищённого обмена данными с кли
``` xml
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
```
## tmp_policy {#tmp-policy}
Политика из [storage_configuration](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) для хранения временных файлов.
Если политика не задана, используется [tmp_path](#tmp-path). В противном случае `tmp_path` игнорируется.
!!! note "Примечание"
- `move_factor` игнорируется.
- `keep_free_space_bytes` игнорируется.
- `max_data_part_size_bytes` игнорируется.
- В данной политике у вас должен быть ровно один том.
## uncompressed\_cache\_size {#server-settings-uncompressed_cache_size}

View File

@ -1025,6 +1025,29 @@ ClickHouse генерирует исключение
Значение по умолчанию: 0.
## optimize_skip_unused_shards {#optimize-skip-unused-shards}
Включает или отключает пропуск неиспользуемых шардов для запросов [SELECT](../../sql-reference/statements/select/index.md), в которых условие ключа шардирования задано в секции `WHERE/PREWHERE`. Предполагается, что данные распределены с помощью ключа шардирования, в противном случае настройка ничего не делает.
Возможные значения:
- 0 — Выключена.
- 1 — Включена.
Значение по умолчанию: 0
## force_optimize_skip_unused_shards {#force-optimize-skip-unused-shards}
Разрешает или запрещает выполнение запроса, если настройка [optimize_skip_unused_shards](#optimize-skip-unused-shards) включена, а пропуск неиспользуемых шардов невозможен. Если данная настройка включена и пропуск невозможен, ClickHouse генерирует исключение.
Возможные значения:
- 0 — Выключена. ClickHouse не генерирует исключение.
- 1 — Включена. Выполнение запроса запрещается, только если у таблицы есть ключ шардирования.
- 2 — Включена. Выполнение запроса запрещается, даже если для таблицы не определен ключ шардирования.
Значение по умолчанию: 0
## optimize\_throw\_if\_noop {#setting-optimize_throw_if_noop}
Включает или отключает генерирование исключения в случаях, когда запрос [OPTIMIZE](../../sql-reference/statements/misc.md#misc_operations-optimize) не выполняет мёрж.

View File

@ -517,6 +517,33 @@ CurrentMetric_ReplicatedChecks: 0
- `query` (String) текст запроса. Для запросов `INSERT` не содержит вставляемые данные.
- `query_id` (String) идентификатор запроса, если был задан.
## system.text\_log {#system-tables-text-log}
Содержит записи логов. Уровень логирования для таблицы может быть ограничен параметром сервера `text_log.level`.
Столбцы:
- `event_date` (Date) — Дата создания записи.
- `event_time` (DateTime) — Время создания записи.
- `microseconds` (UInt32) — Время создания записи в микросекундах.
- `thread_name` (String) — Название потока, из которого была сделана запись.
- `thread_id` (UInt64) — Идентификатор потока ОС.
- `level` (Enum8) — Уровень логирования записи. Возможные значения:
- `1` или `'Fatal'`.
- `2` или `'Critical'`.
- `3` или `'Error'`.
- `4` или `'Warning'`.
- `5` или `'Notice'`.
- `6` или `'Information'`.
- `7` или `'Debug'`.
- `8` или `'Trace'`.
- `query_id` (String) — Идентификатор запроса.
- `logger_name` (LowCardinality(String)) — Название логгера (например, `DDLWorker`).
- `message` (String) — Само тело записи.
- `revision` (UInt32) — Ревизия ClickHouse.
- `source_file` (LowCardinality(String)) — Исходный файл, из которого была сделана запись.
- `source_line` (UInt64) — Исходная строка, из которой была сделана запись.
## system.query\_log {#system_tables-query_log}
Содержит информацию о выполнении запросов. Для каждого запроса вы можете увидеть время начала обработки, продолжительность обработки, сообщения об ошибках и другую информацию.

View File

@ -1533,20 +1533,33 @@ SELECT medianDeterministic(val, 1) FROM t
Возвращает `Float64`. В случае, когда `n <= 1`, возвращается `+∞`.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `varSampStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## varPop(x) {#varpopx}
Вычисляет величину `Σ((x - x̅)^2) / n`, где `n` - размер выборки, `x̅`- среднее значение `x`.
То есть, дисперсию для множества значений. Возвращает `Float64`.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `varPopStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## stddevSamp(x) {#stddevsampx}
Результат равен квадратному корню из `varSamp(x)`.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `stddevSampStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## stddevPop(x) {#stddevpopx}
Результат равен квадратному корню из `varPop(x)`.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `stddevPopStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## topK(N)(column) {#topkncolumn}
Возвращает массив наиболее часто встречающихся значений в указанном столбце. Результирующий массив упорядочен по убыванию частоты значения (не по самим значениям).
@ -1626,14 +1639,24 @@ SELECT topKWeighted(10)(number, number) FROM numbers(1000)
Возвращает Float64. В случае, когда `n <= 1`, возвращается +∞.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `covarSampStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## covarPop(x, y) {#covarpopx-y}
Вычисляет величину `Σ((x - x̅)(y - y̅)) / n`.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `covarPopStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## corr(x, y) {#corrx-y}
Вычисляет коэффициент корреляции Пирсона: `Σ((x - x̅)(y - y̅)) / sqrt(Σ((x - x̅)^2) * Σ((y - y̅)^2))`.
!!! note "Примечание"
Функция использует вычислительно неустойчивый алгоритм. Если для ваших расчётов необходима [вычислительная устойчивость](https://ru.wikipedia.org/wiki/Вычислительная_устойчивость), используйте функцию `corrStable`. Она работает медленнее, но обеспечивает меньшую вычислительную ошибку.
## simpleLinearRegression {#simplelinearregression}
Выполняет простую (одномерную) линейную регрессию.

View File

@ -48,13 +48,17 @@ SELECT toTypeName(0), toTypeName(0 + 0), toTypeName(0 + 0 + 0), toTypeName(0 + 0
Отличается от intDiv тем, что при делении на ноль или при делении минимального отрицательного числа на минус единицу, возвращается ноль.
## modulo(a, b), оператор a % b {#moduloa-b-operator-a-b}
## modulo(a, b), оператор a % b {#modulo}
Вычисляет остаток от деления.
Если аргументы - числа с плавающей запятой, то они предварительно преобразуются в целые числа, путём отбрасывания дробной части.
Берётся остаток в том же смысле, как это делается в C++. По факту, для отрицательных чисел, используется truncated division.
При делении на ноль или при делении минимального отрицательного числа на минус единицу, кидается исключение.
## moduloOrZero(a, b) {#modulo-or-zero}
В отличие от [modulo](#modulo), возвращает ноль при делении на ноль.
## negate(a), оператор -a {#negatea-operator-a}
Вычисляет число, обратное по знаку. Результат всегда имеет знаковый тип.

View File

@ -1,6 +1,6 @@
# Функции для работы с внешними словарями {#ext_dict_functions}
Информацию о подключении и настройке внешних словарей смотрите в разделе [Внешние словари](../../sql-reference/functions/ext-dict-functions.md).
Информацию о подключении и настройке внешних словарей смотрите в разделе [Внешние словари](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md).
## dictGet {#dictget}

View File

@ -204,17 +204,17 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
Для работы с [партициями](../../sql-reference/statements/alter.md) доступны следующие операции:
- [DETACH PARTITION](#alter_detach-partition) перенести партицию в директорию `detached`;
- [DROP PARTITION](#alter_drop-partition) удалить партицию;
- [ATTACH PARTITION\|PART](#alter_attach-partition) добавить партицию/кусок в таблицу из директории `detached`;
- [ATTACH PARTITION FROM](#alter_attach-partition-from) скопировать партицию из другой таблицы;
- [REPLACE PARTITION](#alter_replace-partition) скопировать партицию из другой таблицы с заменой;
- [MOVE PARTITION TO TABLE](#alter_move_to_table-partition) (\#alter\_move\_to\_table-partition) - переместить партицию в другую таблицу;
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) удалить все значения в столбце для заданной партиции;
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) - очистить построенные вторичные индексы для заданной партиции;
- [FREEZE PARTITION](#alter_freeze-partition) создать резервную копию партиции;
- [FETCH PARTITION](#alter_fetch-partition) скачать партицию с другого сервера;
- [MOVE PARTITION\|PART](#alter_move-partition) переместить партицию/кускок на другой диск или том.
- [DETACH PARTITION](#alter_detach-partition) перенести партицию в директорию `detached`;
- [DROP PARTITION](#alter_drop-partition) удалить партицию;
- [ATTACH PARTITION\|PART](#alter_attach-partition) добавить партицию/кусок в таблицу из директории `detached`;
- [ATTACH PARTITION FROM](#alter_attach-partition-from) скопировать партицию из другой таблицы;
- [REPLACE PARTITION](#alter_replace-partition) скопировать партицию из другой таблицы с заменой;
- [MOVE PARTITION TO TABLE](#alter_move_to_table-partition) переместить партицию в другую таблицу;
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) удалить все значения в столбце для заданной партиции;
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) очистить построенные вторичные индексы для заданной партиции;
- [FREEZE PARTITION](#alter_freeze-partition) создать резервную копию партиции;
- [FETCH PARTITION](#alter_fetch-partition) скачать партицию с другого сервера;
- [MOVE PARTITION\|PART](#alter_move-partition) переместить партицию/кусок на другой диск или том.
#### DETACH PARTITION {#alter_detach-partition}
@ -312,12 +312,14 @@ ALTER TABLE table2 REPLACE PARTITION partition_expr FROM table1
ALTER TABLE table_source MOVE PARTITION partition_expr TO TABLE table_dest
```
Перемещает партицию из таблицы `table_source` в таблицу `table_dest` (добавляет к существующим данным в `table_dest`), с удалением данных из таблицы `table_source`.
Перемещает партицию из таблицы `table_source` в таблицу `table_dest` (добавляет к существующим данным в `table_dest`) с удалением данных из таблицы `table_source`.
Следует иметь в виду:
- Таблицы должны иметь одинаковую структуру.
- Для таблиц должен быть задан одинаковый ключ партиционирования.
- Движки таблиц должны быть одинакового семейства (реплицированные или нереплицированные).
- Для таблиц должна быть задана одинаковая политика хранения.
#### CLEAR COLUMN IN PARTITION {#alter_clear-column-partition}

View File

@ -119,6 +119,11 @@ class PatchedMacrosPlugin(macros.plugin.MacrosPlugin):
def on_page_markdown(self, markdown, page, config, files):
markdown = super(PatchedMacrosPlugin, self).on_page_markdown(markdown, page, config, files)
if os.path.islink(page.file.abs_src_path):
lang = config.data['theme']['language']
page.canonical_url = page.canonical_url.replace(f'/{lang}/', '/en/', 1)
if config.data['extra'].get('version_prefix') or config.data['extra'].get('single_page'):
return markdown
if self.skip_git_log:

View File

@ -44,7 +44,7 @@ then
if [[ ! -z "${CLOUDFLARE_TOKEN}" ]]
then
sleep 1m
git diff --stat="9999,9999" --diff-filter=M HEAD~1 | grep '|' | awk '$1 ~ /\.html$/ { if ($3>4) { url="https://clickhouse.tech/"$1; sub(/\/index.html/, "/", url); print "\""url"\""; }}' | split -l 25 /dev/stdin PURGE
git diff --stat="9999,9999" --diff-filter=M HEAD~1 | grep '|' | awk '$1 ~ /\.html$/ { if ($3>8) { url="https://content.clickhouse.tech/"$1; sub(/\/index.html/, "/", url); print "\""url"\""; }}' | split -l 25 /dev/stdin PURGE
for FILENAME in $(ls PURGE*)
do
POST_DATA=$(cat "${FILENAME}" | sed -n -e 'H;${x;s/\n/,/g;s/^,//;p;}' | awk '{print "{\"files\":["$0"]}";}')

View File

@ -18,7 +18,7 @@ Markdown==3.2.1
MarkupSafe==1.1.1
mkdocs==1.1.2
mkdocs-htmlproofer-plugin==0.0.3
mkdocs-macros-plugin==0.4.7
mkdocs-macros-plugin==0.4.9
nltk==3.5
nose==1.3.7
protobuf==3.12.0

View File

@ -1,9 +1,11 @@
import concurrent.futures
import hashlib
import json
import logging
import os
import shutil
import subprocess
import sys
import bs4
import closure
@ -20,14 +22,19 @@ def adjust_markdown_html(content):
content,
features='html.parser'
)
for a in soup.find_all('a'):
a_class = a.attrs.get('class')
if a_class and 'headerlink' in a_class:
a.string = '\xa0'
for details in soup.find_all('details'):
for summary in details.find_all('summary'):
if summary.parent != details:
summary.extract()
details.insert(0, summary)
for div in soup.find_all('div'):
div.attrs['role'] = 'alert'
div_class = div.attrs.get('class')
is_admonition = div_class and 'admonition' in div.attrs.get('class')
if is_admonition:
for a in div.find_all('a'):
a_class = a.attrs.get('class')
if a_class:
@ -36,9 +43,10 @@ def adjust_markdown_html(content):
a.attrs['class'] = 'alert-link'
for p in div.find_all('p'):
p_class = p.attrs.get('class')
if p_class and ('admonition-title' in p_class):
p.attrs['class'] = p_class + ['alert-heading', 'display-5', 'mb-2']
if div_class and 'admonition' in div.attrs.get('class'):
if is_admonition and p_class and ('admonition-title' in p_class):
p.attrs['class'] = p_class + ['alert-heading', 'display-6', 'mb-2']
if is_admonition:
div.attrs['role'] = 'alert'
if ('info' in div_class) or ('note' in div_class):
mode = 'alert-primary'
elif ('attention' in div_class) or ('warning' in div_class):
@ -49,7 +57,7 @@ def adjust_markdown_html(content):
mode = 'alert-info'
else:
mode = 'alert-secondary'
div.attrs['class'] = div_class + ['alert', 'lead', 'pb-0', 'mb-4', mode]
div.attrs['class'] = div_class + ['alert', 'pb-0', 'mb-4', mode]
return str(soup)
@ -138,6 +146,7 @@ def get_js_in(args):
f"'{args.website_dir}/js/jquery.js'",
f"'{args.website_dir}/js/popper.js'",
f"'{args.website_dir}/js/bootstrap.js'",
f"'{args.website_dir}/js/sentry.js'",
f"'{args.website_dir}/js/base.js'",
f"'{args.website_dir}/js/index.js'",
f"'{args.website_dir}/js/docsearch.js'",
@ -145,6 +154,28 @@ def get_js_in(args):
]
def minify_file(path, css_digest, js_digest):
    """Minify a single generated website file in place.

    Only ``.html`` and ``.css`` files are processed; every other path
    (including ``.js``) is returned from immediately without being opened.

    Args:
        path: Path of the file to rewrite.
        css_digest: Value substituted for the literal ``css_digest``
            placeholder in ``base.css?css_digest`` (HTML files only).
        js_digest: Value substituted for the literal ``js_digest``
            placeholder in ``base.js?js_digest`` (HTML files only).

    Returns:
        None. The file at ``path`` is overwritten with the minified content.
    """
    if not path.endswith(('.html', '.css')):
        return

    logging.info('Minifying %s', path)
    # Read and write as bytes with explicit UTF-8 so newlines are not translated.
    with open(path, 'rb') as f:
        content = f.read().decode('utf-8')
    if path.endswith('.html'):
        content = minify_html(content)
        content = content.replace('base.css?css_digest', f'base.css?{css_digest}')
        content = content.replace('base.js?js_digest', f'base.js?{js_digest}')
    else:
        # The guard above leaves only '.css' here; the former '.js' branch
        # could never be reached and has been dropped.
        content = cssmin.cssmin(content)
    with open(path, 'wb') as f:
        f.write(content.encode('utf-8'))
def minify_website(args):
css_in = ' '.join(get_css_in(args))
css_out = f'{args.output_dir}/css/base.css'
@ -190,28 +221,17 @@ def minify_website(args):
if args.minify:
logging.info('Minifying website')
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for root, _, filenames in os.walk(args.output_dir):
for filename in filenames:
path = os.path.join(root, filename)
if not (
filename.endswith('.html') or
filename.endswith('.css')
):
continue
logging.info('Minifying %s', path)
with open(path, 'rb') as f:
content = f.read().decode('utf-8')
if filename.endswith('.html'):
content = minify_html(content)
content = content.replace('base.css?css_digest', f'base.css?{css_digest}')
content = content.replace('base.js?js_digest', f'base.js?{js_digest}')
elif filename.endswith('.css'):
content = cssmin.cssmin(content)
elif filename.endswith('.js'):
content = jsmin.jsmin(content)
with open(path, 'wb') as f:
f.write(content.encode('utf-8'))
futures.append(executor.submit(minify_file, path, css_digest, js_digest))
for future in futures:
exc = future.exception()
if exc:
logging.error(exc)
sys.exit(1)
def process_benchmark_results(args):

View File

@ -27,6 +27,7 @@ toc_title: Entegrasyonlar
- Mesaj kuyrukları
- [Kafka](https://kafka.apache.org)
- [clickhouse\_sinker](https://github.com/housepower/clickhouse_sinker) (kullanma [Go client](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- Akış işleme
- [Flink](https://flink.apache.org)
- [flink-clickhouse-sink](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -1,7 +1,5 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_folder_title: "\u53D1\u52A8\u673A"
toc_folder_title: "\u5f15\u64ce"
toc_priority: 25
---

View File

@ -19,6 +19,7 @@
- 消息队列
- [卡夫卡](https://kafka.apache.org)
- [clickhouse\_sinker](https://github.com/housepower/clickhouse_sinker) (使用 [Go client](https://github.com/ClickHouse/clickhouse-go/))
- [stream-loader-clickhouse](https://github.com/adform/stream-loader)
- 流处理
- [Flink](https://flink.apache.org)
- [flink-clickhouse-sink](https://github.com/ivi-ru/flink-clickhouse-sink)

View File

@ -380,7 +380,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
std::string tmp_policy = config().getString("tmp_policy", "");
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy);
for (const DiskPtr & disk : volume->disks)
for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath());
}

View File

@ -405,6 +405,9 @@
</prometheus>
-->
<!-- Lazy system.*_log table creation -->
<!-- <system_tables_lazy_load>false</system_tables_lazy_load> -->
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<query_log>
<!-- Table to insert data into. If the table does not exist, it will be created.

View File

@ -38,6 +38,7 @@ namespace ErrorCodes
{
extern const int PARAMETER_OUT_OF_BOUND;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int OPENCL_ERROR;
extern const int LOGICAL_ERROR;
}
@ -120,6 +121,30 @@ namespace
};
}
/// Builds a sort permutation like getPermutation(), optionally trying a special sort implementation first.
/// If `special_sort` is OPENCL_BITONIC and the build has USE_OPENCL (and is not an ARCADIA_BUILD),
/// BitonicSort is tried; whenever it is not attempted or reports failure, the regular getPermutation() is used.
/// Throws OPENCL_ERROR when OPENCL_BITONIC is requested in a non-Arcadia build without USE_OPENCL.
/// In an ARCADIA_BUILD the request falls through to getPermutation() without an error.
template <typename T>
void ColumnVector<T>::getSpecialPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res,
IColumn::SpecialSort special_sort) const
{
if (special_sort == IColumn::SpecialSort::OPENCL_BITONIC)
{
#if !defined(ARCADIA_BUILD)
#if USE_OPENCL
/// Bitonic sort is attempted only when the limit does not truncate the result (zero or not less than the column size).
if (!limit || limit >= data.size())
{
res.resize(data.size());
/// Nothing to sort for an empty column; otherwise return only if BitonicSort reports success.
/// `!reverse` is passed as the `sort_ascending` argument.
if (data.empty() || BitonicSort::getInstance().sort(data, res, !reverse))
return;
}
#else
throw DB::Exception("'special_sort = bitonic' specified but OpenCL not available", DB::ErrorCodes::OPENCL_ERROR);
#endif
#endif
}
/// Fallback to the regular permutation.
getPermutation(reverse, limit, nan_direction_hint, res);
}
template <typename T>
void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
{
@ -144,14 +169,6 @@ void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_directi
}
else
{
#if !defined(ARCADIA_BUILD)
#if USE_OPENCL
/// If bitonic sort if specified as preferred than `nan_direction_hint` equals specific value 42.
if (nan_direction_hint == 42 && BitonicSort::getInstance().sort(data, res, !reverse))
return;
#endif
#endif
/// A case for radix sort
if constexpr (is_arithmetic_v<T> && !std::is_same_v<T, UInt128>)
{

View File

@ -189,6 +189,8 @@ public:
}
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
void getSpecialPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res,
IColumn::SpecialSort) const override;
void reserve(size_t n) override
{

View File

@ -245,6 +245,17 @@ public:
*/
virtual void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const = 0;
/// Selects an optional alternative sorting implementation for getSpecialPermutation().
enum class SpecialSort
{
/// No special implementation requested.
NONE = 0,
/// Request the OpenCL bitonic sort.
OPENCL_BITONIC,
};
/// Same arguments as getPermutation() plus a SpecialSort selector.
/// This default implementation ignores the selector and simply calls getPermutation().
virtual void getSpecialPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res, SpecialSort) const
{
getPermutation(reverse, limit, nan_direction_hint, res);
}
/** Copies each element according offsets parameter.
* (i-th element should be copied offsets[i] - offsets[i - 1] times.)
* It is necessary in ARRAY JOIN operation.

View File

@ -11,13 +11,6 @@
#include <CL/cl.h>
#endif
#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <cstdint>
#include <map>
#include <type_traits>
#include <ext/bit_cast.h>
#include <Core/Types.h>
#include <Core/Defines.h>
@ -30,6 +23,20 @@
class BitonicSort
{
public:
using KernelType = OCL::KernelType;
/// Indices of the per-type kernels inside the `kernels` vector.
/// KernelMax is not a kernel: it is the number of entries and is used to size the vector.
enum Types
{
KernelInt8 = 0,
KernelUInt8,
KernelInt16,
KernelUInt16,
KernelInt32,
KernelUInt32,
KernelInt64,
KernelUInt64,
KernelMax
};
static BitonicSort & getInstance()
{
@ -39,32 +46,39 @@ public:
/// Sorts given array in specified order. Returns `true` if given sequence was sorted, `false` otherwise.
template <typename T>
bool sort(const DB::PaddedPODArray<T> & data, DB::IColumn::Permutation & res, cl_uint sort_ascending)
bool sort(const DB::PaddedPODArray<T> & data, DB::IColumn::Permutation & res, cl_uint sort_ascending [[maybe_unused]]) const
{
size_t s = data.size();
if constexpr (
std::is_same_v<T, Int8> ||
std::is_same_v<T, UInt8> ||
std::is_same_v<T, Int16> ||
std::is_same_v<T, UInt16> ||
std::is_same_v<T, Int32> ||
std::is_same_v<T, UInt32> ||
std::is_same_v<T, Int64> ||
std::is_same_v<T, UInt64>)
{
size_t data_size = data.size();
/// Getting the nearest power of 2.
size_t power = 1;
if (s <= 8) power = 8;
else while (power < s) power <<= 1;
size_t power = 8;
while (power < data_size)
power <<= 1;
/// Allocates more space for additional stubs to be added if needed.
std::vector<T> pairs_content(power);
std::vector<UInt32> pairs_indices(power);
for (UInt32 i = 0; i < s; ++i)
{
pairs_content[i] = data[i];
memcpy(&pairs_content[0], &data[0], sizeof(T) * data_size);
for (UInt32 i = 0; i < data_size; ++i)
pairs_indices[i] = i;
}
bool result = sort(pairs_content.data(), pairs_indices.data(), s, power - s, sort_ascending);
if (!result) return false;
fillWithStubs(pairs_content.data(), pairs_indices.data(), data_size, power - data_size, sort_ascending);
sort(pairs_content.data(), pairs_indices.data(), power, sort_ascending);
for (size_t i = 0, shift = 0; i < power; ++i)
{
if (pairs_indices[i] >= s)
if (pairs_indices[i] >= data_size)
{
++shift;
continue;
@ -75,6 +89,9 @@ public:
return true;
}
return false;
}
/// Creating a configuration instance with making all OpenCl required variables
/// such as device, platform, context, queue, program and kernel.
void configure()
@ -84,29 +101,36 @@ public:
cl_platform_id platform = OCL::getPlatformID(settings);
cl_device_id device = OCL::getDeviceID(platform, settings);
cl_context gpu_context = OCL::makeContext(device, settings);
cl_command_queue command_queue = OCL::makeCommandQueue(device, gpu_context, settings);
cl_command_queue command_queue = OCL::makeCommandQueue<2>(device, gpu_context, settings);
cl_program program = OCL::makeProgram(bitonic_sort_kernels, gpu_context, device, settings);
/// Creating kernels for each specified data type.
cl_int error = 0;
kernels.resize(KernelMax);
kernels["char"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_char", &error),
clReleaseKernel);
kernels["uchar"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_uchar", &error),
clReleaseKernel);
kernels["short"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_short", &error),
clReleaseKernel);
kernels["ushort"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_ushort", &error),
clReleaseKernel);
kernels["int"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_int", &error),
clReleaseKernel);
kernels["uint"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_uint", &error),
clReleaseKernel);
kernels["long"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_long", &error),
clReleaseKernel);
kernels["ulong"] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_ulong", &error),
clReleaseKernel);
kernels[KernelInt8] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_char", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelUInt8] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_uchar", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelInt16] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_short", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelUInt16] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_ushort", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelInt32] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_int", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelUInt32] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_uint", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelInt64] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_long", &error), clReleaseKernel);
OCL::checkError(error);
kernels[KernelUInt64] = std::shared_ptr<KernelType>(clCreateKernel(program, "bitonicSort_ulong", &error), clReleaseKernel);
OCL::checkError(error);
configuration = std::shared_ptr<OCL::Configuration>(new OCL::Configuration(device, gpu_context, command_queue, program));
@ -114,97 +138,24 @@ public:
private:
/// Dictionary with kernels for each type from list: uchar, char, ushort, short, uint, int, ulong and long.
std::map<std::string, std::shared_ptr<KernelType>> kernels;
std::vector<std::shared_ptr<KernelType>> kernels;
/// Current configuration with core OpenCL instances.
std::shared_ptr<OCL::Configuration> configuration = nullptr;
/// Returns `true` if given sequence was sorted, `false` otherwise.
template <typename T>
bool sort(T * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
if (typeid(T).name() == typeid(cl_char).name())
sort_char(reinterpret_cast<cl_char *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_uchar))
sort_uchar(reinterpret_cast<cl_uchar *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_short))
sort_short(reinterpret_cast<cl_short *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_ushort))
sort_ushort(reinterpret_cast<cl_ushort *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_int))
sort_int(reinterpret_cast<cl_int *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_uint))
sort_uint(reinterpret_cast<cl_uint *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_long))
sort_long(reinterpret_cast<cl_long *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else if (typeid(T) == typeid(cl_ulong))
sort_ulong(reinterpret_cast<cl_ulong *>(p_input), indices, array_size, number_of_stubs, sort_ascending);
else
return false;
return true;
}
/// Specific functions for each integer type.
void sort_char(cl_char * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_char stubs_value = sort_ascending ? CHAR_MAX : CHAR_MIN;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["char"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_uchar(cl_uchar * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_uchar stubs_value = sort_ascending ? UCHAR_MAX : 0;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["uchar"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_short(cl_short * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_short stubs_value = sort_ascending ? SHRT_MAX : SHRT_MIN;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["short"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_ushort(cl_ushort * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_ushort stubs_value = sort_ascending ? USHRT_MAX : 0;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["ushort"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_int(cl_int * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_int stubs_value = sort_ascending ? INT_MAX : INT_MIN;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["int"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_uint(cl_uint * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_uint stubs_value = sort_ascending ? UINT_MAX : 0;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["uint"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_long(cl_long * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_long stubs_value = sort_ascending ? LONG_MAX : LONG_MIN;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["long"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
void sort_ulong(cl_ulong * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending)
{
cl_ulong stubs_value = sort_ascending ? ULONG_MAX : 0;
fillWithStubs(number_of_stubs, stubs_value, p_input, indices, array_size);
sort(kernels["ulong"].get(), p_input, indices, array_size + number_of_stubs, sort_ascending);
}
/// Overload set that picks the kernel by the type of the (unused) argument;
/// it is called as `getKernel(T(0))`, so only the argument's type matters.
cl_kernel getKernel(Int8) const { return kernels[KernelInt8].get(); }
cl_kernel getKernel(UInt8) const { return kernels[KernelUInt8].get(); }
cl_kernel getKernel(Int16) const { return kernels[KernelInt16].get(); }
cl_kernel getKernel(UInt16) const { return kernels[KernelUInt16].get(); }
cl_kernel getKernel(Int32) const { return kernels[KernelInt32].get(); }
cl_kernel getKernel(UInt32) const { return kernels[KernelUInt32].get(); }
cl_kernel getKernel(Int64) const { return kernels[KernelInt64].get(); }
cl_kernel getKernel(UInt64) const { return kernels[KernelUInt64].get(); }
/// Sorts p_input inplace with indices. Works only with arrays which size equals to power of two.
template <class T>
void sort(cl_kernel kernel, T * p_input, cl_uint * indices, cl_int array_size, cl_uint sort_ascending)
void sort(T * p_input, cl_uint * indices, cl_int array_size, cl_uint sort_ascending) const
{
cl_kernel kernel = getKernel(T(0));
cl_int error = CL_SUCCESS;
cl_int num_stages = 0;
@ -246,7 +197,7 @@ private:
}
template <class T>
void configureKernel(cl_kernel kernel, int number_of_argument, void * source)
void configureKernel(cl_kernel kernel, int number_of_argument, void * source) const
{
cl_int error = clSetKernelArg(kernel, number_of_argument, sizeof(T), source);
OCL::checkError(error);
@ -254,9 +205,9 @@ private:
/// Fills given sequences from `arraySize` index with `numberOfStubs` values.
template <class T>
void fillWithStubs(cl_int number_of_stubs, T value, T * p_input,
cl_uint * indices, cl_int array_size)
void fillWithStubs(T * p_input, cl_uint * indices, cl_int array_size, cl_int number_of_stubs, cl_uint sort_ascending) const
{
T value = sort_ascending ? std::numeric_limits<T>::max() : std::numeric_limits<T>::min();
for (cl_int index = 0; index < number_of_stubs; ++index)
{
p_input[array_size + index] = value;
@ -264,7 +215,7 @@ private:
}
}
BitonicSort() {}
BitonicSort(BitonicSort const &);
void operator=(BitonicSort const &);
BitonicSort() = default;
BitonicSort(BitonicSort const &) = delete;
void operator = (BitonicSort const &) = delete;
};

View File

@ -496,7 +496,9 @@ namespace ErrorCodes
extern const int OPENCL_ERROR = 522;
extern const int UNKNOWN_ROW_POLICY = 523;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN = 524;
extern const int CASSANDRA_INTERNAL_ERROR = 525;
extern const int INCORRECT_DISK_INDEX = 525;
extern const int UNKNOWN_VOLUME_TYPE = 526;
extern const int CASSANDRA_INTERNAL_ERROR = 527;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -17,24 +17,18 @@
#include <Core/Types.h>
#include <Common/Exception.h>
#ifndef CL_VERSION_2_0
#define CL_USE_DEPRECATED_OPENCL_1_2_APIS
#endif
using KernelType = std::remove_reference<decltype(*cl_kernel())>::type;
namespace DB
{
namespace ErrorCodes
{
namespace ErrorCodes
{
extern const int OPENCL_ERROR;
}
}
}
struct OCL
{
using KernelType = std::remove_reference<decltype(*cl_kernel())>::type;
/**
* Structure which represents the most essential settings of common OpenCl entities.
@ -211,7 +205,7 @@ struct OCL
static void checkError(cl_int error)
{
if (error != CL_SUCCESS)
throw DB::Exception("OpenCL error " + opencl_error_to_str(error), DB::ErrorCodes::OPENCL_ERROR);
throw DB::Exception("OpenCL error: " + opencl_error_to_str(error), DB::ErrorCodes::OPENCL_ERROR);
}
@ -223,22 +217,18 @@ struct OCL
cl_int error = clGetPlatformIDs(settings.number_of_platform_entries, &platform,
settings.number_of_available_platforms);
checkError(error);
return platform;
}
static cl_device_id getDeviceID(cl_platform_id & platform, const Settings & settings)
{
cl_device_id device;
cl_int error = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, settings.number_of_devices_entries,
&device, settings.number_of_available_devices);
OCL::checkError(error);
return device;
}
static cl_context makeContext(cl_device_id & device, const Settings & settings)
{
cl_int error;
@ -246,24 +236,34 @@ struct OCL
&device, settings.context_callback, settings.context_callback_data,
&error);
OCL::checkError(error);
return gpu_context;
}
template <int version>
static cl_command_queue makeCommandQueue(cl_device_id & device, cl_context & context, const Settings & settings [[maybe_unused]])
{
cl_int error;
#ifdef CL_USE_DEPRECATED_OPENCL_1_2_APIS
cl_command_queue command_queue = clCreateCommandQueue(context, device, settings.command_queue_properties, &error);
#else
cl_command_queue command_queue = clCreateCommandQueueWithProperties(context, device, nullptr, &error);
#endif
OCL::checkError(error);
cl_command_queue command_queue;
return command_queue;
if constexpr (version == 1)
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
command_queue = clCreateCommandQueue(context, device, settings.command_queue_properties, &error);
#pragma GCC diagnostic pop
}
else
{
#ifdef CL_VERSION_2_0
command_queue = clCreateCommandQueueWithProperties(context, device, nullptr, &error);
#else
throw DB::Exception("Binary is built with OpenCL version < 2.0", DB::ErrorCodes::OPENCL_ERROR);
#endif
}
OCL::checkError(error);
return command_queue;
}
static cl_program makeProgram(const char * source_code, cl_context context,
cl_device_id device_id, const Settings & settings)
@ -271,7 +271,8 @@ struct OCL
cl_int error = 0;
size_t source_size = strlen(source_code);
cl_program program = clCreateProgramWithSource(context, settings.number_of_program_source_pointers, &source_code, &source_size, &error);
cl_program program = clCreateProgramWithSource(context, settings.number_of_program_source_pointers,
&source_code, &source_size, &error);
checkError(error);
error = clBuildProgram(program, settings.number_of_devices_entries, &device_id, settings.build_options,
@ -293,39 +294,30 @@ struct OCL
}
checkError(error);
return program;
}
/// Configuring buffer for given input data
template<typename K>
static cl_mem createBuffer(K * p_input, cl_int array_size, cl_context context,
cl_int elements_size = sizeof(K))
static cl_mem createBuffer(K * p_input, cl_int array_size, cl_context context, cl_int elements_size = sizeof(K))
{
cl_int error = CL_SUCCESS;
cl_mem cl_input_buffer =
clCreateBuffer
(
cl_mem cl_input_buffer = clCreateBuffer(
context,
CL_MEM_USE_HOST_PTR,
zeroCopySizeAlignment(elements_size * array_size),
p_input,
&error
);
&error);
checkError(error);
return cl_input_buffer;
}
static size_t zeroCopySizeAlignment(size_t required_size)
{
return required_size + (~required_size + 1) % 64;
}
/// Manipulating with common OpenCL variables.
static void finishCommandQueue(cl_command_queue command_queue)
@ -335,10 +327,8 @@ struct OCL
OCL::checkError(error);
}
template<class T>
static void releaseData(T * origin, cl_int array_size, cl_mem cl_buffer,
cl_command_queue command_queue, size_t offset = 0)
static void releaseData(T * origin, cl_int array_size, cl_mem cl_buffer, cl_command_queue command_queue, size_t offset = 0)
{
cl_int error = CL_SUCCESS;
@ -359,7 +349,6 @@ struct OCL
error = clReleaseMemObject(cl_buffer);
checkError(error);
}
};
#endif

View File

@ -35,10 +35,10 @@ target_link_libraries (compact_array PRIVATE clickhouse_common_io)
add_executable (radix_sort radix_sort.cpp)
target_link_libraries (radix_sort PRIVATE clickhouse_common_io)
# if (USE_OPENCL)
# add_executable (bitonic_sort bitonic_sort.cpp)
# target_link_libraries (bitonic_sort PRIVATE clickhouse_common_io ${OPENCL_LINKER_FLAGS})
# endif ()
# The bitonic sort test program needs OpenCL, so it is only built when USE_OPENCL is set.
# It links against both the extra linker flags and the libraries reported by find_package(OpenCL).
if (USE_OPENCL)
add_executable (bitonic_sort bitonic_sort.cpp)
target_link_libraries (bitonic_sort PRIVATE clickhouse_common_io ${OPENCL_LINKER_FLAGS} ${OpenCL_LIBRARIES})
endif ()
add_executable (arena_with_free_lists arena_with_free_lists.cpp)
target_link_libraries (arena_with_free_lists PRIVATE dbms)

View File

@ -1,8 +1,6 @@
#include <Common/config.h>
#include <iostream>
#if USE_OPENCL
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
@ -16,13 +14,10 @@
#include "Common/BitonicSort.h"
using Key = cl_ulong;
/// Generates vector of size 8 for testing.
/// Vector contains max possible value, min possible value and duplicate values.
template <class Type>
static void generateTest(std::vector<Type>& data, Type min_value, Type max_value)
static void generateTest(std::vector<Type> & data, Type min_value, Type max_value)
{
int size = 10;
@ -62,8 +57,7 @@ static void check(const std::vector<size_t> & indices, bool reverse = true)
template <class Type>
static void sortBitonicSortWithPodArrays(const std::vector<Type>& data,
std::vector<size_t> & indices, bool ascending = true)
static void sortBitonicSortWithPodArrays(const std::vector<Type> & data, std::vector<size_t> & indices, bool ascending = true)
{
DB::PaddedPODArray<Type> pod_array_data = DB::PaddedPODArray<Type>(data.size());
DB::IColumn::Permutation pod_array_indices = DB::IColumn::Permutation(data.size());
@ -74,7 +68,6 @@ static void sortBitonicSortWithPodArrays(const std::vector<Type>& data,
*(pod_array_indices.data() + index) = index;
}
BitonicSort::getInstance().configure();
BitonicSort::getInstance().sort(pod_array_data, pod_array_indices, ascending);
for (size_t index = 0; index < data.size(); ++index)
@ -83,7 +76,7 @@ static void sortBitonicSortWithPodArrays(const std::vector<Type>& data,
template <class Type>
static void testBitonicSort(std::string test_name, Type min_value, Type max_value)
static void testBitonicSort(const std::string & test_name, Type min_value, Type max_value)
{
std::cerr << test_name << std::endl;
@ -102,147 +95,80 @@ static void testBitonicSort(std::string test_name, Type min_value, Type max_valu
static void straightforwardTests()
{
testBitonicSort<cl_char>("Test 01: cl_char.", CHAR_MIN, CHAR_MAX);
testBitonicSort<cl_uchar>("Test 02: cl_uchar.", 0, UCHAR_MAX);
testBitonicSort<cl_short>("Test 03: cl_short.", SHRT_MIN, SHRT_MAX);
testBitonicSort<cl_ushort>("Test 04: cl_ushort.", 0, USHRT_MAX);
testBitonicSort<cl_int>("Test 05: cl_int.", INT_MIN, INT_MAX);
testBitonicSort<cl_uint >("Test 06: cl_uint.", 0, UINT_MAX);
testBitonicSort<cl_long >("Test 07: cl_long.", LONG_MIN, LONG_MAX);
testBitonicSort<cl_ulong >("Test 08: cl_ulong.", 0, ULONG_MAX);
testBitonicSort<DB::Int8>("Test 01: Int8.", CHAR_MIN, CHAR_MAX);
testBitonicSort<DB::UInt8>("Test 02: UInt8.", 0, UCHAR_MAX);
testBitonicSort<DB::Int16>("Test 03: Int16.", SHRT_MIN, SHRT_MAX);
testBitonicSort<DB::UInt16>("Test 04: UInt16.", 0, USHRT_MAX);
testBitonicSort<DB::Int32>("Test 05: Int32.", INT_MIN, INT_MAX);
testBitonicSort<DB::UInt32>("Test 06: UInt32.", 0, UINT_MAX);
testBitonicSort<DB::Int64>("Test 07: Int64.", LONG_MIN, LONG_MAX);
testBitonicSort<DB::UInt64>("Test 08: UInt64.", 0, ULONG_MAX);
}
static void NO_INLINE sort1(Key * data, size_t size)
template <typename T>
static void bitonicSort(std::vector<T> & data)
{
std::sort(data, data + size);
}
static void NO_INLINE sort2(std::vector<Key> & data, std::vector<size_t> & indices)
{
BitonicSort::getInstance().configure();
size_t size = data.size();
std::vector<size_t> indices(size);
for (size_t i = 0; i < size; ++i)
indices[i] = i;
sortBitonicSortWithPodArrays(data, indices);
std::vector<Key> result(data.size());
for (size_t index = 0; index < data.size(); ++index)
result[index] = data[indices[index]];
std::vector<T> result(size);
for (size_t i = 0; i < size; ++i)
result[i] = data[indices[i]];
data = std::move(result);
}
int main(int argc, char ** argv)
template <typename T>
static bool checkSort(const std::vector<T> & data, size_t size)
{
straightforwardTests();
std::vector<T> copy1(data.begin(), data.begin() + size);
std::vector<T> copy2(data.begin(), data.begin() + size);
if (argc < 3)
{
std::cerr << "Not enough arguments were passed\n";
return 1;
}
std::sort(copy1.data(), copy1.data() + size);
bitonicSort<T>(copy2);
size_t n = DB::parse<size_t>(argv[1]);
size_t method = DB::parse<size_t>(argv[2]);
for (size_t i = 0; i < size; ++i)
if (copy1[i] != copy2[i])
return false;
std::vector<Key> data(n);
std::vector<size_t> indices(n);
{
Stopwatch watch;
for (auto & elem : data)
elem = static_cast<Key>(rand());
for (size_t i = 0; i < n; ++i)
indices[i] = i;
watch.stop();
double elapsed = watch.elapsedSeconds();
std::cerr
<< "Filled in " << elapsed
<< " (" << n / elapsed << " elem/sec., "
<< n * sizeof(Key) / elapsed / 1048576 << " MB/sec.)"
<< std::endl;
}
if (n <= 100)
{
std::cerr << std::endl;
for (const auto & elem : data)
std::cerr << elem << ' ';
std::cerr << std::endl;
for (const auto & index : indices)
std::cerr << index << ' ';
std::cerr << std::endl;
}
{
Stopwatch watch;
if (method == 1) sort1(data.data(), n);
if (method == 2) sort2(data, indices);
watch.stop();
double elapsed = watch.elapsedSeconds();
std::cerr
<< "Sorted in " << elapsed
<< " (" << n / elapsed << " elem/sec., "
<< n * sizeof(Key) / elapsed / 1048576 << " MB/sec.)"
<< std::endl;
}
{
Stopwatch watch;
size_t i = 1;
while (i < n)
{
if (!(data[i - 1] <= data[i]))
break;
++i;
}
watch.stop();
double elapsed = watch.elapsedSeconds();
std::cerr
<< "Checked in " << elapsed
<< " (" << n / elapsed << " elem/sec., "
<< n * sizeof(Key) / elapsed / 1048576 << " MB/sec.)"
<< std::endl
<< "Result: " << (i == n ? "Ok." : "Fail!") << std::endl;
}
if (n <= 1000)
{
std::cerr << std::endl;
std::cerr << data[0] << ' ';
for (size_t i = 1; i < n; ++i)
{
if (!(data[i - 1] <= data[i]))
std::cerr << "*** ";
std::cerr << data[i] << ' ';
}
std::cerr << std::endl;
for (const auto & index : indices)
std::cerr << index << ' ';
std::cerr << std::endl;
}
return 0;
return true;
}
#else
int main()
{
std::cerr << "Openc CL disabled.";
BitonicSort::getInstance().configure();
straightforwardTests();
size_t size = 1100;
std::vector<int> data(size);
for (size_t i = 0; i < size; ++i)
data[i] = rand();
for (size_t i = 0; i < 128; ++i)
{
if (!checkSort<int>(data, i))
{
std::cerr << "fail at length " << i << std::endl;
return 1;
}
}
for (size_t i = 128; i < size; i += 7)
{
if (!checkSort<int>(data, i))
{
std::cerr << "fail at length " << i << std::endl;
return 1;
}
}
return 0;
}
#endif

View File

@ -8,3 +8,4 @@
#cmakedefine01 USE_EMBEDDED_COMPILER
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_SSL
#cmakedefine01 USE_OPENCL

View File

@ -12,11 +12,13 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
extern const int INCORRECT_DISK_INDEX;
}
std::mutex DiskLocal::reservation_mutex;
@ -34,7 +36,9 @@ public:
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
DiskPtr getDisk(size_t i) const override;
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override;
@ -282,6 +286,15 @@ void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to
IDisk::copy(from_path, to_disk, to_path); /// Copy files through buffers.
}
/// Returns the disk this reservation was made on.
/// A local reservation holds exactly one disk, so the only accepted index is 0;
/// any other index throws an Exception with code INCORRECT_DISK_INDEX.
DiskPtr DiskLocalReservation::getDisk(size_t i) const
{
    if (i == 0)
        return disk;

    throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
void DiskLocalReservation::update(UInt64 new_size)
{
std::lock_guard lock(DiskLocal::reservation_mutex);

View File

@ -55,7 +55,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
constexpr auto default_disk_name = "default";
std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->disks)
for (const auto & [disk_name, _] : result->getDisksMap())
{
old_disks_minus_new_disks.insert(disk_name);
}
@ -65,10 +65,10 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (result->disks.count(disk_name) == 0)
if (result->getDisksMap().count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
else
{

View File

@ -29,6 +29,10 @@ public:
/// Get all disks with names
const auto & getDisksMap() const { return disks; }
/// Registers `disk` under `name` in the disks map.
/// Insertion goes through std::map::emplace, so if `name` is already present
/// the existing entry is kept and this call changes nothing.
void addToDiskMap(String name, DiskPtr disk)
{
disks.emplace(name, disk);
}
private:
std::map<String, DiskPtr> disks;

View File

@ -206,8 +206,11 @@ public:
/// Get reservation size.
virtual UInt64 getSize() const = 0;
/// Get disk where reservation take place.
virtual DiskPtr getDisk() const = 0;
/// Get i-th disk where reservation takes place.
virtual DiskPtr getDisk(size_t i = 0) const = 0;
/// Get all disks, used in reservation
virtual Disks getDisks() const = 0;
/// Changes amount of reserved space.
virtual void update(UInt64 new_size) = 0;

View File

@ -8,6 +8,17 @@
namespace DB
{
/// Kind of a volume, as reported by IVolume::getType().
enum class VolumeType
{
/// Volume of the JBOD kind.
JBOD,
/// Volume that consists of a single disk (see SingleDiskVolume).
SINGLE_DISK,
/// NOTE(review): no volume visible here reports this value; presumably a placeholder for unrecognized kinds — confirm.
UNKNOWN
};
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
/**
* Disks group by some (user) criteria. For example,
* - VolumeJBOD("slow_disks", [d1, d2], 100)
@ -22,7 +33,7 @@ namespace DB
class IVolume : public Space
{
public:
IVolume(String name_, Disks disks_): disks(std::move(disks_)), name(std::move(name_))
IVolume(String name_, Disks disks_): disks(std::move(disks_)), name(name_)
{
}
@ -37,16 +48,17 @@ public:
/// Volume name from config
const String & getName() const override { return name; }
virtual VolumeType getType() const = 0;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
Disks disks;
DiskPtr getDisk(size_t i = 0) const { return disks[i]; }
const Disks & getDisks() const { return disks; }
protected:
Disks disks;
const String name;
};
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
}

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
extern const int INCORRECT_DISK_INDEX;
}
namespace
@ -369,7 +370,16 @@ public:
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
/// Returns the disk backing this reservation.
/// The reservation holds one disk only, so every index except 0 is rejected
/// with an Exception carrying the INCORRECT_DISK_INDEX code.
DiskPtr getDisk(size_t i) const override
{
    if (i == 0)
        return disk;

    throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override
{

View File

@ -0,0 +1,6 @@
#include <Disks/SingleDiskVolume.h>
namespace DB
{
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Disks/IVolume.h>
namespace DB
{
/// Volume that wraps exactly one disk.
/// Space reservation is forwarded directly to that disk.
class SingleDiskVolume : public IVolume
{
public:
    /// Builds a volume called `name_` whose disk list contains only `disk`.
    SingleDiskVolume(const String & name_, DiskPtr disk)
        : IVolume(name_, {disk})
    {
    }

    VolumeType getType() const override { return VolumeType::SINGLE_DISK; }

    /// Reserves `bytes` on the only disk of the volume and returns that disk's reservation.
    ReservationPtr reserve(UInt64 bytes) override
    {
        const auto & only_disk = disks.front();
        return only_disk->reserve(bytes);
    }
};
using VolumeSingleDiskPtr = std::shared_ptr<SingleDiskVolume>;
}

View File

@ -55,7 +55,7 @@ StoragePolicy::StoragePolicy(
std::set<String> disk_names;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
{
if (disk_names.find(disk->getName()) != disk_names.end())
throw Exception(
@ -102,7 +102,7 @@ bool StoragePolicy::isDefaultPolicy() const
if (volumes[0]->getName() != "default")
return false;
const auto & disks = volumes[0]->disks;
const auto & disks = volumes[0]->getDisks();
if (disks.size() != 1)
return false;
@ -117,7 +117,7 @@ Disks StoragePolicy::getDisks() const
{
Disks res;
for (const auto & volume : volumes)
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
res.push_back(disk);
return res;
}
@ -130,17 +130,17 @@ DiskPtr StoragePolicy::getAnyDisk() const
if (volumes.empty())
throw Exception("StoragePolicy has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR);
if (volumes[0]->disks.empty())
if (volumes[0]->getDisks().empty())
throw Exception("Volume '" + volumes[0]->getName() + "' has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR);
return volumes[0]->disks[0];
return volumes[0]->getDisks()[0];
}
DiskPtr StoragePolicy::getDiskByName(const String & disk_name) const
{
for (auto && volume : volumes)
for (auto && disk : volume->disks)
for (auto && disk : volume->getDisks())
if (disk->getName() == disk_name)
return disk;
return {};
@ -181,7 +181,7 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
DiskPtr max_disk;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
{
auto avail_space = disk->getAvailableSpace();
if (avail_space > max_space)
@ -207,10 +207,10 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::LOGICAL_ERROR);
std::unordered_set<String> new_disk_names;
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->disks)
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->getDisks())
new_disk_names.insert(disk->getName());
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
if (new_disk_names.count(disk->getName()) == 0)
throw Exception("New storage policy shall contain disks of old one", ErrorCodes::LOGICAL_ERROR);
}
@ -222,7 +222,7 @@ size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
for (size_t i = 0; i < volumes.size(); ++i)
{
const auto & volume = volumes[i];
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
if (disk->getName() == disk_ptr->getName())
return i;
}

View File

@ -4,6 +4,7 @@
#include <Disks/IDisk.h>
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/SingleDiskVolume.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>

View File

@ -25,6 +25,8 @@ public:
DiskSelectorPtr disk_selector
);
VolumeType getType() const override { return VolumeType::JBOD; }
/// Next disk (round-robin)
///
/// - Used with policy for temporary data

View File

@ -0,0 +1,17 @@
#include "createVolume.h"
namespace DB
{
/// Builds a volume describing where `reservation` actually landed.
/// The new volume takes its name from `other_volume` and holds the disk returned by
/// `reservation->getDisk()`. Returns nullptr when the type of `other_volume` is neither
/// JBOD nor SINGLE_DISK.
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume)
{
    const VolumeType source_type = other_volume->getType();
    const bool is_supported = source_type == VolumeType::JBOD || source_type == VolumeType::SINGLE_DISK;
    if (!is_supported)
        return nullptr;

    /// A reservation on JBOD chooses one of the disks and reserves space there,
    /// so the volume for such a reservation contains just that one disk.
    return std::make_shared<SingleDiskVolume>(other_volume->getName(), reservation->getDisk());
}
}

12
src/Disks/createVolume.h Normal file
View File

@ -0,0 +1,12 @@
#pragma once
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/SingleDiskVolume.h>
namespace DB
{
/// Creates a SingleDiskVolume named after `other_volume` that contains the disk of `reservation`.
/// Returns nullptr if `other_volume` is neither a JBOD nor a SINGLE_DISK volume.
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume);
}

View File

@ -5,6 +5,7 @@ PEERDIR(
)
SRCS(
createVolume.cpp
DiskFactory.cpp
DiskLocal.cpp
DiskMemory.cpp
@ -12,6 +13,7 @@ SRCS(
IDisk.cpp
IVolume.cpp
registerDisks.cpp
SingleDiskVolume.cpp
StoragePolicy.cpp
VolumeJBOD.cpp
)

View File

@ -471,7 +471,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
argument_types.push_back(column.type);
argument_names.push_back(column.name);
}
else if (identifier && node.name == "joinGet" && arg == 0)
else if (identifier && (functionIsJoinGet(node.name) || functionIsDictGet(node.name)) && arg == 0)
{
auto table_id = IdentifierSemantic::extractDatabaseAndTable(*identifier);
table_id = data.context.resolveStorageID(table_id, Context::ResolveOrdinary);
@ -480,7 +480,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
ColumnWithTypeAndName column(
ColumnConst::create(std::move(column_string), 1),
std::make_shared<DataTypeString>(),
data.getUniqueName("__joinGet"));
data.getUniqueName("__" + node.name));
data.addAction(ExpressionAction::addColumn(column));
argument_types.push_back(column.type);
argument_names.push_back(column.name);

View File

@ -586,7 +586,7 @@ VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & p
shared->tmp_volume = tmp_policy->getVolume(0);
}
if (shared->tmp_volume->disks.empty())
if (shared->tmp_volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume;

View File

@ -1025,6 +1025,12 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
chain.clear();
};
if (storage)
{
query_analyzer.makeSetsForIndex(query.where());
query_analyzer.makeSetsForIndex(query.prewhere());
}
{
ExpressionActionsChain chain(context);
Names additional_required_columns_after_prewhere;

View File

@ -243,8 +243,6 @@ public:
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);
const PreparedSets & getPreparedSets() const { return prepared_sets; }
/// Tables that will need to be sent to remote servers for distributed query processing.
@ -275,6 +273,9 @@ private:
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);
JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element);
const ASTSelectQuery * getAggregatingQuery() const;

View File

@ -309,12 +309,29 @@ InterpreterSelectQuery::InterpreterSelectQuery(
ASTSelectQuery & query = getSelectQuery();
std::shared_ptr<TableJoin> table_join = joined_tables.makeTableJoin(query);
auto analyze = [&] (bool try_move_to_prewhere = true)
ASTPtr row_policy_filter;
if (storage)
row_policy_filter = context->getRowPolicyCondition(table_id.getDatabaseName(), table_id.getTableName(), RowPolicy::SELECT_FILTER);
auto analyze = [&] (bool try_move_to_prewhere)
{
syntax_analyzer_result = SyntaxAnalyzer(*context).analyzeSelect(
query_ptr, SyntaxAnalyzerResult(source_header.getNamesAndTypesList(), storage),
options, joined_tables.tablesWithColumns(), required_result_column_names, table_join);
if (try_move_to_prewhere && storage && !row_policy_filter && query.where() && !query.prewhere() && !query.final())
{
/// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
{
SelectQueryInfo current_info;
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
MergeTreeWhereOptimizer{current_info, *context, *merge_tree, syntax_analyzer_result->requiredSourceColumns(), log};
}
}
/// Save scalar sub queries's results in the query context
if (!options.only_analyze && context->hasQueryContext())
for (const auto & it : syntax_analyzer_result->getScalars())
@ -365,7 +382,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
source_header = storage->getSampleBlockForColumns(required_columns);
/// Fix source_header for filter actions.
auto row_policy_filter = context->getRowPolicyCondition(table_id.getDatabaseName(), table_id.getTableName(), RowPolicy::SELECT_FILTER);
if (row_policy_filter)
{
filter_info = std::make_shared<FilterInfo>();
@ -378,10 +394,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
/// Calculate structure of the result.
result_header = getSampleBlockImpl(try_move_to_prewhere);
result_header = getSampleBlockImpl();
};
analyze();
analyze(settings.optimize_move_to_prewhere);
bool need_analyze_again = false;
if (analysis_result.prewhere_constant_filter_description.always_false || analysis_result.prewhere_constant_filter_description.always_true)
@ -481,40 +497,8 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors()
}
Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
Block InterpreterSelectQuery::getSampleBlockImpl()
{
auto & query = getSelectQuery();
const Settings & settings = context->getSettingsRef();
/// Do all AST changes here, because actions from analysis_result will be used later in readImpl.
if (storage)
{
query_analyzer->makeSetsForIndex(query.where());
query_analyzer->makeSetsForIndex(query.prewhere());
/// PREWHERE optimization.
/// Turn off, if the table filter (row-level security) is applied.
if (!context->getRowPolicyCondition(table_id.getDatabaseName(), table_id.getTableName(), RowPolicy::SELECT_FILTER))
{
auto optimize_prewhere = [&](auto & merge_tree)
{
SelectQueryInfo current_info;
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
current_info.sets = query_analyzer->getPreparedSets();
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && try_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{current_info, *context, merge_tree,
syntax_analyzer_result->requiredSourceColumns(), log};
};
if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
optimize_prewhere(*merge_tree_data);
}
}
if (storage && !options.only_analyze)
from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_ptr);

View File

@ -106,7 +106,7 @@ private:
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
Block getSampleBlockImpl(bool try_move_to_prewhere);
Block getSampleBlockImpl();
struct Pipeline
{

View File

@ -38,16 +38,18 @@ void MarkTableIdentifiersMatcher::visit(const ASTFunction & func, ASTPtr &, Data
if (functionIsInOrGlobalInOperator(func.name))
{
auto & ast = func.arguments->children.at(1);
if (auto opt_name = tryGetIdentifierName(ast))
if (!data.aliases.count(*opt_name))
auto opt_name = tryGetIdentifierName(ast);
if (opt_name && !data.aliases.count(*opt_name))
setIdentifierSpecial(ast);
}
// first argument of joinGet can be a table identifier
if (func.name == "joinGet")
// First argument of joinGet can be a table name, perhaps with a database.
// First argument of dictGet can be a dictionary name, perhaps with a database.
if (functionIsJoinGet(func.name) || functionIsDictGet(func.name))
{
auto & ast = func.arguments->children.at(0);
if (auto opt_name = tryGetIdentifierName(ast))
auto opt_name = tryGetIdentifierName(ast);
if (opt_name && !data.aliases.count(*opt_name))
setIdentifierSpecial(ast);
}
}

View File

@ -767,8 +767,13 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
const auto & settings = context.getSettingsRef();
const NameSet & source_columns_set = result.source_columns_set;
if (table_join)
{
result.analyzed_join = table_join;
if (!result.analyzed_join) /// ExpressionAnalyzer expects some not empty object here
result.analyzed_join->resetCollected();
}
else /// TODO: remove. For now ExpressionAnalyzer expects some not empty object here
result.analyzed_join = std::make_shared<TableJoin>();
if (remove_duplicates)

View File

@ -97,7 +97,7 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
if (metric_log)
logs.emplace_back(metric_log.get());
bool lazy_load = config.getBool("system_tables_lazy_load", true);
bool lazy_load = config.getBool("system_tables_lazy_load", false);
try
{

View File

@ -165,9 +165,9 @@ private:
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
// We use it to give a global sequential index to every message, so that we can wait
// until a particular message is flushed. This is used to implement synchronous log
// flushing for SYSTEM FLUSH LOGS.
// We use it to give a global sequential index to every message, so that we
// can wait until a particular message is flushed. This is used to implement
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
bool is_shutdown = false;
std::condition_variable flush_event;
@ -175,6 +175,8 @@ private:
uint64_t requested_flush_before = 0;
// Flushed log up to this index, exclusive
uint64_t flushed_before = 0;
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
void savingThreadFunction();
@ -244,9 +246,22 @@ void SystemLog<LogElement>::add(const LogElement & element)
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// TextLog sets its logger level to 0, so this log is a noop and there
// is no recursive logging.
LOG_ERROR(log, "Queue is full for system log '" + demangle(typeid(*this).name()) + "'.");
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
LOG_ERROR(log, "Queue is full for system log '"
<< demangle(typeid(*this).name()) << "'"
<< " at " << queue_front_index);
}
return;
}
@ -325,9 +340,16 @@ void SystemLog<LogElement>::savingThreadFunction()
uint64_t to_flush_end = 0;
{
LOG_TRACE(log, "Sleeping");
std::unique_lock lock(mutex);
flush_event.wait_for(lock, std::chrono::milliseconds(flush_interval_milliseconds),
[&] () { return requested_flush_before > flushed_before || is_shutdown; });
const bool predicate = flush_event.wait_for(lock,
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_before > flushed_before
|| is_shutdown;
}
);
queue_front_index += queue.size();
to_flush_end = queue_front_index;
@ -337,6 +359,13 @@ void SystemLog<LogElement>::savingThreadFunction()
queue.swap(to_flush);
exit_this_thread = is_shutdown;
LOG_TRACE(log, "Woke up"
<< (predicate ? " by condition" : " by timeout ("
+ toString(flush_interval_milliseconds) + " ms)")
<< ", " << to_flush.size() << " elements to flush"
<< " up to " << to_flush_end
<< (is_shutdown ? ", shutdown requested" : ""));
}
if (to_flush.empty())
@ -351,6 +380,7 @@ void SystemLog<LogElement>::savingThreadFunction()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_TRACE(log, "Terminating");
}
@ -359,11 +389,13 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
{
try
{
LOG_TRACE(log, "Flushing system log");
LOG_TRACE(log, "Flushing system log, "
<< to_flush.size() << " entries to flush");
/// We check for existence of the table and create it as needed at every flush.
/// This is done to allow user to drop the table at any moment (new empty table will be created automatically).
/// BTW, flush method is called from single thread.
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
/// (new empty table will be created automatically). BTW, flush method
/// is called from single thread.
prepareTable();
Block block = LogElement::createBlock();
@ -389,9 +421,13 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::unique_lock lock(mutex);
flushed_before = to_flush_end;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log");
}

View File

@ -29,6 +29,18 @@ TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_)
join_algorithm = JoinAlgorithm::PREFER_PARTIAL_MERGE;
}
/// Empties every collection this object has accumulated: join key names and
/// key ASTs for both sides, the joined-table / added-by-join column sets, and
/// the original-name and rename bookkeeping.
/// NOTE(review): presumably called so the same TableJoin can be analyzed again
/// from a clean state -- confirm against the callers.
void TableJoin::resetCollected()
{
    /// Join keys (left and right side).
    key_names_left.clear();
    key_names_right.clear();
    key_asts_left.clear();
    key_asts_right.clear();

    /// Columns of the joined table.
    columns_from_joined_table.clear();
    columns_added_by_join.clear();

    /// Name bookkeeping.
    original_names.clear();
    renames.clear();
}
void TableJoin::addUsingKey(const ASTPtr & ast)
{
key_names_left.push_back(ast->getColumnName());

View File

@ -112,6 +112,7 @@ public:
const String & temporaryFilesCodec() const { return temporary_files_codec; }
bool enablePartialMergeJoinOptimizations() const { return partial_merge_join_optimizations; }
void resetCollected();
void addUsingKey(const ASTPtr & ast);
void addOnKeys(ASTPtr & left_table_ast, ASTPtr & right_table_ast);

View File

@ -1,5 +1,7 @@
#pragma once
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
@ -18,4 +20,14 @@ inline bool functionIsLikeOperator(const std::string & name)
return name == "like" || name == "notLike";
}
/// Returns true when `name` is exactly "joinGet" or begins with "dictGet".
/// NOTE(review): matching the "dictGet" prefix in a predicate named after
/// joinGet looks surprising; it is kept as is because callers may depend on
/// it -- confirm whether this is intentional.
inline bool functionIsJoinGet(const std::string & name)
{
    if (name == "joinGet")
        return true;

    return startsWith(name, "dictGet");
}
/// Returns true when `name` is "dictHas", "dictIsIn", or begins with "dictGet".
inline bool functionIsDictGet(const std::string & name)
{
    /// Exact-match names first, then the whole "dictGet*" family.
    if (name == "dictHas" || name == "dictIsIn")
        return true;

    return startsWith(name, "dictGet");
}
}

View File

@ -13,7 +13,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_COLLATION;
extern const int OPENCL_ERROR;
}
static bool isCollationRequired(const SortColumnDescription & description)
@ -134,19 +133,11 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
else if (!isColumnConst(*column))
{
int nan_direction_hint = description[0].nulls_direction;
auto special_sort = description[0].special_sort;
/// If in Settings `special_sort` option has been set as `bitonic_sort`,
/// then via `nan_direction_hint` variable a flag which specifies bitonic sort as preferred
/// will be passed to `getPermutation` method with value 42.
if (description[0].special_sort == SpecialSort::OPENCL_BITONIC)
{
#ifdef USE_OPENCL
nan_direction_hint = 42;
#else
throw DB::Exception("Bitonic sort specified as preferred, but OpenCL not available", DB::ErrorCodes::OPENCL_ERROR);
#endif
}
if (special_sort == SpecialSort::OPENCL_BITONIC)
column->getSpecialPermutation(reverse, limit, nan_direction_hint, perm, IColumn::SpecialSort::OPENCL_BITONIC);
else
column->getPermutation(reverse, limit, nan_direction_hint, perm);
}
else

View File

@ -594,7 +594,7 @@ bool AlterCommand::isCommentAlter() const
return false;
}
std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const
std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(StorageInMemoryMetadata & metadata) const
{
if (!isRequireMutationStage(metadata))
return {};
@ -637,6 +637,7 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
}
result.ast = ast->clone();
apply(metadata);
return result;
}
@ -733,6 +734,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
auto all_columns = metadata.columns;
/// Default expression for all added/modified columns
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NameSet modified_columns, renamed_columns;
for (size_t i = 0; i < size(); ++i)
{
const auto & command = (*this)[i];
@ -740,7 +742,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
const auto & column_name = command.column_name;
if (command.type == AlterCommand::ADD_COLUMN)
{
if (metadata.columns.has(column_name) || metadata.columns.hasNested(column_name))
if (all_columns.has(column_name) || all_columns.hasNested(column_name))
{
if (!command.if_not_exists)
throw Exception{"Cannot add column " + backQuote(column_name) + ": column with this name already exists",
@ -757,7 +759,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
}
else if (command.type == AlterCommand::MODIFY_COLUMN)
{
if (!metadata.columns.has(column_name))
if (!all_columns.has(column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(column_name) + " to modify",
@ -765,18 +767,23 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
else
continue;
}
if (renamed_columns.count(column_name))
throw Exception{"Cannot rename and modify the same column " + backQuote(column_name) + " in a single ALTER query",
ErrorCodes::NOT_IMPLEMENTED};
modified_columns.emplace(column_name);
}
else if (command.type == AlterCommand::DROP_COLUMN)
{
if (metadata.columns.has(command.column_name) || metadata.columns.hasNested(command.column_name))
if (all_columns.has(command.column_name) || all_columns.hasNested(command.column_name))
{
for (const ColumnDescription & column : metadata.columns)
for (const ColumnDescription & column : all_columns)
{
const auto & default_expression = column.default_desc.expression;
if (default_expression)
{
ASTPtr query = default_expression->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, metadata.columns.getAll());
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns.getAll());
const auto actions = ExpressionAnalyzer(query, syntax_result, context).getActions(true);
const auto required_columns = actions->getRequiredColumns();
@ -786,6 +793,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
ErrorCodes::ILLEGAL_COLUMN);
}
}
all_columns.remove(command.column_name);
}
else if (!command.if_exists)
throw Exception(
@ -794,7 +802,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
}
else if (command.type == AlterCommand::COMMENT_COLUMN)
{
if (!metadata.columns.has(command.column_name))
if (!all_columns.has(command.column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(command.column_name) + " to comment",
@ -842,6 +850,10 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
throw Exception{"Cannot rename to " + backQuote(command.rename_to) + ": column with this name already exists",
ErrorCodes::DUPLICATE_COLUMN};
if (modified_columns.count(column_name))
throw Exception{"Cannot rename and modify the same column " + backQuote(column_name) + " in a single ALTER query",
ErrorCodes::NOT_IMPLEMENTED};
String from_nested_table_name = Nested::extractTableName(command.column_name);
String to_nested_table_name = Nested::extractTableName(command.rename_to);
bool from_nested = from_nested_table_name != command.column_name;
@ -855,6 +867,8 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
else if (!from_nested && !to_nested)
{
all_columns.rename(command.column_name, command.rename_to);
renamed_columns.emplace(command.column_name);
renamed_columns.emplace(command.rename_to);
}
else
{
@ -886,9 +900,9 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
default_expr_list->children.emplace_back(setAlias(command.default_expression->clone(), tmp_column_name));
}
} /// if we change data type for column with default
else if (metadata.columns.has(column_name) && command.data_type)
else if (all_columns.has(column_name) && command.data_type)
{
auto column_in_table = metadata.columns.get(column_name);
auto column_in_table = all_columns.get(column_name);
/// Column doesn't have a default, nothing to check
if (!column_in_table.default_desc.expression)
continue;
@ -931,7 +945,7 @@ bool AlterCommands::isCommentAlter() const
}
MutationCommands AlterCommands::getMutationCommands(const StorageInMemoryMetadata & metadata) const
MutationCommands AlterCommands::getMutationCommands(StorageInMemoryMetadata metadata) const
{
MutationCommands result;
for (const auto & alter_cmd : *this)

View File

@ -121,7 +121,7 @@ struct AlterCommand
/// If possible, convert alter command to mutation command. Otherwise
/// return empty optional. Some storages may execute mutations after
/// metadata changes.
std::optional<MutationCommand> tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const;
std::optional<MutationCommand> tryConvertToMutationCommand(StorageInMemoryMetadata & metadata) const;
};
/// Return string representation of AlterCommand::Type
@ -162,7 +162,7 @@ public:
/// Return mutation commands which some storages may execute as part of
/// alter. If alter can be performed as a pure metadata update, then the result is
/// empty.
MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const;
MutationCommands getMutationCommands(StorageInMemoryMetadata metadata) const;
};
}

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Disks/createVolume.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <IO/HTTPCommon.h>
@ -115,7 +116,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
{
String file_name = it.first;
auto disk = part->disk;
auto disk = part->volume->getDisk();
String path = part->getFullRelativePath() + file_name;
UInt64 size = disk->getFileSize(path);
@ -316,7 +317,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), part_relative_path);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);

View File

@ -137,11 +137,11 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
IMergeTreeDataPart::IMergeTreeDataPart(
MergeTreeData & storage_, const String & name_, const DiskPtr & disk_, const std::optional<String> & relative_path_, Type part_type_)
MergeTreeData & storage_, const String & name_, const VolumePtr & volume_, const std::optional<String> & relative_path_, Type part_type_)
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, disk(disk_)
, volume(volume_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
@ -152,13 +152,13 @@ IMergeTreeDataPart::IMergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_,
Type part_type_)
: storage(storage_)
, name(name_)
, info(info_)
, disk(disk_)
, volume(volume_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
@ -245,7 +245,7 @@ void IMergeTreeDataPart::removeIfNeeded()
{
auto path = getFullRelativePath();
if (!disk->exists(path))
if (!volume->getDisk()->exists(path))
return;
if (is_temp)
@ -392,7 +392,7 @@ String IMergeTreeDataPart::getFullPath() const
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
return storage.getFullPathOnDisk(disk) + relative_path + "/";
return storage.getFullPathOnDisk(volume->getDisk()) + relative_path + "/";
}
String IMergeTreeDataPart::getFullRelativePath() const
@ -452,7 +452,7 @@ void IMergeTreeDataPart::loadIndex()
}
String index_path = getFullRelativePath() + "primary.idx";
auto index_file = openForReading(disk, index_path);
auto index_file = openForReading(volume->getDisk(), index_path);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) //-V756
for (size_t j = 0; j < key_size; ++j)
@ -468,7 +468,7 @@ void IMergeTreeDataPart::loadIndex()
}
if (!index_file->eof())
throw Exception("Index file " + fullPath(disk, index_path) + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
throw Exception("Index file " + fullPath(volume->getDisk(), index_path) + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
@ -489,9 +489,9 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
else
{
String path = getFullRelativePath();
partition.load(storage, disk, path);
partition.load(storage, volume->getDisk(), path);
if (!isEmpty())
minmax_idx.load(storage, disk, path);
minmax_idx.load(storage, volume->getDisk(), path);
}
String calculated_partition_id = partition.getID(storage.partition_key_sample);
@ -505,23 +505,23 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
void IMergeTreeDataPart::loadChecksums(bool require)
{
String path = getFullRelativePath() + "checksums.txt";
if (disk->exists(path))
if (volume->getDisk()->exists(path))
{
auto buf = openForReading(disk, path);
auto buf = openForReading(volume->getDisk(), path);
if (checksums.read(*buf))
{
assertEOF(*buf);
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
else
bytes_on_disk = calculateTotalSizeOnDisk(disk, getFullRelativePath());
bytes_on_disk = calculateTotalSizeOnDisk(volume->getDisk(), getFullRelativePath());
}
else
{
if (require)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
bytes_on_disk = calculateTotalSizeOnDisk(disk, getFullRelativePath());
bytes_on_disk = calculateTotalSizeOnDisk(volume->getDisk(), getFullRelativePath());
}
}
@ -534,10 +534,10 @@ void IMergeTreeDataPart::loadRowsCount()
}
else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::COMPACT)
{
if (!disk->exists(path))
if (!volume->getDisk()->exists(path))
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
auto buf = openForReading(disk, path);
auto buf = openForReading(volume->getDisk(), path);
readIntText(rows_count, *buf);
assertEOF(*buf);
}
@ -582,9 +582,9 @@ void IMergeTreeDataPart::loadRowsCount()
void IMergeTreeDataPart::loadTTLInfos()
{
String path = getFullRelativePath() + "ttl.txt";
if (disk->exists(path))
if (volume->getDisk()->exists(path))
{
auto in = openForReading(disk, path);
auto in = openForReading(volume->getDisk(), path);
assertString("ttl format version: ", *in);
size_t format_version;
readText(format_version, *in);
@ -609,7 +609,7 @@ void IMergeTreeDataPart::loadTTLInfos()
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = getFullRelativePath() + "columns.txt";
if (!disk->exists(path))
if (!volume->getDisk()->exists(path))
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::COMPACT)
@ -617,21 +617,21 @@ void IMergeTreeDataPart::loadColumns(bool require)
/// If there is no file with a list of columns, write it down.
for (const NameAndTypePair & column : storage.getColumns().getAllPhysical())
if (disk->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
if (volume->getDisk()->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
columns.push_back(column);
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
{
auto buf = disk->writeFile(path + ".tmp", 4096);
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
columns.writeText(*buf);
}
disk->moveFile(path + ".tmp", path);
volume->getDisk()->moveFile(path + ".tmp", path);
}
else
{
columns.readText(*disk->readFile(path));
columns.readText(*volume->getDisk()->readFile(path));
}
size_t pos = 0;
@ -659,29 +659,29 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
String from = getFullRelativePath();
String to = storage.relative_data_path + new_relative_path + "/";
if (!disk->exists(from))
throw Exception("Part directory " + fullPath(disk, from) + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
if (!volume->getDisk()->exists(from))
throw Exception("Part directory " + fullPath(volume->getDisk(), from) + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
if (disk->exists(to))
if (volume->getDisk()->exists(to))
{
if (remove_new_dir_if_exists)
{
Names files;
disk->listFiles(to, files);
volume->getDisk()->listFiles(to, files);
LOG_WARNING(storage.log, "Part directory " << fullPath(disk, to) << " already exists"
LOG_WARNING(storage.log, "Part directory " << fullPath(volume->getDisk(), to) << " already exists"
<< " and contains " << files.size() << " files. Removing it.");
disk->removeRecursive(to);
volume->getDisk()->removeRecursive(to);
}
else
{
throw Exception("Part directory " + fullPath(disk, to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
throw Exception("Part directory " + fullPath(volume->getDisk(), to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
}
disk->setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
disk->moveFile(from, to);
volume->getDisk()->setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
volume->getDisk()->moveFile(from, to);
relative_path = new_relative_path;
}
@ -710,29 +710,29 @@ void IMergeTreeDataPart::remove() const
String to = storage.relative_data_path + "delete_tmp_" + name;
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
if (disk->exists(to))
if (volume->getDisk()->exists(to))
{
LOG_WARNING(storage.log, "Directory " << fullPath(disk, to) << " (to which part must be renamed before removing) already exists."
LOG_WARNING(storage.log, "Directory " << fullPath(volume->getDisk(), to) << " (to which part must be renamed before removing) already exists."
" Most likely this is due to unclean restart. Removing it.");
try
{
disk->removeRecursive(to + "/");
volume->getDisk()->removeRecursive(to + "/");
}
catch (...)
{
LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(disk, to) << ". Exception: " << getCurrentExceptionMessage(false));
LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(volume->getDisk(), to) << ". Exception: " << getCurrentExceptionMessage(false));
throw;
}
}
try
{
disk->moveFile(from, to);
volume->getDisk()->moveFile(from, to);
}
catch (const Poco::FileNotFoundException &)
{
LOG_ERROR(storage.log, "Directory " << fullPath(disk, to) << " (part to remove) doesn't exist or one of nested files has gone."
LOG_ERROR(storage.log, "Directory " << fullPath(volume->getDisk(), to) << " (part to remove) doesn't exist or one of nested files has gone."
" Most likely this is due to manual removing. This should be discouraged. Ignoring.");
return;
@ -741,7 +741,7 @@ void IMergeTreeDataPart::remove() const
if (checksums.empty())
{
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeRecursive(to + "/");
volume->getDisk()->removeRecursive(to + "/");
}
else
{
@ -754,25 +754,25 @@ void IMergeTreeDataPart::remove() const
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
disk->remove(to + "/" + file);
volume->getDisk()->remove(to + "/" + file);
#if !__clang__
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
disk->remove(to + "/" + file);
disk->removeIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_PATH);
volume->getDisk()->remove(to + "/" + file);
volume->getDisk()->removeIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_PATH);
disk->remove(to);
volume->getDisk()->remove(to);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to) << " by removing files; fallback to recursive removal. Reason: "
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(volume->getDisk(), to) << " by removing files; fallback to recursive removal. Reason: "
<< getCurrentExceptionMessage(false));
disk->removeRecursive(to + "/");
volume->getDisk()->removeRecursive(to + "/");
}
}
}
@ -793,7 +793,7 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
{
res = "detached/" + (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!disk->exists(getFullRelativePath() + res))
if (!volume->getDisk()->exists(getFullRelativePath() + res))
return res;
LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
@ -817,16 +817,16 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix) const
String destination_path = storage.relative_data_path + getRelativePathForDetachedPart(prefix);
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(disk, getFullRelativePath(), destination_path, 0);
disk->removeIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
localBackup(volume->getDisk(), getFullRelativePath(), destination_path, 0);
volume->getDisk()->removeIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
}
void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
{
assertOnDisk();
auto reserved_disk = reservation->getDisk();
if (reserved_disk->getName() == disk->getName())
throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);
if (reserved_disk->getName() == volume->getDisk()->getName())
throw Exception("Can not clone data part " + name + " to same disk " + volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
String path_to_clone = storage.relative_data_path + "detached/";
@ -834,8 +834,8 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservat
throw Exception("Path " + fullPath(reserved_disk, path_to_clone + relative_path) + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
reserved_disk->createDirectory(path_to_clone);
disk->copy(getFullRelativePath(), reserved_disk, path_to_clone);
disk->removeIfExists(path_to_clone + "/" + DELETE_ON_DESTROY_MARKER_PATH);
volume->getDisk()->copy(getFullRelativePath(), reserved_disk, path_to_clone);
volume->getDisk()->removeIfExists(path_to_clone + "/" + DELETE_ON_DESTROY_MARKER_PATH);
}
void IMergeTreeDataPart::checkConsistencyBase() const
@ -865,7 +865,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
checksums.checkSizes(disk, path);
checksums.checkSizes(volume->getDisk(), path);
}
else
{
@ -879,17 +879,17 @@ void IMergeTreeDataPart::checkConsistencyBase() const
/// Check that the primary key index is not empty.
if (!storage.primary_key_columns.empty())
check_file_not_empty(disk, path + "primary.idx");
check_file_not_empty(volume->getDisk(), path + "primary.idx");
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(disk, path + "count.txt");
check_file_not_empty(volume->getDisk(), path + "count.txt");
if (storage.partition_key_expr)
check_file_not_empty(disk, path + "partition.dat");
check_file_not_empty(volume->getDisk(), path + "partition.dat");
for (const String & col_name : storage.minmax_idx_columns)
check_file_not_empty(disk, path + "minmax_" + escapeForFileName(col_name) + ".idx");
check_file_not_empty(volume->getDisk(), path + "minmax_" + escapeForFileName(col_name) + ".idx");
}
}
}

View File

@ -31,6 +31,9 @@ struct FutureMergedMutatedPart;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
class IMergeTreeReader;
class IMergeTreeDataPartWriter;
@ -60,14 +63,14 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskPtr & disk,
const VolumePtr & volume,
const std::optional<String> & relative_path,
Type part_type_);
IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk,
const VolumePtr & volume,
const std::optional<String> & relative_path,
Type part_type_);
@ -155,7 +158,7 @@ public:
String name;
MergeTreePartInfo info;
DiskPtr disk;
VolumePtr volume;
mutable String relative_path;
MergeTreeIndexGranularityInfo index_granularity_info;

View File

@ -63,18 +63,16 @@ void IMergeTreeDataPartWriter::Stream::addToChecksums(MergeTreeData::DataPart::C
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: disk(std::move(disk_))
, part_path(part_path_)
, storage(storage_)
: data_part(data_part_)
, part_path(data_part_->getFullRelativePath())
, storage(data_part_->storage)
, columns_list(columns_list_)
, marks_file_extension(marks_file_extension_)
, index_granularity(index_granularity_)
@ -87,6 +85,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
auto disk = data_part->volume->getDisk();
if (!disk->exists(part_path))
disk->createDirectories(part_path);
}
@ -165,7 +164,7 @@ void IMergeTreeDataPartWriter::initPrimaryIndex()
{
if (storage.hasPrimaryKey())
{
index_file_stream = disk->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
@ -180,7 +179,7 @@ void IMergeTreeDataPartWriter::initSkipIndices()
skip_indices_streams.emplace_back(
std::make_unique<IMergeTreeDataPartWriter::Stream>(
stream_name,
disk,
data_part->volume->getDisk(),
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,

View File

@ -61,9 +61,7 @@ public:
using StreamPtr = std::unique_ptr<Stream>;
IMergeTreeDataPartWriter(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,
@ -118,7 +116,7 @@ protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
DiskPtr disk;
MergeTreeData::DataPartPtr data_part;
String part_path;
const MergeTreeData & storage;
NamesAndTypesList columns_list;

View File

@ -9,7 +9,7 @@ namespace DB
IMergedBlockOutputStream::IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part)
: storage(data_part->storage)
, disk(data_part->disk)
, volume(data_part->volume)
, part_path(data_part->getFullRelativePath())
{
}
@ -82,7 +82,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
{
if (checksums.files.count(removed_file))
{
data_part->disk->remove(data_part->getFullRelativePath() + removed_file);
data_part->volume->getDisk()->remove(data_part->getFullRelativePath() + removed_file);
checksums.files.erase(removed_file);
}
}

View File

@ -37,7 +37,7 @@ protected:
protected:
const MergeTreeData & storage;
DiskPtr disk;
VolumePtr volume;
String part_path;
static Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation);

View File

@ -929,7 +929,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return;
auto part = createPart(part_name, part_info, part_disk_ptr, part_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr);
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
bool broken = false;
String part_path = relative_data_path + "/" + part_name;
@ -1552,12 +1553,12 @@ MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, s
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const
const VolumePtr & volume, const String & relative_path) const
{
if (type == MergeTreeDataPartType::COMPACT)
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, disk, relative_path);
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, volume, relative_path);
else if (type == MergeTreeDataPartType::WIDE)
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, disk, relative_path);
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, volume, relative_path);
else
throw Exception("Unknown type in part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
}
@ -1575,18 +1576,18 @@ static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const DiskPtr & disk, const String & relative_path) const
const String & name, const VolumePtr & volume, const String & relative_path) const
{
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), disk, relative_path);
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), volume, relative_path);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const
const VolumePtr & volume, const String & relative_path) const
{
MergeTreeDataPartType type;
auto full_path = relative_data_path + relative_path + "/";
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(disk, full_path);
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(volume->getDisk(), full_path);
if (mrk_ext)
type = getPartTypeFromMarkExtension(*mrk_ext);
@ -1596,7 +1597,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
type = choosePartType(0, 0);
}
return createPart(name, type, part_info, disk, relative_path);
return createPart(name, type, part_info, volume, relative_path);
}
void MergeTreeData::changeSettings(
@ -2314,7 +2315,7 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Committed);
auto disk = original_active_part->disk;
auto disk = original_active_part->volume->getDisk();
String marker_path = original_active_part->getFullRelativePath() + DELETE_ON_DESTROY_MARKER_PATH;
try
{
@ -2379,7 +2380,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
auto disk = part->disk;
auto disk = part->volume->getDisk();
String full_part_path = part->getFullRelativePath();
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
@ -2404,9 +2405,9 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
}
}
/// Creates a part object for the directory `relative_path` on `volume`, runs
/// loadPartAndFixMetadataImpl on it and returns the resulting part.
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const VolumePtr & volume, const String & relative_path) const
{
    /// The part name is taken from the last component of the relative path.
    MutableDataPartPtr part = createPart(Poco::Path(relative_path).getFileName(), volume, relative_path);
    loadPartAndFixMetadataImpl(part);
    return part;
}
@ -2519,7 +2520,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
return part_ptr->disk->getName() == disk->getName();
return part_ptr->volume->getDisk()->getName() == disk->getName();
}), parts.end());
if (parts.empty())
@ -2570,9 +2571,9 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
for (const auto & disk : volume->disks)
for (const auto & disk : volume->getDisks())
{
if (part_ptr->disk->getName() == disk->getName())
if (part_ptr->volume->getDisk()->getName() == disk->getName())
{
return true;
}
@ -2848,7 +2849,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part " << part_names.second);
MutableDataPartPtr part = createPart(part_names.first, name_to_disk[part_names.first], source_dir + part_names.second);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_names.first, name_to_disk[part_names.first]);
MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second);
loadPartAndFixMetadataImpl(part);
loaded_parts.push_back(part);
}
@ -2962,12 +2964,12 @@ bool MergeTreeData::TTLEntry::isPartInDestination(StoragePolicyPtr policy, const
{
if (destination_type == PartDestinationType::VOLUME)
{
for (const auto & disk : policy->getVolumeByName(destination_name)->disks)
if (disk->getName() == part.disk->getName())
for (const auto & disk : policy->getVolumeByName(destination_name)->getDisks())
if (disk->getName() == part.volume->getDisk()->getName())
return true;
}
else if (destination_type == PartDestinationType::DISK)
return policy->getDiskByName(destination_name)->getName() == part.disk->getName();
return policy->getDiskByName(destination_name)->getName() == part.volume->getDisk()->getName();
return false;
}
@ -3181,7 +3183,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->disk->getName())
if (disk->getName() == src_part->volume->getDisk()->getName())
{
does_storage_policy_allow_same_disk = true;
break;
@ -3194,7 +3196,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->disk);
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->volume->getDisk());
auto disk = reservation->getDisk();
String src_part_path = src_part->getFullRelativePath();
String dst_part_path = relative_data_path + tmp_dst_part_name;
@ -3206,7 +3208,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
localBackup(disk, src_part_path, dst_part_path);
disk->removeIfExists(dst_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
auto dst_data_part = createPart(dst_part_name, dst_part_info, reservation->getDisk(), tmp_dst_part_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
dst_data_part->is_temp = true;
@ -3278,7 +3281,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
if (!matcher(part))
continue;
part->disk->createDirectories(shadow_path);
part->volume->getDisk()->createDirectories(shadow_path);
String backup_path = shadow_path
+ (!with_name.empty()
@ -3289,8 +3292,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path);
String backup_part_path = backup_path + relative_data_path + part->relative_path;
localBackup(part->disk, part->getFullRelativePath(), backup_part_path);
part->disk->removeIfExists(backup_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
localBackup(part->volume->getDisk(), part->getFullRelativePath(), backup_part_path);
part->volume->getDisk()->removeIfExists(backup_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
part->is_frozen.store(true, std::memory_order_relaxed);
++parts_processed;
@ -3411,7 +3414,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
if (policy->getVolumes().size() > 1)
return true;
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->disks.size() > 1 && !move_ttl_entries.empty();
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && !move_ttl_entries.empty();
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)

View File

@ -194,14 +194,14 @@ public:
/// Creates a part of the explicitly given `type` located at `relative_path` on `volume`.
/// After this method setColumns must be called
MutableDataPartPtr createPart(const String & name,
    MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
    const VolumePtr & volume, const String & relative_path) const;
/// Creates a part from its name only; the remaining part info is derived from `name`.
/// After this method 'loadColumnsChecksumsIndexes' must be called
MutableDataPartPtr createPart(const String & name,
    const VolumePtr & volume, const String & relative_path) const;
/// Same as the overload taking only the name, but with an already parsed `part_info`.
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
    const VolumePtr & volume, const String & relative_path) const;
/// Auxiliary object to add a set of parts into the working set in two steps:
/// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
@ -539,7 +539,7 @@ public:
bool hasAnyTTL() const override { return hasRowsTTL() || hasAnyMoveTTL() || hasAnyColumnTTL(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
/// The part is looked up at `relative_path` on `volume`.
MutableDataPartPtr loadPartAndFixMetadata(const VolumePtr & volume, const String & relative_path) const;
/** Create local backup (snapshot) for parts with specified prefix.
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,

Some files were not shown because too many files have changed in this diff Show More