mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge branch 'master' into fix_aggregation_ttl
This commit is contained in:
commit
b11254d191
@ -184,10 +184,27 @@ endif ()
|
||||
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -rdynamic")
|
||||
|
||||
find_program (OBJCOPY_PATH NAMES "llvm-objcopy" "llvm-objcopy-12" "llvm-objcopy-11" "llvm-objcopy-10" "llvm-objcopy-9" "llvm-objcopy-8" "objcopy")
|
||||
|
||||
if (NOT OBJCOPY_PATH AND OS_DARWIN)
|
||||
find_program (BREW_PATH NAMES "brew")
|
||||
if (BREW_PATH)
|
||||
execute_process (COMMAND ${BREW_PATH} --prefix llvm ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE LLVM_PREFIX)
|
||||
if (LLVM_PREFIX)
|
||||
find_program (OBJCOPY_PATH NAMES "llvm-objcopy" PATHS "${LLVM_PREFIX}/bin" NO_DEFAULT_PATH)
|
||||
endif ()
|
||||
if (NOT OBJCOPY_PATH)
|
||||
execute_process (COMMAND ${BREW_PATH} --prefix binutils ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE BINUTILS_PREFIX)
|
||||
if (BINUTILS_PREFIX)
|
||||
find_program (OBJCOPY_PATH NAMES "objcopy" PATHS "${BINUTILS_PREFIX}/bin" NO_DEFAULT_PATH)
|
||||
endif ()
|
||||
endif ()
|
||||
endif ()
|
||||
endif ()
|
||||
|
||||
if (OBJCOPY_PATH)
|
||||
message(STATUS "Using objcopy: ${OBJCOPY_PATH}.")
|
||||
message (STATUS "Using objcopy: ${OBJCOPY_PATH}")
|
||||
else ()
|
||||
message(FATAL_ERROR "Cannot find objcopy.")
|
||||
message (FATAL_ERROR "Cannot find objcopy.")
|
||||
endif ()
|
||||
|
||||
if (OS_DARWIN)
|
||||
|
@ -1,9 +1,9 @@
|
||||
# This strings autochanged from release_lib.sh:
|
||||
SET(VERSION_REVISION 54452)
|
||||
SET(VERSION_REVISION 54453)
|
||||
SET(VERSION_MAJOR 21)
|
||||
SET(VERSION_MINOR 7)
|
||||
SET(VERSION_MINOR 8)
|
||||
SET(VERSION_PATCH 1)
|
||||
SET(VERSION_GITHASH 976ccc2e908ac3bc28f763bfea8134ea0a121b40)
|
||||
SET(VERSION_DESCRIBE v21.7.1.1-prestable)
|
||||
SET(VERSION_STRING 21.7.1.1)
|
||||
SET(VERSION_GITHASH fb895056568e26200629c7d19626e92d2dedc70d)
|
||||
SET(VERSION_DESCRIBE v21.8.1.1-prestable)
|
||||
SET(VERSION_STRING 21.8.1.1)
|
||||
# end of autochange
|
||||
|
@ -33,51 +33,25 @@ macro(clickhouse_embed_binaries)
|
||||
message(FATAL_ERROR "The list of binary resources to embed may not be empty")
|
||||
endif()
|
||||
|
||||
# If cross-compiling, ensure we use the toolchain file and target the actual target architecture
|
||||
if (CMAKE_CROSSCOMPILING)
|
||||
set(CROSS_COMPILE_FLAGS --target=${CMAKE_C_COMPILER_TARGET})
|
||||
|
||||
# FIXME: find a way to properly pass all cross-compile flags to custom command in CMake
|
||||
if (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
|
||||
list(APPEND CROSS_COMPILE_FLAGS -isysroot ${CMAKE_OSX_SYSROOT} -mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET})
|
||||
else ()
|
||||
list(APPEND CROSS_COMPILE_FLAGS -isysroot ${CMAKE_SYSROOT})
|
||||
endif ()
|
||||
else()
|
||||
set(CROSS_COMPILE_FLAGS "")
|
||||
endif()
|
||||
add_library("${EMBED_TARGET}" STATIC)
|
||||
set_target_properties("${EMBED_TARGET}" PROPERTIES LINKER_LANGUAGE C)
|
||||
|
||||
set(EMBED_TEMPLATE_FILE "${PROJECT_SOURCE_DIR}/programs/embed_binary.S.in")
|
||||
set(RESOURCE_OBJS)
|
||||
foreach(RESOURCE_FILE ${EMBED_RESOURCES})
|
||||
set(RESOURCE_OBJ "${RESOURCE_FILE}.o")
|
||||
list(APPEND RESOURCE_OBJS "${RESOURCE_OBJ}")
|
||||
|
||||
# Normalize the name of the resource
|
||||
foreach(RESOURCE_FILE ${EMBED_RESOURCES})
|
||||
set(ASSEMBLY_FILE_NAME "${RESOURCE_FILE}.S")
|
||||
set(BINARY_FILE_NAME "${RESOURCE_FILE}")
|
||||
|
||||
# Normalize the name of the resource.
|
||||
string(REGEX REPLACE "[\./-]" "_" SYMBOL_NAME "${RESOURCE_FILE}") # - must be last in regex
|
||||
string(REPLACE "+" "_PLUS_" SYMBOL_NAME "${SYMBOL_NAME}")
|
||||
set(ASSEMBLY_FILE_NAME "${RESOURCE_FILE}.S")
|
||||
|
||||
# Put the configured assembly file in the output directory.
|
||||
# This is so we can clean it up as usual, and we CD to the
|
||||
# source directory before compiling, so that the assembly
|
||||
# `.incbin` directive can find the file.
|
||||
# Generate the configured assembly file in the output directory.
|
||||
configure_file("${EMBED_TEMPLATE_FILE}" "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" @ONLY)
|
||||
|
||||
# Generate the output object file by compiling the assembly, in the directory of
|
||||
# the sources so that the resource file may also be found
|
||||
add_custom_command(
|
||||
OUTPUT ${RESOURCE_OBJ}
|
||||
COMMAND cd "${EMBED_RESOURCE_DIR}" &&
|
||||
${CMAKE_C_COMPILER} "${CROSS_COMPILE_FLAGS}" -c -o
|
||||
"${CMAKE_CURRENT_BINARY_DIR}/${RESOURCE_OBJ}"
|
||||
"${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}"
|
||||
COMMAND_EXPAND_LISTS
|
||||
)
|
||||
set_source_files_properties("${RESOURCE_OBJ}" PROPERTIES EXTERNAL_OBJECT true GENERATED true)
|
||||
endforeach()
|
||||
# Set the include directory for relative paths specified for `.incbin` directive.
|
||||
set_property(SOURCE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}" APPEND PROPERTY INCLUDE_DIRECTORIES "${EMBED_RESOURCE_DIR}")
|
||||
|
||||
add_library("${EMBED_TARGET}" STATIC ${RESOURCE_OBJS})
|
||||
set_target_properties("${EMBED_TARGET}" PROPERTIES LINKER_LANGUAGE C)
|
||||
target_sources("${EMBED_TARGET}" PRIVATE "${CMAKE_CURRENT_BINARY_DIR}/${ASSEMBLY_FILE_NAME}")
|
||||
endforeach()
|
||||
endmacro()
|
||||
|
4
debian/changelog
vendored
4
debian/changelog
vendored
@ -1,5 +1,5 @@
|
||||
clickhouse (21.7.1.1) unstable; urgency=low
|
||||
clickhouse (21.8.1.1) unstable; urgency=low
|
||||
|
||||
* Modified source code
|
||||
|
||||
-- clickhouse-release <clickhouse-release@yandex-team.ru> Thu, 20 May 2021 22:23:29 +0300
|
||||
-- clickhouse-release <clickhouse-release@yandex-team.ru> Mon, 28 Jun 2021 00:50:15 +0300
|
||||
|
@ -1,7 +1,7 @@
|
||||
FROM ubuntu:18.04
|
||||
|
||||
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
|
||||
ARG version=21.7.1.*
|
||||
ARG version=21.8.1.*
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install --yes --no-install-recommends \
|
||||
|
@ -1,7 +1,7 @@
|
||||
FROM ubuntu:20.04
|
||||
|
||||
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
|
||||
ARG version=21.7.1.*
|
||||
ARG version=21.8.1.*
|
||||
ARG gosu_ver=1.10
|
||||
|
||||
# set non-empty deb_location_url url to create a docker image
|
||||
|
@ -1,7 +1,7 @@
|
||||
FROM ubuntu:18.04
|
||||
|
||||
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
|
||||
ARG version=21.7.1.*
|
||||
ARG version=21.8.1.*
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y apt-transport-https dirmngr && \
|
||||
|
@ -561,7 +561,7 @@ if args.report == 'main':
|
||||
# Don't show mildly unstable queries, only the very unstable ones we
|
||||
# treat as errors.
|
||||
if very_unstable_queries:
|
||||
if very_unstable_queries > 3:
|
||||
if very_unstable_queries > 5:
|
||||
error_tests += very_unstable_queries
|
||||
status = 'failure'
|
||||
message_array.append(str(very_unstable_queries) + ' unstable')
|
||||
|
@ -33,7 +33,7 @@ Reboot.
|
||||
|
||||
``` bash
|
||||
brew update
|
||||
brew install cmake ninja libtool gettext llvm gcc
|
||||
brew install cmake ninja libtool gettext llvm gcc binutils
|
||||
```
|
||||
|
||||
## Checkout ClickHouse Sources {#checkout-clickhouse-sources}
|
||||
|
@ -0,0 +1,53 @@
|
||||
---
|
||||
toc_priority: 12
|
||||
toc_title: ExternalDistributed
|
||||
---
|
||||
|
||||
# ExternalDistributed {#externaldistributed}
|
||||
|
||||
The `ExternalDistributed` engine allows to perform `SELECT` queries on data that is stored on a remote servers MySQL or PostgreSQL. Accepts [MySQL](../../../engines/table-engines/integrations/mysql.md) or [PostgreSQL](../../../engines/table-engines/integrations/postgresql.md) engines as an argument so sharding is possible.
|
||||
|
||||
## Creating a Table {#creating-a-table}
|
||||
|
||||
``` sql
|
||||
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
(
|
||||
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
|
||||
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
|
||||
...
|
||||
) ENGINE = ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password');
|
||||
```
|
||||
|
||||
See a detailed description of the [CREATE TABLE](../../../sql-reference/statements/create/table.md#create-table-query) query.
|
||||
|
||||
The table structure can differ from the original table structure:
|
||||
|
||||
- Column names should be the same as in the original table, but you can use just some of these columns and in any order.
|
||||
- Column types may differ from those in the original table. ClickHouse tries to [cast](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) values to the ClickHouse data types.
|
||||
|
||||
**Engine Parameters**
|
||||
|
||||
- `engine` — The table engine `MySQL` or `PostgreSQL`.
|
||||
- `host:port` — MySQL or PostgreSQL server address.
|
||||
- `database` — Remote database name.
|
||||
- `table` — Remote table name.
|
||||
- `user` — User name.
|
||||
- `password` — User password.
|
||||
|
||||
## Implementation Details {#implementation-details}
|
||||
|
||||
Supports multiple replicas that must be listed by `|` and shards must be listed by `,`. For example:
|
||||
|
||||
```sql
|
||||
CREATE TABLE test_shards (id UInt32, name String, age UInt32, money UInt32) ENGINE = ExternalDistributed('MySQL', `mysql{1|2}:3306,mysql{3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
|
||||
```
|
||||
|
||||
When specifying replicas, one of the available replicas is selected for each of the shards when reading. If the connection fails, the next replica is selected, and so on for all the replicas. If the connection attempt fails for all the replicas, the attempt is repeated the same way several times.
|
||||
|
||||
You can specify any number of shards and any number of replicas for each shard.
|
||||
|
||||
**See Also**
|
||||
|
||||
- [MySQL table engine](../../../engines/table-engines/integrations/mysql.md)
|
||||
- [PostgreSQL table engine](../../../engines/table-engines/integrations/postgresql.md)
|
||||
- [Distributed table engine](../../../engines/table-engines/special/distributed.md)
|
@ -28,8 +28,8 @@ See a detailed description of the [CREATE TABLE](../../../sql-reference/statemen
|
||||
The table structure can differ from the original MySQL table structure:
|
||||
|
||||
- Column names should be the same as in the original MySQL table, but you can use just some of these columns and in any order.
|
||||
- Column types may differ from those in the original MySQL table. ClickHouse tries to [cast](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) values to the ClickHouse data types.
|
||||
- Setting `external_table_functions_use_nulls` defines how to handle Nullable columns. Default is true, if false - table function will not make nullable columns and will insert default values instead of nulls. This is also applicable for null values inside array data types.
|
||||
- Column types may differ from those in the original MySQL table. ClickHouse tries to [cast](../../../engines/database-engines/mysql.md#data_types-support) values to the ClickHouse data types.
|
||||
- The [external_table_functions_use_nulls](../../../operations/settings/settings.md#external-table-functions-use-nulls) setting defines how to handle Nullable columns. Default value: 1. If 0, the table function does not make Nullable columns and inserts default values instead of nulls. This is also applicable for NULL values inside arrays.
|
||||
|
||||
**Engine Parameters**
|
||||
|
||||
@ -55,6 +55,12 @@ Simple `WHERE` clauses such as `=, !=, >, >=, <, <=` are executed on the MySQL s
|
||||
|
||||
The rest of the conditions and the `LIMIT` sampling constraint are executed in ClickHouse only after the query to MySQL finishes.
|
||||
|
||||
Supports multiple replicas that must be listed by `|`. For example:
|
||||
|
||||
```sql
|
||||
CREATE TABLE test_replicas (id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL(`mysql{2|3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
|
||||
```
|
||||
|
||||
## Usage Example {#usage-example}
|
||||
|
||||
Table in MySQL:
|
||||
|
@ -29,7 +29,7 @@ The table structure can differ from the source table structure:
|
||||
|
||||
- Column names should be the same as in the source table, but you can use just some of these columns and in any order.
|
||||
- Column types may differ from those in the source table. ClickHouse tries to [cast](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) values to the ClickHouse data types.
|
||||
- Setting `external_table_functions_use_nulls` defines how to handle Nullable columns. Default is true, if false - table function will not make nullable columns and will insert default values instead of nulls. This is also applicable for null values inside array data types.
|
||||
- The [external_table_functions_use_nulls](../../../operations/settings/settings.md#external-table-functions-use-nulls) setting defines how to handle Nullable columns. Default value: 1. If 0, the table function does not make Nullable columns and inserts default values instead of nulls. This is also applicable for NULL values inside arrays.
|
||||
|
||||
**Engine Parameters**
|
||||
|
||||
|
@ -23,8 +23,8 @@ See a detailed description of the [CREATE TABLE](../../../sql-reference/statemen
|
||||
The table structure can differ from the original PostgreSQL table structure:
|
||||
|
||||
- Column names should be the same as in the original PostgreSQL table, but you can use just some of these columns and in any order.
|
||||
- Column types may differ from those in the original PostgreSQL table. ClickHouse tries to [cast](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) values to the ClickHouse data types.
|
||||
- Setting `external_table_functions_use_nulls` defines how to handle Nullable columns. Default is 1, if 0 - table function will not make nullable columns and will insert default values instead of nulls. This is also applicable for null values inside array data types.
|
||||
- Column types may differ from those in the original PostgreSQL table. ClickHouse tries to [cast](../../../engines/database-engines/postgresql.md#data_types-support) values to the ClickHouse data types.
|
||||
- The [external_table_functions_use_nulls](../../../operations/settings/settings.md#external-table-functions-use-nulls) setting defines how to handle Nullable columns. Default value: 1. If 0, the table function does not make Nullable columns and inserts default values instead of nulls. This is also applicable for NULL values inside arrays.
|
||||
|
||||
**Engine Parameters**
|
||||
|
||||
@ -49,6 +49,12 @@ PostgreSQL `Array` types are converted into ClickHouse arrays.
|
||||
|
||||
!!! info "Note"
|
||||
Be careful - in PostgreSQL an array data, created like a `type_name[]`, may contain multi-dimensional arrays of different dimensions in different table rows in same column. But in ClickHouse it is only allowed to have multidimensional arrays of the same count of dimensions in all table rows in same column.
|
||||
|
||||
Supports multiple replicas that must be listed by `|`. For example:
|
||||
|
||||
```sql
|
||||
CREATE TABLE test_replicas (id UInt32, name String) ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword');
|
||||
```
|
||||
|
||||
Replicas priority for PostgreSQL dictionary source is supported. The bigger the number in map, the less the priority. The highest priority is `0`.
|
||||
|
||||
|
@ -1803,6 +1803,27 @@ Possible values:
|
||||
|
||||
Default value: 0.
|
||||
|
||||
## distributed_directory_monitor_split_batch_on_failure {#distributed_directory_monitor_split_batch_on_failure}
|
||||
|
||||
Enables/disables splitting batches on failures.
|
||||
|
||||
Sometimes sending particular batch to the remote shard may fail, because of some complex pipeline after (i.e. `MATERIALIZED VIEW` with `GROUP BY`) due to `Memory limit exceeded` or similar errors. In this case, retrying will not help (and this will stuck distributed sends for the table) but sending files from that batch one by one may succeed INSERT.
|
||||
|
||||
So installing this setting to `1` will disable batching for such batches (i.e. temporary disables `distributed_directory_monitor_batch_inserts` for failed batches).
|
||||
|
||||
Possible values:
|
||||
|
||||
- 1 — Enabled.
|
||||
- 0 — Disabled.
|
||||
|
||||
Default value: 0.
|
||||
|
||||
!!! note "Note"
|
||||
This setting also affects broken batches (that may appears because of abnormal server (machine) termination and no `fsync_after_insert`/`fsync_directories` for [Distributed](../../engines/table-engines/special/distributed.md) table engine).
|
||||
|
||||
!!! warning "Warning"
|
||||
You should not rely on automatic batch splitting, since this may hurt performance.
|
||||
|
||||
## os_thread_priority {#setting-os-thread-priority}
|
||||
|
||||
Sets the priority ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) for threads that execute queries. The OS scheduler considers this priority when choosing the next thread to run on each available CPU core.
|
||||
@ -3145,4 +3166,17 @@ SETTINGS index_granularity = 8192 │
|
||||
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->
|
||||
## external_table_functions_use_nulls {#external-table-functions-use-nulls}
|
||||
|
||||
Defines how [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) and [odbc](../../sql-reference/table-functions/odbc.md)] table functions use Nullable columns.
|
||||
|
||||
Possible values:
|
||||
|
||||
- 0 — The table function explicitly uses Nullable columns.
|
||||
- 1 — The table function implicitly uses Nullable columns.
|
||||
|
||||
Default value: `1`.
|
||||
|
||||
**Usage**
|
||||
|
||||
If the setting is set to `0`, the table function does not make Nullable columns and inserts default values instead of NULL. This is also applicable for NULL values inside arrays.
|
||||
|
39
docs/en/operations/system-tables/data_skipping_indices.md
Normal file
39
docs/en/operations/system-tables/data_skipping_indices.md
Normal file
@ -0,0 +1,39 @@
|
||||
# system.data_skipping_indices {#system-data-skipping-indices}
|
||||
|
||||
Contains information about existing data skipping indices in all the tables.
|
||||
|
||||
Columns:
|
||||
|
||||
- `database` ([String](../../sql-reference/data-types/string.md)) — Database name.
|
||||
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
|
||||
- `name` ([String](../../sql-reference/data-types/string.md)) — Index name.
|
||||
- `type` ([String](../../sql-reference/data-types/string.md)) — Index type.
|
||||
- `expr` ([String](../../sql-reference/data-types/string.md)) — Expression used to calculate the index.
|
||||
- `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of granules in the block.
|
||||
|
||||
**Example**
|
||||
|
||||
|
||||
```sql
|
||||
SELECT * FROM system.data_skipping_indices LIMIT 2 FORMAT Vertical;
|
||||
```
|
||||
|
||||
```text
|
||||
Row 1:
|
||||
──────
|
||||
database: default
|
||||
table: user_actions
|
||||
name: clicks_idx
|
||||
type: minmax
|
||||
expr: clicks
|
||||
granularity: 1
|
||||
|
||||
Row 2:
|
||||
──────
|
||||
database: default
|
||||
table: users
|
||||
name: contacts_null_idx
|
||||
type: minmax
|
||||
expr: assumeNotNull(contacts_null)
|
||||
granularity: 1
|
||||
```
|
@ -39,6 +39,18 @@ Simple `WHERE` clauses such as `=, !=, >, >=, <, <=` are currently executed on t
|
||||
|
||||
The rest of the conditions and the `LIMIT` sampling constraint are executed in ClickHouse only after the query to MySQL finishes.
|
||||
|
||||
Supports multiple replicas that must be listed by `|`. For example:
|
||||
|
||||
```sql
|
||||
SELECT name FROM mysql(`mysql{1|2|3}:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```sql
|
||||
SELECT name FROM mysql(`mysql1:3306|mysql2:3306|mysql3:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
|
||||
```
|
||||
|
||||
**Returned Value**
|
||||
|
||||
A table object with the same columns as the original MySQL table.
|
||||
|
@ -43,8 +43,20 @@ PostgreSQL Array types converts into ClickHouse arrays.
|
||||
|
||||
!!! info "Note"
|
||||
Be careful, in PostgreSQL an array data type column like Integer[] may contain arrays of different dimensions in different rows, but in ClickHouse it is only allowed to have multidimensional arrays of the same dimension in all rows.
|
||||
|
||||
Supports multiple replicas that must be listed by `|`. For example:
|
||||
|
||||
Supports replicas priority for PostgreSQL dictionary source. The bigger the number in map, the less the priority. The highest priority is `0`.
|
||||
```sql
|
||||
SELECT name FROM postgresql(`postgres{1|2|3}:5432`, 'postgres_database', 'postgres_table', 'user', 'password');
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```sql
|
||||
SELECT name FROM postgresql(`postgres1:5431|postgres2:5432`, 'postgres_database', 'postgres_table', 'user', 'password');
|
||||
```
|
||||
|
||||
Supports replicas priority for PostgreSQL dictionary source. The bigger the number in map, the less the priority. The highest priority is `0`.
|
||||
|
||||
**Examples**
|
||||
|
||||
|
@ -0,0 +1,53 @@
|
||||
---
|
||||
toc_priority: 12
|
||||
toc_title: ExternalDistributed
|
||||
---
|
||||
|
||||
# ExternalDistributed {#externaldistributed}
|
||||
|
||||
Движок `ExternalDistributed` позволяет выполнять запросы `SELECT` для таблиц на удаленном сервере MySQL или PostgreSQL. Принимает в качестве аргумента табличные движки [MySQL](../../../engines/table-engines/integrations/mysql.md) или [PostgreSQL](../../../engines/table-engines/integrations/postgresql.md), поэтому возможно шардирование.
|
||||
|
||||
## Создание таблицы {#creating-a-table}
|
||||
|
||||
``` sql
|
||||
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
(
|
||||
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
|
||||
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
|
||||
...
|
||||
) ENGINE = ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password');
|
||||
```
|
||||
|
||||
Смотрите подробное описание запроса [CREATE TABLE](../../../sql-reference/statements/create/table.md#create-table-query).
|
||||
|
||||
Структура таблицы может отличаться от структуры исходной таблицы:
|
||||
|
||||
- Имена столбцов должны быть такими же, как в исходной таблице, но можно использовать только некоторые из этих столбцов и в любом порядке.
|
||||
- Типы столбцов могут отличаться от типов в исходной таблице. ClickHouse пытается [привести](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) значения к типам данных ClickHouse.
|
||||
|
||||
**Параметры движка**
|
||||
|
||||
- `engine` — табличный движок `MySQL` или `PostgreSQL`.
|
||||
- `host:port` — адрес сервера MySQL или PostgreSQL.
|
||||
- `database` — имя базы данных на сервере.
|
||||
- `table` — имя таблицы.
|
||||
- `user` — имя пользователя.
|
||||
- `password` — пароль пользователя.
|
||||
|
||||
## Особенности реализации {#implementation-details}
|
||||
|
||||
Поддерживает несколько реплик, которые должны быть перечислены через `|`, а шарды — через `,`. Например:
|
||||
|
||||
```sql
|
||||
CREATE TABLE test_shards (id UInt32, name String, age UInt32, money UInt32) ENGINE = ExternalDistributed('MySQL', `mysql{1|2}:3306,mysql{3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
|
||||
```
|
||||
|
||||
При указании реплик для каждого из шардов при чтении выбирается одна из доступных реплик. Если соединиться не удалось, то выбирается следующая реплика, и так для всех реплик. Если попытка соединения не удалась для всех реплик, то сервер ClickHouse снова пытается соединиться с одной из реплик, перебирая их по кругу, и так несколько раз.
|
||||
|
||||
Вы можете указать любое количество шардов и любое количество реплик для каждого шарда.
|
||||
|
||||
**Смотрите также**
|
||||
|
||||
- [Табличный движок MySQL](../../../engines/table-engines/integrations/mysql.md)
|
||||
- [Табличный движок PostgreSQL](../../../engines/table-engines/integrations/postgresql.md)
|
||||
- [Табличный движок Distributed](../../../engines/table-engines/special/distributed.md)
|
@ -20,11 +20,11 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
|
||||
Смотрите подробное описание запроса [CREATE TABLE](../../../sql-reference/statements/create/table.md#create-table-query).
|
||||
|
||||
Структура таблицы может отличаться от исходной структуры таблицы MySQL:
|
||||
Структура таблицы может отличаться от структуры исходной таблицы MySQL:
|
||||
|
||||
- Имена столбцов должны быть такими же, как в исходной таблице MySQL, но вы можете использовать только некоторые из этих столбцов и в любом порядке.
|
||||
- Типы столбцов могут отличаться от типов в исходной таблице MySQL. ClickHouse пытается [приводить](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) значения к типам данных ClickHouse.
|
||||
- Настройка `external_table_functions_use_nulls` определяет как обрабатывать Nullable столбцы. По умолчанию 1, если 0 - табличная функция не будет делать nullable столбцы и будет вместо null выставлять значения по умолчанию для скалярного типа. Это также применимо для null значений внутри массивов.
|
||||
- Имена столбцов должны быть такими же, как в исходной таблице MySQL, но можно использовать только некоторые из этих столбцов и в любом порядке.
|
||||
- Типы столбцов могут отличаться от типов в исходной таблице MySQL. ClickHouse пытается [привести](../../../engines/database-engines/mysql.md#data_types-support) значения к типам данных ClickHouse.
|
||||
- Настройка [external_table_functions_use_nulls](../../../operations/settings/settings.md#external-table-functions-use-nulls) определяет как обрабатывать Nullable столбцы. Значение по умолчанию: 1. Если значение 0, то табличная функция не делает Nullable столбцы, а вместо NULL выставляет значения по умолчанию для скалярного типа. Это также применимо для значений NULL внутри массивов.
|
||||
|
||||
**Параметры движка**
|
||||
|
||||
@ -50,6 +50,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
|
||||
Остальные условия и ограничение выборки `LIMIT` будут выполнены в ClickHouse только после выполнения запроса к MySQL.
|
||||
|
||||
Поддерживает несколько реплик, которые должны быть перечислены через `|`. Например:
|
||||
|
||||
```sql
|
||||
CREATE TABLE test_replicas (id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL(`mysql{2|3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
|
||||
```
|
||||
|
||||
## Пример использования {#primer-ispolzovaniia}
|
||||
|
||||
Таблица в MySQL:
|
||||
|
@ -29,7 +29,7 @@ ENGINE = ODBC(connection_settings, external_database, external_table)
|
||||
|
||||
- Имена столбцов должны быть такими же, как в исходной таблице, но вы можете использовать только некоторые из этих столбцов и в любом порядке.
|
||||
- Типы столбцов могут отличаться от типов аналогичных столбцов в исходной таблице. ClickHouse пытается [приводить](../../../engines/table-engines/integrations/odbc.md#type_conversion_function-cast) значения к типам данных ClickHouse.
|
||||
- Настройка `external_table_functions_use_nulls` определяет как обрабатывать Nullable столбцы. По умолчанию 1, если 0 - табличная функция не будет делать nullable столбцы и будет вместо null выставлять значения по умолчанию для скалярного типа. Это также применимо для null значений внутри массивов.
|
||||
- Настройка [external_table_functions_use_nulls](../../../operations/settings/settings.md#external-table-functions-use-nulls) определяет как обрабатывать Nullable столбцы. Значение по умолчанию: 1. Если значение 0, то табличная функция не делает Nullable столбцы, а вместо NULL выставляет значения по умолчанию для скалярного типа. Это также применимо для значений NULL внутри массивов.
|
||||
|
||||
**Параметры движка**
|
||||
|
||||
|
@ -20,19 +20,19 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
|
||||
Смотрите подробное описание запроса [CREATE TABLE](../../../sql-reference/statements/create/table.md#create-table-query).
|
||||
|
||||
Структура таблицы может отличаться от исходной структуры таблицы PostgreSQL:
|
||||
Структура таблицы может отличаться от структуры исходной таблицы PostgreSQL:
|
||||
|
||||
- Имена столбцов должны быть такими же, как в исходной таблице PostgreSQL, но вы можете использовать только некоторые из этих столбцов и в любом порядке.
|
||||
- Типы столбцов могут отличаться от типов в исходной таблице PostgreSQL. ClickHouse пытается [приводить](../../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) values to the ClickHouse data types.
|
||||
- Настройка `external_table_functions_use_nulls` определяет как обрабатывать Nullable столбцы. По умолчанию 1, если 0 - табличная функция не будет делать nullable столбцы и будет вместо null выставлять значения по умолчанию для скалярного типа. Это также применимо для null значений внутри массивов.
|
||||
- Имена столбцов должны быть такими же, как в исходной таблице PostgreSQL, но можно использовать только некоторые из этих столбцов и в любом порядке.
|
||||
- Типы столбцов могут отличаться от типов в исходной таблице PostgreSQL. ClickHouse пытается [привести](../../../engines/database-engines/postgresql.md#data_types-support) значения к типам данных ClickHouse.
|
||||
- Настройка [external_table_functions_use_nulls](../../../operations/settings/settings.md#external-table-functions-use-nulls) определяет как обрабатывать Nullable столбцы. Значение по умолчанию: 1. Если значение 0, то табличная функция не делает Nullable столбцы, а вместо NULL выставляет значения по умолчанию для скалярного типа. Это также применимо для значений NULL внутри массивов.
|
||||
|
||||
**Параметры движка**
|
||||
|
||||
- `host:port` — адрес сервера PostgreSQL.
|
||||
- `database` — Имя базы данных на сервере PostgreSQL.
|
||||
- `table` — Имя таблицы.
|
||||
- `user` — Имя пользователя PostgreSQL.
|
||||
- `password` — Пароль пользователя PostgreSQL.
|
||||
- `database` — имя базы данных на сервере PostgreSQL.
|
||||
- `table` — имя таблицы.
|
||||
- `user` — имя пользователя PostgreSQL.
|
||||
- `password` — пароль пользователя PostgreSQL.
|
||||
- `schema` — имя схемы, если не используется схема по умолчанию. Необязательный аргумент.
|
||||
|
||||
## Особенности реализации {#implementation-details}
|
||||
@ -49,6 +49,12 @@ PostgreSQL массивы конвертируются в массивы ClickHo
|
||||
|
||||
!!! info "Внимание"
|
||||
Будьте внимательны, в PostgreSQL массивы, созданные как `type_name[]`, являются многомерными и могут содержать в себе разное количество измерений в разных строках одной таблицы. Внутри ClickHouse допустимы только многомерные массивы с одинаковым кол-вом измерений во всех строках таблицы.
|
||||
|
||||
Поддерживает несколько реплик, которые должны быть перечислены через `|`. Например:
|
||||
|
||||
```sql
|
||||
CREATE TABLE test_replicas (id UInt32, name String) ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword');
|
||||
```
|
||||
|
||||
При использовании словаря PostgreSQL поддерживается приоритет реплик. Чем больше номер реплики, тем ниже ее приоритет. Наивысший приоритет у реплики с номером `0`.
|
||||
|
||||
|
@ -3023,4 +3023,17 @@ SETTINGS index_granularity = 8192 │
|
||||
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
[Оригинальная статья](https://clickhouse.tech/docs/ru/operations/settings/settings/) <!--hide-->
|
||||
## external_table_functions_use_nulls {#external-table-functions-use-nulls}
|
||||
|
||||
Определяет, как табличные функции [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) и [odbc](../../sql-reference/table-functions/odbc.md)] используют Nullable столбцы.
|
||||
|
||||
Возможные значения:
|
||||
|
||||
- 0 — табличная функция явно использует Nullable столбцы.
|
||||
- 1 — табличная функция неявно использует Nullable столбцы.
|
||||
|
||||
Значение по умолчанию: `1`.
|
||||
|
||||
**Использование**
|
||||
|
||||
Если установлено значение `0`, то табличная функция не делает Nullable столбцы, а вместо NULL выставляет значения по умолчанию для скалярного типа. Это также применимо для значений NULL внутри массивов.
|
||||
|
@ -38,6 +38,18 @@ mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_
|
||||
|
||||
Остальные условия и ограничение выборки `LIMIT` будут выполнены в ClickHouse только после выполнения запроса к MySQL.
|
||||
|
||||
Поддерживает несколько реплик, которые должны быть перечислены через `|`. Например:
|
||||
|
||||
```sql
|
||||
SELECT name FROM mysql(`mysql{1|2|3}:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
|
||||
```
|
||||
|
||||
или
|
||||
|
||||
```sql
|
||||
SELECT name FROM mysql(`mysql1:3306|mysql2:3306|mysql3:3306`, 'mysql_database', 'mysql_table', 'user', 'password');
|
||||
```
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
Объект таблицы с теми же столбцами, что и в исходной таблице MySQL.
|
||||
|
@ -43,6 +43,18 @@ PostgreSQL массивы конвертируются в массивы ClickHo
|
||||
|
||||
!!! info "Примечание"
|
||||
Будьте внимательны, в PostgreSQL массивы, созданные как `type_name[]`, являются многомерными и могут содержать в себе разное количество измерений в разных строках одной таблицы. Внутри ClickHouse допустипы только многомерные массивы с одинаковым кол-вом измерений во всех строках таблицы.
|
||||
|
||||
Поддерживает несколько реплик, которые должны быть перечислены через `|`. Например:
|
||||
|
||||
```sql
|
||||
SELECT name FROM postgresql(`postgres{1|2|3}:5432`, 'postgres_database', 'postgres_table', 'user', 'password');
|
||||
```
|
||||
|
||||
или
|
||||
|
||||
```sql
|
||||
SELECT name FROM postgresql(`postgres1:5431|postgres2:5432`, 'postgres_database', 'postgres_table', 'user', 'password');
|
||||
```
|
||||
|
||||
При использовании словаря PostgreSQL поддерживается приоритет реплик. Чем больше номер реплики, тем ниже ее приоритет. Наивысший приоритет у реплики с номером `0`.
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
# 设置 {#set}
|
||||
# 集合 {#set}
|
||||
|
||||
始终存在于 RAM 中的数据集。它适用于IN运算符的右侧(请参见 «IN运算符» 部分)。
|
||||
|
||||
|
120
docs/zh/sql-reference/table-functions/postgresql.md
Normal file
120
docs/zh/sql-reference/table-functions/postgresql.md
Normal file
@ -0,0 +1,120 @@
|
||||
---
|
||||
toc_priority: 42
|
||||
toc_title: postgresql
|
||||
---
|
||||
|
||||
# postgresql {#postgresql}
|
||||
|
||||
允许对存储在远程 PostgreSQL 服务器上的数据进行 `SELECT` 和 `INSERT` 查询.
|
||||
|
||||
**语法**
|
||||
|
||||
``` sql
|
||||
postgresql('host:port', 'database', 'table', 'user', 'password'[, `schema`])
|
||||
```
|
||||
|
||||
**参数**
|
||||
|
||||
- `host:port` — PostgreSQL 服务器地址.
|
||||
- `database` — 远程数据库名称.
|
||||
- `table` — 远程表名称.
|
||||
- `user` — PostgreSQL 用户.
|
||||
- `password` — 用户密码.
|
||||
- `schema` — 非默认的表结构. 可选.
|
||||
|
||||
**返回值**
|
||||
|
||||
一个表对象,其列数与原 PostgreSQL 表的列数相同。
|
||||
|
||||
!!! info "Note"
|
||||
在`INSERT`查询中,为了区分表函数`postgresql(..)`和表名以及表的列名列表,你必须使用关键字`FUNCTION`或`TABLE FUNCTION`。请看下面的例子。
|
||||
|
||||
## 实施细节 {#implementation-details}
|
||||
|
||||
`SELECT`查询在 PostgreSQL 上以 `COPY (SELECT ...) TO STDOUT` 的方式在只读的 PostgreSQL 事务中运行,每次在`SELECT`查询后提交。
|
||||
|
||||
简单的`WHERE`子句,如`=`、`!=`、`>`、`>=`、`<`、`<=`和`IN`,在PostgreSQL服务器上执行。
|
||||
|
||||
所有的连接、聚合、排序,`IN [ 数组 ]`条件和`LIMIT`采样约束只有在对PostgreSQL的查询结束后才会在ClickHouse中执行。
|
||||
|
||||
PostgreSQL 上的`INSERT`查询以`COPY "table_name" (field1, field2, ... fieldN) FROM STDIN`的方式在 PostgreSQL 事务中运行,每次`INSERT`语句后自动提交。
|
||||
|
||||
PostgreSQL 数组类型将转换为 ClickHouse 数组。
|
||||
|
||||
!!! info "Note"
|
||||
要小心,在 PostgreSQL 中,像 Integer[] 这样的数组数据类型列可以在不同的行中包含不同维度的数组,但在 ClickHouse 中,只允许在所有的行中有相同维度的多维数组。
|
||||
|
||||
支持设置 PostgreSQL 字典源中 Replicas 的优先级。地图中的数字越大,优先级就越低。`0`代表最高的优先级。
|
||||
|
||||
**示例**
|
||||
|
||||
PostgreSQL 中的表:
|
||||
|
||||
``` text
|
||||
postgres=# CREATE TABLE "public"."test" (
|
||||
"int_id" SERIAL,
|
||||
"int_nullable" INT NULL DEFAULT NULL,
|
||||
"float" FLOAT NOT NULL,
|
||||
"str" VARCHAR(100) NOT NULL DEFAULT '',
|
||||
"float_nullable" FLOAT NULL DEFAULT NULL,
|
||||
PRIMARY KEY (int_id));
|
||||
|
||||
CREATE TABLE
|
||||
|
||||
postgres=# INSERT INTO test (int_id, str, "float") VALUES (1,'test',2);
|
||||
INSERT 0 1
|
||||
|
||||
postgresql> SELECT * FROM test;
|
||||
int_id | int_nullable | float | str | float_nullable
|
||||
--------+--------------+-------+------+----------------
|
||||
1 | | 2 | test |
|
||||
(1 row)
|
||||
```
|
||||
|
||||
从 ClickHouse 检索数据:
|
||||
|
||||
```sql
|
||||
SELECT * FROM postgresql('localhost:5432', 'test', 'test', 'postgresql_user', 'password') WHERE str IN ('test');
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─int_id─┬─int_nullable─┬─float─┬─str──┬─float_nullable─┐
|
||||
│ 1 │ ᴺᵁᴸᴸ │ 2 │ test │ ᴺᵁᴸᴸ │
|
||||
└────────┴──────────────┴───────┴──────┴────────────────┘
|
||||
```
|
||||
|
||||
插入数据:
|
||||
|
||||
```sql
|
||||
INSERT INTO TABLE FUNCTION postgresql('localhost:5432', 'test', 'test', 'postgrsql_user', 'password') (int_id, float) VALUES (2, 3);
|
||||
SELECT * FROM postgresql('localhost:5432', 'test', 'test', 'postgresql_user', 'password');
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─int_id─┬─int_nullable─┬─float─┬─str──┬─float_nullable─┐
|
||||
│ 1 │ ᴺᵁᴸᴸ │ 2 │ test │ ᴺᵁᴸᴸ │
|
||||
│ 2 │ ᴺᵁᴸᴸ │ 3 │ │ ᴺᵁᴸᴸ │
|
||||
└────────┴──────────────┴───────┴──────┴────────────────┘
|
||||
```
|
||||
|
||||
使用非默认的表结构:
|
||||
|
||||
```text
|
||||
postgres=# CREATE SCHEMA "nice.schema";
|
||||
|
||||
postgres=# CREATE TABLE "nice.schema"."nice.table" (a integer);
|
||||
|
||||
postgres=# INSERT INTO "nice.schema"."nice.table" SELECT i FROM generate_series(0, 99) as t(i)
|
||||
```
|
||||
|
||||
```sql
|
||||
CREATE TABLE pg_table_schema_with_dots (a UInt32)
|
||||
ENGINE PostgreSQL('localhost:5432', 'clickhouse', 'nice.table', 'postgrsql_user', 'password', 'nice.schema');
|
||||
```
|
||||
|
||||
**另请参阅**
|
||||
|
||||
- [PostgreSQL 表引擎](../../engines/table-engines/integrations/postgresql.md)
|
||||
- [使用 PostgreSQL 作为外部字典的来源](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md#dicts-external_dicts_dict_sources-postgresql)
|
||||
|
||||
[原始文章](https://clickhouse.tech/docs/en/sql-reference/table-functions/postgresql/) <!--hide-->
|
@ -87,18 +87,4 @@ FilterDescription::FilterDescription(const IColumn & column_)
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
}
|
||||
|
||||
|
||||
void checkColumnCanBeUsedAsFilter(const ColumnWithTypeAndName & column_elem)
|
||||
{
|
||||
ConstantFilterDescription const_filter;
|
||||
if (column_elem.column)
|
||||
const_filter = ConstantFilterDescription(*column_elem.column);
|
||||
|
||||
if (!const_filter.always_false && !const_filter.always_true)
|
||||
{
|
||||
auto column = column_elem.column ? column_elem.column : column_elem.type->createColumn();
|
||||
FilterDescription filter(*column);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -32,7 +32,4 @@ struct FilterDescription
|
||||
|
||||
struct ColumnWithTypeAndName;
|
||||
|
||||
/// Will throw an exception if column_elem is cannot be used as a filter column.
|
||||
void checkColumnCanBeUsedAsFilter(const ColumnWithTypeAndName & column_elem);
|
||||
|
||||
}
|
||||
|
@ -555,6 +555,8 @@
|
||||
M(585, CANNOT_PARSE_YAML) \
|
||||
M(586, CANNOT_CREATE_FILE) \
|
||||
M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \
|
||||
M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \
|
||||
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
|
||||
\
|
||||
M(998, POSTGRESQL_CONNECTION_FAILURE) \
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
|
@ -90,6 +90,7 @@ class IColumn;
|
||||
M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
|
||||
\
|
||||
M(Bool, distributed_directory_monitor_batch_inserts, false, "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.", 0) \
|
||||
M(Bool, distributed_directory_monitor_split_batch_on_failure, false, "Should StorageDistributed DirectoryMonitors try to split batch into smaller in case of failures.", 0) \
|
||||
\
|
||||
M(Bool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.", 0) \
|
||||
\
|
||||
|
@ -25,6 +25,7 @@ namespace ErrorCodes
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int FILE_ALREADY_EXISTS;
|
||||
extern const int INCORRECT_QUERY;
|
||||
extern const int ABORTED;
|
||||
}
|
||||
|
||||
class AtomicDatabaseTablesSnapshotIterator final : public DatabaseTablesSnapshotIterator
|
||||
@ -420,7 +421,18 @@ void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has
|
||||
{
|
||||
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
|
||||
if (has_force_restore_data_flag)
|
||||
fs::remove_all(path_to_table_symlinks);
|
||||
{
|
||||
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
|
||||
{
|
||||
if (!fs::is_symlink(table_path))
|
||||
{
|
||||
throw Exception(ErrorCodes::ABORTED,
|
||||
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
|
||||
}
|
||||
|
||||
fs::remove(table_path);
|
||||
}
|
||||
}
|
||||
|
||||
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
|
||||
|
||||
|
@ -25,14 +25,19 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, uint16_t dimensions)
|
||||
static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, uint16_t dimensions, const std::function<void()> & recheck_array)
|
||||
{
|
||||
DataTypePtr res;
|
||||
bool is_array = false;
|
||||
|
||||
/// Get rid of trailing '[]' for arrays
|
||||
if (dimensions)
|
||||
if (type.ends_with("[]"))
|
||||
{
|
||||
is_array = true;
|
||||
|
||||
while (type.ends_with("[]"))
|
||||
type.resize(type.size() - 2);
|
||||
}
|
||||
|
||||
if (type == "smallint")
|
||||
res = std::make_shared<DataTypeInt16>();
|
||||
@ -88,8 +93,24 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui
|
||||
res = std::make_shared<DataTypeString>();
|
||||
if (is_nullable)
|
||||
res = std::make_shared<DataTypeNullable>(res);
|
||||
while (dimensions--)
|
||||
res = std::make_shared<DataTypeArray>(res);
|
||||
|
||||
if (is_array)
|
||||
{
|
||||
/// In some cases att_ndims does not return correct number of dimensions
|
||||
/// (it might return incorrect 0 number, for example, when a postgres table is created via 'as select * from table_with_arrays').
|
||||
/// So recheck all arrays separately afterwards. (Cannot check here on the same connection because another query is in execution).
|
||||
if (!dimensions)
|
||||
{
|
||||
/// Return 1d array type and recheck all arrays dims with array_ndims
|
||||
res = std::make_shared<DataTypeArray>(res);
|
||||
recheck_array();
|
||||
}
|
||||
else
|
||||
{
|
||||
while (dimensions--)
|
||||
res = std::make_shared<DataTypeArray>(res);
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -98,7 +119,7 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui
|
||||
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
|
||||
postgres::ConnectionHolderPtr connection_holder, const String & postgres_table_name, bool use_nulls)
|
||||
{
|
||||
auto columns = NamesAndTypesList();
|
||||
auto columns = NamesAndTypes();
|
||||
|
||||
if (postgres_table_name.find('\'') != std::string::npos
|
||||
|| postgres_table_name.find('\\') != std::string::npos)
|
||||
@ -115,22 +136,46 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
|
||||
"AND NOT attisdropped AND attnum > 0", postgres_table_name);
|
||||
try
|
||||
{
|
||||
pqxx::read_transaction tx(connection_holder->get());
|
||||
auto stream{pqxx::stream_from::query(tx, query)};
|
||||
|
||||
std::tuple<std::string, std::string, std::string, uint16_t> row;
|
||||
while (stream >> row)
|
||||
std::set<size_t> recheck_arrays_indexes;
|
||||
{
|
||||
columns.push_back(NameAndTypePair(
|
||||
std::get<0>(row),
|
||||
convertPostgreSQLDataType(
|
||||
std::get<1>(row),
|
||||
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
|
||||
std::get<3>(row))));
|
||||
pqxx::read_transaction tx(connection_holder->get());
|
||||
auto stream{pqxx::stream_from::query(tx, query)};
|
||||
|
||||
std::tuple<std::string, std::string, std::string, uint16_t> row;
|
||||
size_t i = 0;
|
||||
auto recheck_array = [&]() { recheck_arrays_indexes.insert(i); };
|
||||
while (stream >> row)
|
||||
{
|
||||
auto data_type = convertPostgreSQLDataType(std::get<1>(row),
|
||||
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
|
||||
std::get<3>(row),
|
||||
recheck_array);
|
||||
columns.push_back(NameAndTypePair(std::get<0>(row), data_type));
|
||||
++i;
|
||||
}
|
||||
stream.complete();
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
for (const auto & i : recheck_arrays_indexes)
|
||||
{
|
||||
const auto & name_and_type = columns[i];
|
||||
|
||||
pqxx::nontransaction tx(connection_holder->get());
|
||||
/// All rows must contain the same number of dimensions, so limit 1 is ok. If number of dimensions in all rows is not the same -
|
||||
/// such arrays are not able to be used as ClickHouse Array at all.
|
||||
pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table_name))};
|
||||
auto dimensions = result[0][0].as<int>();
|
||||
|
||||
/// It is always 1d array if it is in recheck.
|
||||
DataTypePtr type = assert_cast<const DataTypeArray *>(name_and_type.type.get())->getNestedType();
|
||||
while (dimensions--)
|
||||
type = std::make_shared<DataTypeArray>(type);
|
||||
|
||||
columns[i] = NameAndTypePair(name_and_type.name, type);
|
||||
}
|
||||
stream.complete();
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
catch (const pqxx::undefined_table &)
|
||||
{
|
||||
throw Exception(fmt::format(
|
||||
@ -146,7 +191,7 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
|
||||
if (columns.empty())
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<NamesAndTypesList>(columns);
|
||||
return std::make_shared<NamesAndTypesList>(NamesAndTypesList(columns.begin(), columns.end()));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -394,7 +394,7 @@ struct ContextSharedPart
|
||||
|
||||
/// Clusters for distributed tables
|
||||
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
|
||||
std::unique_ptr<Clusters> clusters;
|
||||
std::shared_ptr<Clusters> clusters;
|
||||
ConfigurationPtr clusters_config; /// Stores updated configs
|
||||
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
|
||||
|
||||
@ -1882,7 +1882,7 @@ std::optional<UInt16> Context::getTCPPortSecure() const
|
||||
|
||||
std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
|
||||
{
|
||||
auto res = getClusters().getCluster(cluster_name);
|
||||
auto res = getClusters()->getCluster(cluster_name);
|
||||
if (res)
|
||||
return res;
|
||||
|
||||
@ -1896,7 +1896,7 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
|
||||
|
||||
std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
|
||||
{
|
||||
return getClusters().getCluster(cluster_name);
|
||||
return getClusters()->getCluster(cluster_name);
|
||||
}
|
||||
|
||||
|
||||
@ -1911,7 +1911,7 @@ void Context::reloadClusterConfig() const
|
||||
}
|
||||
|
||||
const auto & config = cluster_config ? *cluster_config : getConfigRef();
|
||||
auto new_clusters = std::make_unique<Clusters>(config, settings);
|
||||
auto new_clusters = std::make_shared<Clusters>(config, settings);
|
||||
|
||||
{
|
||||
std::lock_guard lock(shared->clusters_mutex);
|
||||
@ -1927,16 +1927,16 @@ void Context::reloadClusterConfig() const
|
||||
}
|
||||
|
||||
|
||||
Clusters & Context::getClusters() const
|
||||
std::shared_ptr<Clusters> Context::getClusters() const
|
||||
{
|
||||
std::lock_guard lock(shared->clusters_mutex);
|
||||
if (!shared->clusters)
|
||||
{
|
||||
const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
|
||||
shared->clusters = std::make_unique<Clusters>(config, settings);
|
||||
shared->clusters = std::make_shared<Clusters>(config, settings);
|
||||
}
|
||||
|
||||
return *shared->clusters;
|
||||
return shared->clusters;
|
||||
}
|
||||
|
||||
|
||||
|
@ -676,7 +676,7 @@ public:
|
||||
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
|
||||
DDLWorker & getDDLWorker() const;
|
||||
|
||||
Clusters & getClusters() const;
|
||||
std::shared_ptr<Clusters> getClusters() const;
|
||||
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
|
||||
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
|
||||
void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
|
||||
|
@ -953,10 +953,8 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns)
|
||||
{
|
||||
const auto * select_query = getSelectQuery();
|
||||
ActionsDAGPtr prewhere_actions;
|
||||
|
||||
if (!select_query->prewhere())
|
||||
return prewhere_actions;
|
||||
return nullptr;
|
||||
|
||||
Names first_action_names;
|
||||
if (!chain.steps.empty())
|
||||
@ -973,6 +971,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
|
||||
ActionsDAGPtr prewhere_actions;
|
||||
{
|
||||
/// Remove unused source_columns from prewhere actions.
|
||||
auto tmp_actions_dag = std::make_shared<ActionsDAG>(sourceColumns());
|
||||
@ -1038,18 +1037,6 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
return prewhere_actions;
|
||||
}
|
||||
|
||||
void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsChain & chain, ActionsDAGPtr actions_dag, String column_name)
|
||||
{
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
|
||||
// FIXME: assert(filter_info);
|
||||
auto * expression_step = typeid_cast<ExpressionActionsChain::ExpressionActionsStep *>(&step);
|
||||
expression_step->actions_dag = std::move(actions_dag);
|
||||
step.addRequiredOutput(column_name);
|
||||
|
||||
chain.addStep();
|
||||
}
|
||||
|
||||
bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types)
|
||||
{
|
||||
const auto * select_query = getSelectQuery();
|
||||
|
@ -357,8 +357,6 @@ private:
|
||||
ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, ActionsDAGPtr & before_array_join, bool only_types);
|
||||
bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types);
|
||||
JoinPtr appendJoin(ExpressionActionsChain & chain);
|
||||
/// Add preliminary rows filtration. Actions are created in other expression analyzer to prevent any possible alias injection.
|
||||
void appendPreliminaryFilter(ExpressionActionsChain & chain, ActionsDAGPtr actions_dag, String column_name);
|
||||
/// remove_filter is set in ExpressionActionsChain::finalize();
|
||||
/// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier).
|
||||
ActionsDAGPtr appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);
|
||||
|
@ -52,6 +52,15 @@ namespace ErrorCodes
|
||||
extern const int ATTEMPT_TO_READ_AFTER_EOF;
|
||||
extern const int EMPTY_DATA_PASSED;
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int MEMORY_LIMIT_EXCEEDED;
|
||||
extern const int DISTRIBUTED_BROKEN_BATCH_INFO;
|
||||
extern const int DISTRIBUTED_BROKEN_BATCH_FILES;
|
||||
extern const int TOO_MANY_PARTS;
|
||||
extern const int TOO_MANY_BYTES;
|
||||
extern const int TOO_MANY_ROWS_OR_BYTES;
|
||||
extern const int TOO_MANY_PARTITIONS;
|
||||
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
}
|
||||
|
||||
|
||||
@ -227,9 +236,27 @@ namespace
|
||||
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|
||||
|| code == ErrorCodes::UNKNOWN_CODEC
|
||||
|| code == ErrorCodes::CANNOT_DECOMPRESS
|
||||
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO
|
||||
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES
|
||||
|| (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
|
||||
}
|
||||
|
||||
/// Can the batch be split and send files from batch one-by-one instead?
|
||||
bool isSplittableErrorCode(int code, bool remote)
|
||||
{
|
||||
return code == ErrorCodes::MEMORY_LIMIT_EXCEEDED
|
||||
/// FunctionRange::max_elements and similar
|
||||
|| code == ErrorCodes::ARGUMENT_OUT_OF_BOUND
|
||||
|| code == ErrorCodes::TOO_MANY_PARTS
|
||||
|| code == ErrorCodes::TOO_MANY_BYTES
|
||||
|| code == ErrorCodes::TOO_MANY_ROWS_OR_BYTES
|
||||
|| code == ErrorCodes::TOO_MANY_PARTITIONS
|
||||
|| code == ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES
|
||||
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO
|
||||
|| isFileBrokenErrorCode(code, remote)
|
||||
;
|
||||
}
|
||||
|
||||
SyncGuardPtr getDirectorySyncGuard(bool dir_fsync, const DiskPtr & disk, const String & path)
|
||||
{
|
||||
if (dir_fsync)
|
||||
@ -319,6 +346,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
|
||||
, relative_path(relative_path_)
|
||||
, path(fs::path(disk->getPath()) / relative_path / "")
|
||||
, should_batch_inserts(storage.getContext()->getSettingsRef().distributed_directory_monitor_batch_inserts)
|
||||
, split_batch_on_failure(storage.getContext()->getSettingsRef().distributed_directory_monitor_split_batch_on_failure)
|
||||
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
|
||||
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
|
||||
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
|
||||
@ -642,6 +670,7 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
StorageDistributedDirectoryMonitor & parent;
|
||||
const std::map<UInt64, String> & file_index_to_path;
|
||||
|
||||
bool split_batch_on_failure = true;
|
||||
bool fsync = false;
|
||||
bool dir_fsync = false;
|
||||
|
||||
@ -650,6 +679,7 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
const std::map<UInt64, String> & file_index_to_path_)
|
||||
: parent(parent_)
|
||||
, file_index_to_path(file_index_to_path_)
|
||||
, split_batch_on_failure(parent.split_batch_on_failure)
|
||||
, fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert)
|
||||
, dir_fsync(parent.dir_fsync)
|
||||
{}
|
||||
@ -703,37 +733,23 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
auto connection = parent.pool->get(timeouts);
|
||||
|
||||
bool batch_broken = false;
|
||||
bool batch_marked_as_broken = false;
|
||||
try
|
||||
{
|
||||
std::unique_ptr<RemoteBlockOutputStream> remote;
|
||||
|
||||
for (UInt64 file_idx : file_indices)
|
||||
try
|
||||
{
|
||||
auto file_path = file_index_to_path.find(file_idx);
|
||||
if (file_path == file_index_to_path.end())
|
||||
{
|
||||
LOG_ERROR(parent.log, "Failed to send batch: file with index {} is absent", file_idx);
|
||||
batch_broken = true;
|
||||
break;
|
||||
}
|
||||
|
||||
ReadBufferFromFile in(file_path->second);
|
||||
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
||||
|
||||
if (!remote)
|
||||
{
|
||||
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
|
||||
distributed_header.insert_query,
|
||||
distributed_header.insert_settings,
|
||||
distributed_header.client_info);
|
||||
remote->writePrefix();
|
||||
}
|
||||
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
||||
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
|
||||
sendBatch(*connection, timeouts);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (split_batch_on_failure && isSplittableErrorCode(e.code(), e.isRemoteException()))
|
||||
{
|
||||
tryLogCurrentException(parent.log, "Trying to split batch due to");
|
||||
sendSeparateFiles(*connection, timeouts);
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
if (remote)
|
||||
remote->writeSuffix();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -741,6 +757,8 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
{
|
||||
tryLogCurrentException(parent.log, "Failed to send batch due to");
|
||||
batch_broken = true;
|
||||
if (!e.isRemoteException() && e.code() == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES)
|
||||
batch_marked_as_broken = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -761,7 +779,7 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
for (UInt64 file_index : file_indices)
|
||||
parent.markAsSend(file_index_to_path.at(file_index));
|
||||
}
|
||||
else
|
||||
else if (!batch_marked_as_broken)
|
||||
{
|
||||
LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
|
||||
|
||||
@ -797,6 +815,78 @@ struct StorageDistributedDirectoryMonitor::Batch
|
||||
}
|
||||
recovered = true;
|
||||
}
|
||||
|
||||
private:
|
||||
void sendBatch(Connection & connection, const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
std::unique_ptr<RemoteBlockOutputStream> remote;
|
||||
|
||||
for (UInt64 file_idx : file_indices)
|
||||
{
|
||||
auto file_path = file_index_to_path.find(file_idx);
|
||||
if (file_path == file_index_to_path.end())
|
||||
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
|
||||
"Failed to send batch: file with index {} is absent", file_idx);
|
||||
|
||||
ReadBufferFromFile in(file_path->second);
|
||||
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
||||
|
||||
if (!remote)
|
||||
{
|
||||
remote = std::make_unique<RemoteBlockOutputStream>(connection, timeouts,
|
||||
distributed_header.insert_query,
|
||||
distributed_header.insert_settings,
|
||||
distributed_header.client_info);
|
||||
remote->writePrefix();
|
||||
}
|
||||
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
|
||||
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
|
||||
}
|
||||
|
||||
if (remote)
|
||||
remote->writeSuffix();
|
||||
}
|
||||
|
||||
void sendSeparateFiles(Connection & connection, const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
size_t broken_files = 0;
|
||||
|
||||
for (UInt64 file_idx : file_indices)
|
||||
{
|
||||
auto file_path = file_index_to_path.find(file_idx);
|
||||
if (file_path == file_index_to_path.end())
|
||||
{
|
||||
LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx);
|
||||
++broken_files;
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
ReadBufferFromFile in(file_path->second);
|
||||
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
||||
|
||||
RemoteBlockOutputStream remote(connection, timeouts,
|
||||
distributed_header.insert_query,
|
||||
distributed_header.insert_settings,
|
||||
distributed_header.client_info);
|
||||
remote.writePrefix();
|
||||
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
|
||||
writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);
|
||||
remote.writeSuffix();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage(fmt::format("While sending {}", file_path->second));
|
||||
parent.maybeMarkAsBroken(file_path->second, e);
|
||||
++broken_files;
|
||||
}
|
||||
}
|
||||
|
||||
if (broken_files)
|
||||
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES,
|
||||
"Failed to send {} files", broken_files);
|
||||
}
|
||||
};
|
||||
|
||||
class DirectoryMonitorBlockInputStream : public IBlockInputStream
|
||||
|
@ -92,6 +92,7 @@ private:
|
||||
std::string path;
|
||||
|
||||
const bool should_batch_inserts = false;
|
||||
const bool split_batch_on_failure = true;
|
||||
const bool dir_fsync = false;
|
||||
const size_t min_batched_block_size_rows = 0;
|
||||
const size_t min_batched_block_size_bytes = 0;
|
||||
|
@ -752,7 +752,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
|
||||
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
|
||||
for (const auto & dir_name : dir_names)
|
||||
{
|
||||
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
|
||||
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
|
||||
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,6 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int INCORRECT_QUERY;
|
||||
extern const int TABLE_WAS_NOT_DROPPED;
|
||||
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
|
||||
@ -82,9 +81,8 @@ static StorageID extractDependentTable(ASTPtr & query, ContextPtr context, const
|
||||
{
|
||||
auto * ast_select = subquery->as<ASTSelectWithUnionQuery>();
|
||||
if (!ast_select)
|
||||
throw Exception("Logical error while creating StorageLiveView."
|
||||
" Could not retrieve table name from select query.",
|
||||
DB::ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception("LIVE VIEWs are only supported for queries from tables, but there is no table name in select query.",
|
||||
DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
|
||||
if (ast_select->list_of_selects->children.size() != 1)
|
||||
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
|
||||
|
||||
|
@ -25,7 +25,8 @@ IBackgroundJobExecutor::IBackgroundJobExecutor(
|
||||
{
|
||||
for (const auto & pool_config : pools_configs_)
|
||||
{
|
||||
pools.try_emplace(pool_config.pool_type, pool_config.max_pool_size, 0, pool_config.max_pool_size, false);
|
||||
const auto max_pool_size = pool_config.get_max_pool_size();
|
||||
pools.try_emplace(pool_config.pool_type, max_pool_size, 0, max_pool_size, false);
|
||||
pools_configs.emplace(pool_config.pool_type, pool_config);
|
||||
}
|
||||
}
|
||||
@ -82,67 +83,64 @@ bool incrementMetricIfLessThanMax(std::atomic<Int64> & atomic_value, Int64 max_v
|
||||
|
||||
}
|
||||
|
||||
void IBackgroundJobExecutor::jobExecutingTask()
|
||||
void IBackgroundJobExecutor::execute(JobAndPool job_and_pool)
|
||||
try
|
||||
{
|
||||
auto job_and_pool = getBackgroundJob();
|
||||
if (job_and_pool) /// If we have job, then try to assign into background pool
|
||||
auto & pool_config = pools_configs[job_and_pool.pool_type];
|
||||
const auto max_pool_size = pool_config.get_max_pool_size();
|
||||
|
||||
/// If corresponding pool is not full increment metric and assign new job
|
||||
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size))
|
||||
{
|
||||
auto & pool_config = pools_configs[job_and_pool->pool_type];
|
||||
/// If corresponding pool is not full increment metric and assign new job
|
||||
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], pool_config.max_pool_size))
|
||||
try /// this try required because we have to manually decrement metric
|
||||
{
|
||||
try /// this try required because we have to manually decrement metric
|
||||
/// Synchronize pool size, because config could be reloaded
|
||||
pools[job_and_pool.pool_type].setMaxThreads(max_pool_size);
|
||||
pools[job_and_pool.pool_type].setQueueSize(max_pool_size);
|
||||
|
||||
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool.job)}] ()
|
||||
{
|
||||
pools[job_and_pool->pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool->job)}] ()
|
||||
try /// We don't want exceptions in background pool
|
||||
{
|
||||
try /// We don't want exceptions in background pool
|
||||
bool job_success = job();
|
||||
/// Job done, decrement metric and reset no_work counter
|
||||
CurrentMetrics::values[pool_config.tasks_metric]--;
|
||||
|
||||
if (job_success)
|
||||
{
|
||||
bool job_success = job();
|
||||
/// Job done, decrement metric and reset no_work counter
|
||||
CurrentMetrics::values[pool_config.tasks_metric]--;
|
||||
|
||||
if (job_success)
|
||||
{
|
||||
/// Job done, new empty space in pool, schedule background task
|
||||
runTaskWithoutDelay();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Job done, but failed, schedule with backoff
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
|
||||
/// Job done, new empty space in pool, schedule background task
|
||||
runTaskWithoutDelay();
|
||||
}
|
||||
catch (...)
|
||||
else
|
||||
{
|
||||
CurrentMetrics::values[pool_config.tasks_metric]--;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
/// Job done, but failed, schedule with backoff
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
});
|
||||
/// We've scheduled task in the background pool and when it will finish we will be triggered again. But this task can be
|
||||
/// extremely long and we may have a lot of other small tasks to do, so we schedule ourselves here.
|
||||
runTaskWithoutDelay();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
|
||||
CurrentMetrics::values[pool_config.tasks_metric]--;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
}
|
||||
else /// Pool is full and we have some work to do
|
||||
{
|
||||
scheduleTask(/* with_backoff = */ false);
|
||||
}
|
||||
}
|
||||
else /// Nothing to do, no jobs
|
||||
{
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
CurrentMetrics::values[pool_config.tasks_metric]--;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
});
|
||||
/// We've scheduled task in the background pool and when it will finish we will be triggered again. But this task can be
|
||||
/// extremely long and we may have a lot of other small tasks to do, so we schedule ourselves here.
|
||||
runTaskWithoutDelay();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
|
||||
CurrentMetrics::values[pool_config.tasks_metric]--;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
}
|
||||
else /// Pool is full and we have some work to do
|
||||
{
|
||||
scheduleTask(/* with_backoff = */ false);
|
||||
}
|
||||
}
|
||||
catch (...) /// Exception while we looking for a task, reschedule
|
||||
{
|
||||
@ -156,7 +154,7 @@ void IBackgroundJobExecutor::start()
|
||||
if (!scheduling_task)
|
||||
{
|
||||
scheduling_task = getContext()->getSchedulePool().createTask(
|
||||
getBackgroundTaskName(), [this]{ jobExecutingTask(); });
|
||||
getBackgroundTaskName(), [this]{ backgroundTaskFunction(); });
|
||||
}
|
||||
|
||||
scheduling_task->activateAndSchedule();
|
||||
@ -180,6 +178,12 @@ void IBackgroundJobExecutor::triggerTask()
|
||||
scheduling_task->schedule();
|
||||
}
|
||||
|
||||
void IBackgroundJobExecutor::backgroundTaskFunction()
|
||||
{
|
||||
if (!scheduleJob())
|
||||
scheduleTask(/* with_backoff = */ true);
|
||||
}
|
||||
|
||||
IBackgroundJobExecutor::~IBackgroundJobExecutor()
|
||||
{
|
||||
finish();
|
||||
@ -191,8 +195,19 @@ BackgroundJobsExecutor::BackgroundJobsExecutor(
|
||||
: IBackgroundJobExecutor(
|
||||
global_context_,
|
||||
global_context_->getBackgroundProcessingTaskSchedulingSettings(),
|
||||
{PoolConfig{PoolType::MERGE_MUTATE, global_context_->getSettingsRef().background_pool_size, CurrentMetrics::BackgroundPoolTask},
|
||||
PoolConfig{PoolType::FETCH, global_context_->getSettingsRef().background_fetches_pool_size, CurrentMetrics::BackgroundFetchesPoolTask}})
|
||||
{PoolConfig
|
||||
{
|
||||
.pool_type = PoolType::MERGE_MUTATE,
|
||||
.get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_pool_size; },
|
||||
.tasks_metric = CurrentMetrics::BackgroundPoolTask
|
||||
},
|
||||
PoolConfig
|
||||
{
|
||||
.pool_type = PoolType::FETCH,
|
||||
.get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_fetches_pool_size; },
|
||||
.tasks_metric = CurrentMetrics::BackgroundFetchesPoolTask
|
||||
}
|
||||
})
|
||||
, data(data_)
|
||||
{
|
||||
}
|
||||
@ -202,9 +217,9 @@ String BackgroundJobsExecutor::getBackgroundTaskName() const
|
||||
return data.getStorageID().getFullTableName() + " (dataProcessingTask)";
|
||||
}
|
||||
|
||||
std::optional<JobAndPool> BackgroundJobsExecutor::getBackgroundJob()
|
||||
bool BackgroundJobsExecutor::scheduleJob()
|
||||
{
|
||||
return data.getDataProcessingJob();
|
||||
return data.scheduleDataProcessingJob(*this);
|
||||
}
|
||||
|
||||
BackgroundMovesExecutor::BackgroundMovesExecutor(
|
||||
@ -213,7 +228,13 @@ BackgroundMovesExecutor::BackgroundMovesExecutor(
|
||||
: IBackgroundJobExecutor(
|
||||
global_context_,
|
||||
global_context_->getBackgroundMoveTaskSchedulingSettings(),
|
||||
{PoolConfig{PoolType::MOVE, global_context_->getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}})
|
||||
{PoolConfig
|
||||
{
|
||||
.pool_type = PoolType::MOVE,
|
||||
.get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_move_pool_size; },
|
||||
.tasks_metric = CurrentMetrics::BackgroundMovePoolTask
|
||||
}
|
||||
})
|
||||
, data(data_)
|
||||
{
|
||||
}
|
||||
@ -223,9 +244,9 @@ String BackgroundMovesExecutor::getBackgroundTaskName() const
|
||||
return data.getStorageID().getFullTableName() + " (dataMovingTask)";
|
||||
}
|
||||
|
||||
std::optional<JobAndPool> BackgroundMovesExecutor::getBackgroundJob()
|
||||
bool BackgroundMovesExecutor::scheduleJob()
|
||||
{
|
||||
return data.getDataMovingJob();
|
||||
return data.scheduleDataMovingJob(*this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ protected:
|
||||
/// This pool type
|
||||
PoolType pool_type;
|
||||
/// Max pool size in threads
|
||||
size_t max_pool_size;
|
||||
const std::function<size_t()> get_max_pool_size;
|
||||
/// Metric that we have to increment when we execute task in this pool
|
||||
CurrentMetrics::Metric tasks_metric;
|
||||
};
|
||||
@ -99,6 +99,9 @@ public:
|
||||
/// Finish execution: deactivate background task and wait already scheduled jobs
|
||||
void finish();
|
||||
|
||||
/// Executes job in a nested pool
|
||||
void execute(JobAndPool job_and_pool);
|
||||
|
||||
/// Just call finish
|
||||
virtual ~IBackgroundJobExecutor();
|
||||
|
||||
@ -110,12 +113,13 @@ protected:
|
||||
|
||||
/// Name for task in background schedule pool
|
||||
virtual String getBackgroundTaskName() const = 0;
|
||||
/// Get job for background execution
|
||||
virtual std::optional<JobAndPool> getBackgroundJob() = 0;
|
||||
|
||||
/// Schedules a job in a nested pool in this class.
|
||||
virtual bool scheduleJob() = 0;
|
||||
|
||||
private:
|
||||
/// Function that executes in background scheduling pool
|
||||
void jobExecutingTask();
|
||||
void backgroundTaskFunction();
|
||||
/// Recalculate timeouts when we have to check for a new job
|
||||
void scheduleTask(bool with_backoff);
|
||||
/// Run background task as fast as possible and reset errors counter
|
||||
@ -136,7 +140,7 @@ public:
|
||||
|
||||
protected:
|
||||
String getBackgroundTaskName() const override;
|
||||
std::optional<JobAndPool> getBackgroundJob() override;
|
||||
bool scheduleJob() override;
|
||||
};
|
||||
|
||||
/// Move jobs executor, move parts between disks in the background
|
||||
@ -152,7 +156,7 @@ public:
|
||||
|
||||
protected:
|
||||
String getBackgroundTaskName() const override;
|
||||
std::optional<JobAndPool> getBackgroundJob() override;
|
||||
bool scheduleJob() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
@ -430,8 +431,14 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
|
||||
block.erase(prewhere_info->prewhere_column_name);
|
||||
else
|
||||
{
|
||||
auto & ctn = block.getByName(prewhere_info->prewhere_column_name);
|
||||
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
|
||||
WhichDataType which(removeNullable(recursiveRemoveLowCardinality(prewhere_column.type)));
|
||||
if (which.isInt() || which.isUInt())
|
||||
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
|
||||
else if (which.isFloat())
|
||||
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1.0f)->convertToFullColumnIfConst();
|
||||
else
|
||||
throw Exception("Illegal type " + prewhere_column.type->getName() + " of column for filter.",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4624,19 +4624,20 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<JobAndPool> MergeTreeData::getDataMovingJob()
|
||||
bool MergeTreeData::scheduleDataMovingJob(IBackgroundJobExecutor & executor)
|
||||
{
|
||||
if (parts_mover.moves_blocker.isCancelled())
|
||||
return {};
|
||||
return false;
|
||||
|
||||
auto moving_tagger = selectPartsForMove();
|
||||
if (moving_tagger->parts_to_move.empty())
|
||||
return {};
|
||||
return false;
|
||||
|
||||
return JobAndPool{[this, moving_tagger] () mutable
|
||||
executor.execute({[this, moving_tagger] () mutable
|
||||
{
|
||||
return moveParts(moving_tagger);
|
||||
}, PoolType::MOVE};
|
||||
}, PoolType::MOVE});
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MergeTreeData::areBackgroundMovesNeeded() const
|
||||
|
@ -57,6 +57,7 @@ class ExpressionActions;
|
||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
|
||||
class MergeTreeDeduplicationLog;
|
||||
class IBackgroundJobExecutor;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
@ -807,10 +808,10 @@ public:
|
||||
|
||||
PinnedPartUUIDsPtr getPinnedPartUUIDs() const;
|
||||
|
||||
/// Return main processing background job, like merge/mutate/fetch and so on
|
||||
virtual std::optional<JobAndPool> getDataProcessingJob() = 0;
|
||||
/// Return job to move parts between disks/volumes and so on.
|
||||
std::optional<JobAndPool> getDataMovingJob();
|
||||
/// Schedules background job to like merge/mutate/fetch an executor
|
||||
virtual bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) = 0;
|
||||
/// Schedules job to move parts between disks/volumes and so on.
|
||||
bool scheduleDataMovingJob(IBackgroundJobExecutor & executor);
|
||||
bool areBackgroundMovesNeeded() const;
|
||||
|
||||
/// Lock part in zookeeper for use common S3 data in several nodes
|
||||
|
@ -800,12 +800,33 @@ void StorageDistributed::startup()
|
||||
if (!storage_policy)
|
||||
return;
|
||||
|
||||
for (const DiskPtr & disk : data_volume->getDisks())
|
||||
createDirectoryMonitors(disk);
|
||||
const auto & disks = data_volume->getDisks();
|
||||
|
||||
for (const String & path : getDataPaths())
|
||||
/// Make initialization for large number of disks parallel.
|
||||
ThreadPool pool(disks.size());
|
||||
|
||||
for (const DiskPtr & disk : disks)
|
||||
{
|
||||
pool.scheduleOrThrowOnError([&]()
|
||||
{
|
||||
createDirectoryMonitors(disk);
|
||||
});
|
||||
}
|
||||
pool.wait();
|
||||
|
||||
const auto & paths = getDataPaths();
|
||||
std::vector<UInt64> last_increment(paths.size());
|
||||
for (size_t i = 0; i < paths.size(); ++i)
|
||||
{
|
||||
pool.scheduleOrThrowOnError([&, i]()
|
||||
{
|
||||
last_increment[i] = getMaximumFileNumber(paths[i]);
|
||||
});
|
||||
}
|
||||
pool.wait();
|
||||
|
||||
for (const auto inc : last_increment)
|
||||
{
|
||||
UInt64 inc = getMaximumFileNumber(path);
|
||||
if (inc > file_names_increment.value)
|
||||
file_names_increment.value.store(inc);
|
||||
}
|
||||
@ -907,30 +928,50 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
|
||||
}
|
||||
else
|
||||
{
|
||||
requireDirectoryMonitor(disk, dir_path.filename().string());
|
||||
requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name)
|
||||
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup)
|
||||
{
|
||||
const std::string & disk_path = disk->getPath();
|
||||
const std::string key(disk_path + name);
|
||||
|
||||
std::lock_guard lock(cluster_nodes_mutex);
|
||||
auto & node_data = cluster_nodes_data[key];
|
||||
if (!node_data.directory_monitor)
|
||||
auto create_node_data = [&]()
|
||||
{
|
||||
node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
|
||||
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
|
||||
ClusterNodeData data;
|
||||
data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
|
||||
data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
|
||||
*this, disk, relative_data_path + name,
|
||||
node_data.connection_pool,
|
||||
data.connection_pool,
|
||||
monitors_blocker,
|
||||
getContext()->getDistributedSchedulePool());
|
||||
return data;
|
||||
};
|
||||
|
||||
/// In case of startup the lock can be acquired later.
|
||||
if (startup)
|
||||
{
|
||||
auto tmp_node_data = create_node_data();
|
||||
std::lock_guard lock(cluster_nodes_mutex);
|
||||
auto & node_data = cluster_nodes_data[key];
|
||||
assert(!node_data.directory_monitor);
|
||||
node_data = std::move(tmp_node_data);
|
||||
return *node_data.directory_monitor;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::lock_guard lock(cluster_nodes_mutex);
|
||||
auto & node_data = cluster_nodes_data[key];
|
||||
if (!node_data.directory_monitor)
|
||||
{
|
||||
node_data = create_node_data();
|
||||
}
|
||||
return *node_data.directory_monitor;
|
||||
}
|
||||
return *node_data.directory_monitor;
|
||||
}
|
||||
|
||||
std::vector<StorageDistributedDirectoryMonitor::Status> StorageDistributed::getDirectoryMonitorsStatuses() const
|
||||
|
@ -160,7 +160,7 @@ private:
|
||||
/// create directory monitors for each existing subdirectory
|
||||
void createDirectoryMonitors(const DiskPtr & disk);
|
||||
/// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name
|
||||
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name);
|
||||
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup);
|
||||
|
||||
/// Return list of metrics for all created monitors
|
||||
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)
|
||||
|
@ -1001,13 +1001,13 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
|
||||
return true;
|
||||
}
|
||||
|
||||
std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
|
||||
bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor) //-V657
|
||||
{
|
||||
if (shutdown_called)
|
||||
return {};
|
||||
return false;
|
||||
|
||||
if (merger_mutator.merges_blocker.isCancelled())
|
||||
return {};
|
||||
return false;
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
|
||||
@ -1017,21 +1017,25 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
|
||||
if (!merge_entry)
|
||||
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
|
||||
|
||||
if (merge_entry || mutate_entry)
|
||||
if (merge_entry)
|
||||
{
|
||||
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
|
||||
executor.execute({[this, metadata_snapshot, merge_entry, share_lock] () mutable
|
||||
{
|
||||
if (merge_entry)
|
||||
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
|
||||
else if (mutate_entry)
|
||||
return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
|
||||
|
||||
__builtin_unreachable();
|
||||
}, PoolType::MERGE_MUTATE};
|
||||
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
|
||||
}, PoolType::MERGE_MUTATE});
|
||||
return true;
|
||||
}
|
||||
if (mutate_entry)
|
||||
{
|
||||
executor.execute({[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
|
||||
{
|
||||
return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
|
||||
}, PoolType::MERGE_MUTATE});
|
||||
return true;
|
||||
}
|
||||
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
|
||||
{
|
||||
return JobAndPool{[this, share_lock] ()
|
||||
executor.execute({[this, share_lock] ()
|
||||
{
|
||||
/// All use relative_data_path which changes during rename
|
||||
/// so execute under share lock.
|
||||
@ -1041,9 +1045,10 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
|
||||
clearOldMutations();
|
||||
clearEmptyParts();
|
||||
return true;
|
||||
}, PoolType::MERGE_MUTATE};
|
||||
}, PoolType::MERGE_MUTATE});
|
||||
return true;
|
||||
}
|
||||
return {};
|
||||
return false;
|
||||
}
|
||||
|
||||
Int64 StorageMergeTree::getCurrentMutationVersion(
|
||||
|
@ -94,7 +94,7 @@ public:
|
||||
|
||||
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
|
||||
|
||||
std::optional<JobAndPool> getDataProcessingJob() override;
|
||||
bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
|
||||
|
||||
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
|
||||
private:
|
||||
|
@ -562,6 +562,14 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
|
||||
throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id);
|
||||
}
|
||||
|
||||
if (partial_shutdown_called)
|
||||
throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",
|
||||
ErrorCodes::UNFINISHED);
|
||||
|
||||
/// Replica inactive, don't check mutation status
|
||||
if (!inactive_replicas.empty() && inactive_replicas.count(replica))
|
||||
continue;
|
||||
|
||||
/// At least we have our current mutation
|
||||
std::set<String> mutation_ids;
|
||||
mutation_ids.insert(mutation_id);
|
||||
@ -570,10 +578,6 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
|
||||
/// they will happen on each replica, so we can check only in-memory info.
|
||||
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids);
|
||||
checkMutationStatus(mutation_status, mutation_ids);
|
||||
|
||||
if (partial_shutdown_called)
|
||||
throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",
|
||||
ErrorCodes::UNFINISHED);
|
||||
}
|
||||
|
||||
if (!inactive_replicas.empty())
|
||||
@ -3161,30 +3165,35 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
|
||||
});
|
||||
}
|
||||
|
||||
std::optional<JobAndPool> StorageReplicatedMergeTree::getDataProcessingJob()
|
||||
bool StorageReplicatedMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor)
|
||||
{
|
||||
/// If replication queue is stopped exit immediately as we successfully executed the task
|
||||
if (queue.actions_blocker.isCancelled())
|
||||
return {};
|
||||
return false;
|
||||
|
||||
/// This object will mark the element of the queue as running.
|
||||
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();
|
||||
|
||||
if (!selected_entry)
|
||||
return {};
|
||||
|
||||
PoolType pool_type;
|
||||
return false;
|
||||
|
||||
/// Depending on entry type execute in fetches (small) pool or big merge_mutate pool
|
||||
if (selected_entry->log_entry->type == LogEntry::GET_PART)
|
||||
pool_type = PoolType::FETCH;
|
||||
else
|
||||
pool_type = PoolType::MERGE_MUTATE;
|
||||
|
||||
return JobAndPool{[this, selected_entry] () mutable
|
||||
{
|
||||
return processQueueEntry(selected_entry);
|
||||
}, pool_type};
|
||||
executor.execute({[this, selected_entry] () mutable
|
||||
{
|
||||
return processQueueEntry(selected_entry);
|
||||
}, PoolType::FETCH});
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
executor.execute({[this, selected_entry] () mutable
|
||||
{
|
||||
return processQueueEntry(selected_entry);
|
||||
}, PoolType::MERGE_MUTATE});
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -215,8 +215,8 @@ public:
|
||||
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
|
||||
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
|
||||
|
||||
/// Get job to execute in background pool (merge, mutate, drop range and so on)
|
||||
std::optional<JobAndPool> getDataProcessingJob() override;
|
||||
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
|
||||
bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
|
||||
|
||||
/// Checks that fetches are not disabled with action blocker and pool for fetches
|
||||
/// is not overloaded
|
||||
|
@ -31,7 +31,7 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
|
||||
|
||||
void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
{
|
||||
for (const auto & name_and_cluster : context->getClusters().getContainer())
|
||||
for (const auto & name_and_cluster : context->getClusters()->getContainer())
|
||||
writeCluster(res_columns, name_and_cluster);
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
|
@ -14,6 +14,7 @@ const char * auto_contributors[] {
|
||||
"achulkov2",
|
||||
"adevyatova",
|
||||
"ageraab",
|
||||
"Ahmed Dardery",
|
||||
"akazz",
|
||||
"Akazz",
|
||||
"akonyaev",
|
||||
@ -81,8 +82,10 @@ const char * auto_contributors[] {
|
||||
"Aliaksandr Pliutau",
|
||||
"Aliaksandr Shylau",
|
||||
"Ali Demirci",
|
||||
"Alina Terekhova",
|
||||
"amesaru",
|
||||
"Amesaru",
|
||||
"Amir Vaza",
|
||||
"Amos Bird",
|
||||
"amoschen",
|
||||
"amudong",
|
||||
@ -153,6 +156,7 @@ const char * auto_contributors[] {
|
||||
"Artur Beglaryan",
|
||||
"AsiaKorushkina",
|
||||
"asiana21",
|
||||
"atereh",
|
||||
"Atri Sharma",
|
||||
"avasiliev",
|
||||
"avogar",
|
||||
@ -182,6 +186,7 @@ const char * auto_contributors[] {
|
||||
"Bogdan Voronin",
|
||||
"BohuTANG",
|
||||
"Bolinov",
|
||||
"BoloniniD",
|
||||
"booknouse",
|
||||
"Boris Granveaud",
|
||||
"Bowen Masco",
|
||||
@ -208,9 +213,11 @@ const char * auto_contributors[] {
|
||||
"Chienlung Cheung",
|
||||
"chou.fan",
|
||||
"Christian",
|
||||
"christophe.kalenzaga",
|
||||
"Ciprian Hacman",
|
||||
"Clement Rodriguez",
|
||||
"Clément Rodriguez",
|
||||
"cn-ds",
|
||||
"Colum",
|
||||
"comunodi",
|
||||
"Constantin S. Pan",
|
||||
@ -264,9 +271,11 @@ const char * auto_contributors[] {
|
||||
"Dmitry Luhtionov",
|
||||
"Dmitry Moskowski",
|
||||
"Dmitry Muzyka",
|
||||
"Dmitry Novik",
|
||||
"Dmitry Petukhov",
|
||||
"Dmitry Rubashkin",
|
||||
"Dmitry S..ky / skype: dvska-at-skype",
|
||||
"Dmitry Ukolov",
|
||||
"Doge",
|
||||
"Dongdong Yang",
|
||||
"DoomzD",
|
||||
@ -322,6 +331,7 @@ const char * auto_contributors[] {
|
||||
"fenglv",
|
||||
"fessmage",
|
||||
"FgoDt",
|
||||
"fibersel",
|
||||
"filimonov",
|
||||
"filipe",
|
||||
"Filipe Caixeta",
|
||||
@ -362,6 +372,7 @@ const char * auto_contributors[] {
|
||||
"Grigory Pervakov",
|
||||
"Guillaume Tassery",
|
||||
"guoleiyi",
|
||||
"Guo Wei (William)",
|
||||
"gyuton",
|
||||
"Haavard Kvaalen",
|
||||
"Habibullah Oladepo",
|
||||
@ -465,8 +476,10 @@ const char * auto_contributors[] {
|
||||
"Konstantin Lebedev",
|
||||
"Konstantin Malanchev",
|
||||
"Konstantin Podshumok",
|
||||
"Korenevskiy Denis",
|
||||
"Korviakov Andrey",
|
||||
"koshachy",
|
||||
"Kostiantyn Storozhuk",
|
||||
"Kozlov Ivan",
|
||||
"kreuzerkrieg",
|
||||
"Kruglov Pavel",
|
||||
@ -537,6 +550,7 @@ const char * auto_contributors[] {
|
||||
"Matwey V. Kornilov",
|
||||
"Max",
|
||||
"Max Akhmedov",
|
||||
"Max Bruce",
|
||||
"maxim",
|
||||
"Maxim Akhmedov",
|
||||
"MaximAL",
|
||||
@ -560,6 +574,7 @@ const char * auto_contributors[] {
|
||||
"melin",
|
||||
"memo",
|
||||
"meo",
|
||||
"meoww-bot",
|
||||
"mergify[bot]",
|
||||
"Metehan Çetinkaya",
|
||||
"Metikov Vadim",
|
||||
@ -581,6 +596,7 @@ const char * auto_contributors[] {
|
||||
"Mike Kot",
|
||||
"mikepop7",
|
||||
"Mikhail",
|
||||
"Mikhail Andreev",
|
||||
"Mikhail Cheshkov",
|
||||
"Mikhail Fandyushin",
|
||||
"Mikhail Filimonov",
|
||||
@ -601,6 +617,7 @@ const char * auto_contributors[] {
|
||||
"MovElb",
|
||||
"Mr.General",
|
||||
"Murat Kabilov",
|
||||
"muzzlerator",
|
||||
"m-ves",
|
||||
"mwish",
|
||||
"MyroTk",
|
||||
@ -614,6 +631,7 @@ const char * auto_contributors[] {
|
||||
"never lee",
|
||||
"NeZeD [Mac Pro]",
|
||||
"nicelulu",
|
||||
"Nickita",
|
||||
"Nickolay Yastrebov",
|
||||
"Nicolae Vartolomei",
|
||||
"Nico Mandery",
|
||||
@ -635,6 +653,7 @@ const char * auto_contributors[] {
|
||||
"Nikolay Degterinsky",
|
||||
"Nikolay Kirsh",
|
||||
"Nikolay Semyachkin",
|
||||
"Nikolay Shcheglov",
|
||||
"Nikolay Vasiliev",
|
||||
"Nikolay Volosatov",
|
||||
"Niu Zhaojie",
|
||||
@ -647,6 +666,7 @@ const char * auto_contributors[] {
|
||||
"Odin Hultgren Van Der Horst",
|
||||
"ogorbacheva",
|
||||
"Okada Haruki",
|
||||
"Oleg Ershov",
|
||||
"Oleg Favstov",
|
||||
"Oleg Komarov",
|
||||
"olegkv",
|
||||
@ -685,6 +705,7 @@ const char * auto_contributors[] {
|
||||
"potya",
|
||||
"Potya",
|
||||
"Pradeep Chhetri",
|
||||
"presto53",
|
||||
"proller",
|
||||
"pufit",
|
||||
"pyos",
|
||||
@ -699,6 +720,8 @@ const char * auto_contributors[] {
|
||||
"Ramazan Polat",
|
||||
"Raúl Marín",
|
||||
"Ravengg",
|
||||
"redclusive",
|
||||
"RedClusive",
|
||||
"RegulusZ",
|
||||
"Reilee",
|
||||
"Reto Kromer",
|
||||
@ -709,6 +732,7 @@ const char * auto_contributors[] {
|
||||
"robot-clickhouse",
|
||||
"robot-metrika-test",
|
||||
"rodrigargar",
|
||||
"Romain Neutron",
|
||||
"roman",
|
||||
"Roman Bug",
|
||||
"Roman Lipovsky",
|
||||
@ -776,6 +800,7 @@ const char * auto_contributors[] {
|
||||
"spongedc",
|
||||
"spyros87",
|
||||
"Stanislav Pavlovichev",
|
||||
"Stas Kelvich",
|
||||
"Stas Pavlovichev",
|
||||
"stavrolia",
|
||||
"Stefan Thies",
|
||||
@ -784,6 +809,7 @@ const char * auto_contributors[] {
|
||||
"stepenhu",
|
||||
"Steve-金勇",
|
||||
"Stig Bakken",
|
||||
"Storozhuk Kostiantyn",
|
||||
"Stupnikov Andrey",
|
||||
"su-houzhen",
|
||||
"sundy",
|
||||
@ -806,6 +832,7 @@ const char * auto_contributors[] {
|
||||
"Tema Novikov",
|
||||
"templarzq",
|
||||
"The-Alchemist",
|
||||
"Tiaonmmn",
|
||||
"tiger.yan",
|
||||
"tison",
|
||||
"TiunovNN",
|
||||
@ -891,6 +918,7 @@ const char * auto_contributors[] {
|
||||
"Xiang Zhou",
|
||||
"xPoSx",
|
||||
"Yağızcan Değirmenci",
|
||||
"yang",
|
||||
"Yangkuan Liu",
|
||||
"yangshuai",
|
||||
"Yatsishin Ilya",
|
||||
@ -906,6 +934,7 @@ const char * auto_contributors[] {
|
||||
"Y Lu",
|
||||
"Yohann Jardin",
|
||||
"yonesko",
|
||||
"yuchuansun",
|
||||
"yuefoo",
|
||||
"yulu86",
|
||||
"yuluxu",
|
||||
@ -935,6 +964,7 @@ const char * auto_contributors[] {
|
||||
"zvvr",
|
||||
"zzsmdfj",
|
||||
"Артем Стрельцов",
|
||||
"Владислав Тихонов",
|
||||
"Георгий Кондратьев",
|
||||
"Дмитрий Канатников",
|
||||
"Иванов Евгений",
|
||||
|
@ -130,8 +130,8 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, Context
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
|
||||
zk_exception_code = code;
|
||||
|
||||
const auto & clusters = context->getClusters();
|
||||
for (const auto & name_and_cluster : clusters.getContainer())
|
||||
const auto clusters = context->getClusters();
|
||||
for (const auto & name_and_cluster : clusters->getContainer())
|
||||
{
|
||||
const ClusterPtr & cluster = name_and_cluster.second;
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
|
197
src/Storages/System/StorageSystemDataSkippingIndices.cpp
Normal file
197
src/Storages/System/StorageSystemDataSkippingIndices.cpp
Normal file
@ -0,0 +1,197 @@
|
||||
#include <Storages/System/StorageSystemDataSkippingIndices.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
StorageSystemDataSkippingIndices::StorageSystemDataSkippingIndices(const StorageID & table_id_)
|
||||
: IStorage(table_id_)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(ColumnsDescription(
|
||||
{
|
||||
{ "database", std::make_shared<DataTypeString>() },
|
||||
{ "table", std::make_shared<DataTypeString>() },
|
||||
{ "name", std::make_shared<DataTypeString>() },
|
||||
{ "type", std::make_shared<DataTypeString>() },
|
||||
{ "expr", std::make_shared<DataTypeString>() },
|
||||
{ "granularity", std::make_shared<DataTypeUInt64>() },
|
||||
}));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
class DataSkippingIndicesSource : public SourceWithProgress
|
||||
{
|
||||
public:
|
||||
DataSkippingIndicesSource(
|
||||
std::vector<UInt8> columns_mask_,
|
||||
Block header,
|
||||
UInt64 max_block_size_,
|
||||
ColumnPtr databases_,
|
||||
ContextPtr context_)
|
||||
: SourceWithProgress(header)
|
||||
, column_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
, databases(std::move(databases_))
|
||||
, context(Context::createCopy(context_))
|
||||
, database_idx(0)
|
||||
{}
|
||||
|
||||
String getName() const override { return "DataSkippingIndices"; }
|
||||
|
||||
protected:
|
||||
Chunk generate() override
|
||||
{
|
||||
if (database_idx >= databases->size())
|
||||
return {};
|
||||
|
||||
MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
|
||||
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
||||
size_t rows_count = 0;
|
||||
while (rows_count < max_block_size)
|
||||
{
|
||||
if (tables_it && !tables_it->isValid())
|
||||
++database_idx;
|
||||
|
||||
while (database_idx < databases->size() && (!tables_it || !tables_it->isValid()))
|
||||
{
|
||||
database_name = databases->getDataAt(database_idx).toString();
|
||||
database = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
|
||||
if (database)
|
||||
break;
|
||||
++database_idx;
|
||||
}
|
||||
|
||||
if (database_idx >= databases->size())
|
||||
break;
|
||||
|
||||
if (!tables_it || !tables_it->isValid())
|
||||
tables_it = database->getTablesIterator(context);
|
||||
|
||||
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);
|
||||
|
||||
for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next())
|
||||
{
|
||||
auto table_name = tables_it->name();
|
||||
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, database_name, table_name))
|
||||
continue;
|
||||
|
||||
const auto table = tables_it->table();
|
||||
if (!table)
|
||||
continue;
|
||||
StorageMetadataPtr metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
if (!metadata_snapshot)
|
||||
continue;
|
||||
const auto indices = metadata_snapshot->getSecondaryIndices();
|
||||
|
||||
for (const auto & index : indices)
|
||||
{
|
||||
++rows_count;
|
||||
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
// 'database' column
|
||||
if (column_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
// 'table' column
|
||||
if (column_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table_name);
|
||||
// 'name' column
|
||||
if (column_mask[src_index++])
|
||||
res_columns[res_index++]->insert(index.name);
|
||||
// 'type' column
|
||||
if (column_mask[src_index++])
|
||||
res_columns[res_index++]->insert(index.type);
|
||||
// 'expr' column
|
||||
if (column_mask[src_index++])
|
||||
{
|
||||
if (auto expression = index.expression_list_ast)
|
||||
res_columns[res_index++]->insert(queryToString(expression));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
// 'granularity' column
|
||||
if (column_mask[src_index++])
|
||||
res_columns[res_index++]->insert(index.granularity);
|
||||
}
|
||||
}
|
||||
}
|
||||
return Chunk(std::move(res_columns), rows_count);
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<UInt8> column_mask;
|
||||
UInt64 max_block_size;
|
||||
ColumnPtr databases;
|
||||
ContextPtr context;
|
||||
size_t database_idx;
|
||||
DatabasePtr database;
|
||||
std::string database_name;
|
||||
DatabaseTablesIteratorPtr tables_it;
|
||||
};
|
||||
|
||||
Pipe StorageSystemDataSkippingIndices::read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /* processed_stage */,
|
||||
size_t max_block_size,
|
||||
unsigned int /* num_streams */)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
|
||||
NameSet names_set(column_names.begin(), column_names.end());
|
||||
|
||||
Block sample_block = metadata_snapshot->getSampleBlock();
|
||||
Block header;
|
||||
|
||||
std::vector<UInt8> columns_mask(sample_block.columns());
|
||||
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
|
||||
{
|
||||
if (names_set.count(sample_block.getByPosition(i).name))
|
||||
{
|
||||
columns_mask[i] = 1;
|
||||
header.insert(sample_block.getByPosition(i));
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumnPtr column = ColumnString::create();
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
for (const auto & [database_name, database] : databases)
|
||||
{
|
||||
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
|
||||
continue;
|
||||
|
||||
/// Lazy database can contain only very primitive tables,
|
||||
/// it cannot contain tables with data skipping indices.
|
||||
/// Skip it to avoid unnecessary tables loading in the Lazy database.
|
||||
if (database->getEngineName() != "Lazy")
|
||||
column->insert(database_name);
|
||||
}
|
||||
|
||||
/// Condition on "database" in a query acts like an index.
|
||||
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block, context);
|
||||
|
||||
ColumnPtr & filtered_databases = block.getByPosition(0).column;
|
||||
return Pipe(std::make_shared<DataSkippingIndicesSource>(
|
||||
std::move(columns_mask), std::move(header), max_block_size, std::move(filtered_databases), context));
|
||||
}
|
||||
|
||||
}
|
30
src/Storages/System/StorageSystemDataSkippingIndices.h
Normal file
30
src/Storages/System/StorageSystemDataSkippingIndices.h
Normal file
@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include <common/shared_ptr_helper.h>
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// For system.data_skipping_indices table - describes the data skipping indices in tables, similar to system.columns.
|
||||
class StorageSystemDataSkippingIndices : public shared_ptr_helper<StorageSystemDataSkippingIndices>, public IStorage
|
||||
{
|
||||
friend struct shared_ptr_helper<StorageSystemDataSkippingIndices>;
|
||||
public:
|
||||
std::string getName() const override { return "SystemDataSkippingIndices"; }
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
protected:
|
||||
StorageSystemDataSkippingIndices(const StorageID & table_id_);
|
||||
};
|
||||
|
||||
}
|
@ -9,6 +9,7 @@
|
||||
#include <Storages/System/StorageSystemClusters.h>
|
||||
#include <Storages/System/StorageSystemColumns.h>
|
||||
#include <Storages/System/StorageSystemDatabases.h>
|
||||
#include <Storages/System/StorageSystemDataSkippingIndices.h>
|
||||
#include <Storages/System/StorageSystemDataTypeFamilies.h>
|
||||
#include <Storages/System/StorageSystemDetachedParts.h>
|
||||
#include <Storages/System/StorageSystemDictionaries.h>
|
||||
@ -115,6 +116,7 @@ void attachSystemTablesLocal(IDatabase & system_database)
|
||||
attach<StorageSystemUserDirectories>(system_database, "user_directories");
|
||||
attach<StorageSystemPrivileges>(system_database, "privileges");
|
||||
attach<StorageSystemErrors>(system_database, "errors");
|
||||
attach<StorageSystemDataSkippingIndices>(system_database, "data_skipping_indices");
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
attach<StorageSystemLicenses>(system_database, "licenses");
|
||||
attach<StorageSystemTimeZones>(system_database, "time_zones");
|
||||
|
@ -1,62 +0,0 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/tests/gtest_global_context.h>
|
||||
#include <memory>
|
||||
#include <chrono>
|
||||
using namespace std::chrono_literals;
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric BackgroundPoolTask;
|
||||
}
|
||||
|
||||
using namespace DB;
|
||||
|
||||
static std::atomic<Int64> counter{0};
|
||||
|
||||
class TestJobExecutor : public IBackgroundJobExecutor
|
||||
{
|
||||
public:
|
||||
explicit TestJobExecutor(ContextPtr local_context)
|
||||
:IBackgroundJobExecutor(
|
||||
local_context,
|
||||
BackgroundTaskSchedulingSettings{},
|
||||
{PoolConfig{PoolType::MERGE_MUTATE, 4, CurrentMetrics::BackgroundPoolTask}})
|
||||
{}
|
||||
|
||||
protected:
|
||||
String getBackgroundTaskName() const override
|
||||
{
|
||||
return "TestTask";
|
||||
}
|
||||
|
||||
std::optional<JobAndPool> getBackgroundJob() override
|
||||
{
|
||||
return JobAndPool{[] { std::this_thread::sleep_for(1s); counter++; return true; }, PoolType::MERGE_MUTATE};
|
||||
}
|
||||
};
|
||||
|
||||
using TestExecutorPtr = std::unique_ptr<TestJobExecutor>;
|
||||
|
||||
TEST(BackgroundExecutor, TestMetric)
|
||||
{
|
||||
const auto & context_holder = getContext();
|
||||
std::vector<TestExecutorPtr> executors;
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
executors.emplace_back(std::make_unique<TestJobExecutor>(context_holder.context));
|
||||
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
executors[i]->start();
|
||||
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
{
|
||||
EXPECT_TRUE(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load() <= 4);
|
||||
std::this_thread::sleep_for(200ms);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
executors[i]->finish();
|
||||
|
||||
/// Sanity check
|
||||
EXPECT_TRUE(counter > 50);
|
||||
}
|
@ -158,6 +158,7 @@ SRCS(
|
||||
System/StorageSystemContributors.generated.cpp
|
||||
System/StorageSystemCurrentRoles.cpp
|
||||
System/StorageSystemDDLWorkerQueue.cpp
|
||||
System/StorageSystemDataSkippingIndices.cpp
|
||||
System/StorageSystemDataTypeFamilies.cpp
|
||||
System/StorageSystemDatabases.cpp
|
||||
System/StorageSystemDetachedParts.cpp
|
||||
|
@ -0,0 +1,15 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<default>
|
||||
<!-- always send via network -->
|
||||
<prefer_localhost_replica>0</prefer_localhost_replica>
|
||||
<!-- enable batching to check splitting -->
|
||||
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
|
||||
<!-- override defaults just in case they will be changed -->
|
||||
<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
|
||||
<!-- wait for explicit flush -->
|
||||
<distributed_directory_monitor_sleep_time_ms>86400</distributed_directory_monitor_sleep_time_ms>
|
||||
<distributed_directory_monitor_max_sleep_time_ms>86400</distributed_directory_monitor_max_sleep_time_ms>
|
||||
</default>
|
||||
</profiles>
|
||||
</yandex>
|
@ -0,0 +1,15 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<default>
|
||||
<!-- always send via network -->
|
||||
<prefer_localhost_replica>0</prefer_localhost_replica>
|
||||
<!-- enable batching to check splitting -->
|
||||
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
|
||||
<!-- disable -->
|
||||
<distributed_directory_monitor_split_batch_on_failure>0</distributed_directory_monitor_split_batch_on_failure>
|
||||
<!-- wait for explicit flush -->
|
||||
<distributed_directory_monitor_sleep_time_ms>86400</distributed_directory_monitor_sleep_time_ms>
|
||||
<distributed_directory_monitor_max_sleep_time_ms>86400</distributed_directory_monitor_max_sleep_time_ms>
|
||||
</default>
|
||||
</profiles>
|
||||
</yandex>
|
@ -0,0 +1,18 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</yandex>
|
@ -0,0 +1,60 @@
|
||||
import pytest
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
# node1 -- distributed_directory_monitor_split_batch_on_failure=on
|
||||
node1 = cluster.add_instance('node1',
|
||||
main_configs=['configs/remote_servers.xml'],
|
||||
user_configs=['configs/overrides_1.xml'],
|
||||
)
|
||||
# node2 -- distributed_directory_monitor_split_batch_on_failure=off
|
||||
node2 = cluster.add_instance('node2',
|
||||
main_configs=['configs/remote_servers.xml'],
|
||||
user_configs=['configs/overrides_2.xml'],
|
||||
)
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
for _, node in cluster.instances.items():
|
||||
node.query("""
|
||||
create table null_ (key Int, value Int) engine=Null();
|
||||
create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key);
|
||||
create table data (key Int, uniq_values Int) engine=Memory();
|
||||
create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key;
|
||||
system stop distributed sends dist;
|
||||
|
||||
create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data);
|
||||
""")
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluster):
|
||||
for i in range(0, 100):
|
||||
limit = 100e3
|
||||
node2.query(f'insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}', settings={
|
||||
# max_memory_usage is the limit for the batch on the remote node
|
||||
# (local query should not be affected since 30MB is enough for 100K rows)
|
||||
'max_memory_usage': '30Mi',
|
||||
})
|
||||
# "Received from" is mandatory, since the exception should be thrown on the remote node.
|
||||
with pytest.raises(QueryRuntimeException, match=r'DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv'):
|
||||
node2.query('system flush distributed dist')
|
||||
assert int(node2.query('select count() from dist_data')) == 0
|
||||
|
||||
def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster):
|
||||
for i in range(0, 100):
|
||||
limit = 100e3
|
||||
node1.query(f'insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}', settings={
|
||||
# max_memory_usage is the limit for the batch on the remote node
|
||||
# (local query should not be affected since 30MB is enough for 100K rows)
|
||||
'max_memory_usage': '30Mi',
|
||||
})
|
||||
node1.query('system flush distributed dist')
|
||||
assert int(node1.query('select count() from dist_data')) == 100000
|
@ -0,0 +1,32 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<insert_distributed_async_send_cluster_two_replicas>
|
||||
<shard>
|
||||
<internal_replication>false</internal_replication>
|
||||
<replica>
|
||||
<host>n3</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>n4</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</insert_distributed_async_send_cluster_two_replicas>
|
||||
<insert_distributed_async_send_cluster_two_shards>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>n3</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>n4</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</insert_distributed_async_send_cluster_two_shards>
|
||||
</remote_servers>
|
||||
</yandex>
|
||||
|
@ -0,0 +1,7 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<default>
|
||||
<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
|
||||
</default>
|
||||
</profiles>
|
||||
</yandex>
|
@ -17,11 +17,29 @@ n1 = cluster.add_instance('n1', main_configs=['configs/remote_servers.xml'], use
|
||||
# n2 -- distributed_directory_monitor_batch_inserts=0
|
||||
n2 = cluster.add_instance('n2', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.d/no_batch.xml'])
|
||||
|
||||
# n3 -- distributed_directory_monitor_batch_inserts=1/distributed_directory_monitor_split_batch_on_failure=1
|
||||
n3 = cluster.add_instance('n3', main_configs=['configs/remote_servers_split.xml'], user_configs=[
|
||||
'configs/users.d/batch.xml',
|
||||
'configs/users.d/split.xml',
|
||||
])
|
||||
# n4 -- distributed_directory_monitor_batch_inserts=0/distributed_directory_monitor_split_batch_on_failure=1
|
||||
n4 = cluster.add_instance('n4', main_configs=['configs/remote_servers_split.xml'], user_configs=[
|
||||
'configs/users.d/no_batch.xml',
|
||||
'configs/users.d/split.xml',
|
||||
])
|
||||
|
||||
batch_params = pytest.mark.parametrize('batch', [
|
||||
(1),
|
||||
(0),
|
||||
])
|
||||
|
||||
batch_and_split_params = pytest.mark.parametrize('batch,split', [
|
||||
(1, 0),
|
||||
(0, 0),
|
||||
(1, 1),
|
||||
(0, 1),
|
||||
])
|
||||
|
||||
@pytest.fixture(scope='module', autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
@ -62,15 +80,19 @@ def insert_data(node):
|
||||
assert size > 1<<16
|
||||
return size
|
||||
|
||||
def get_node(batch):
|
||||
def get_node(batch, split=None):
|
||||
if split:
|
||||
if batch:
|
||||
return n3
|
||||
return n4
|
||||
if batch:
|
||||
return n1
|
||||
return n2
|
||||
|
||||
def bootstrap(batch):
|
||||
def bootstrap(batch, split=None):
|
||||
drop_tables()
|
||||
create_tables('insert_distributed_async_send_cluster_two_replicas')
|
||||
return insert_data(get_node(batch))
|
||||
return insert_data(get_node(batch, split))
|
||||
|
||||
def get_path_to_dist_batch(file='2.bin'):
|
||||
# There are:
|
||||
@ -80,8 +102,8 @@ def get_path_to_dist_batch(file='2.bin'):
|
||||
# @return the file for the n2 shard
|
||||
return f'/var/lib/clickhouse/data/default/dist/shard1_replica2/{file}'
|
||||
|
||||
def check_dist_after_corruption(truncate, batch):
|
||||
node = get_node(batch)
|
||||
def check_dist_after_corruption(truncate, batch, split=None):
|
||||
node = get_node(batch, split)
|
||||
|
||||
if batch:
|
||||
# In batch mode errors are ignored
|
||||
@ -102,8 +124,12 @@ def check_dist_after_corruption(truncate, batch):
|
||||
broken = get_path_to_dist_batch('broken')
|
||||
node.exec_in_container(['bash', '-c', f'ls {broken}/2.bin'])
|
||||
|
||||
assert int(n1.query('SELECT count() FROM data')) == 10000
|
||||
assert int(n2.query('SELECT count() FROM data')) == 0
|
||||
if split:
|
||||
assert int(n3.query('SELECT count() FROM data')) == 10000
|
||||
assert int(n4.query('SELECT count() FROM data')) == 0
|
||||
else:
|
||||
assert int(n1.query('SELECT count() FROM data')) == 10000
|
||||
assert int(n2.query('SELECT count() FROM data')) == 0
|
||||
|
||||
|
||||
@batch_params
|
||||
@ -114,17 +140,17 @@ def test_insert_distributed_async_send_success(batch):
|
||||
assert int(n1.query('SELECT count() FROM data')) == 10000
|
||||
assert int(n2.query('SELECT count() FROM data')) == 10000
|
||||
|
||||
@batch_params
|
||||
def test_insert_distributed_async_send_truncated_1(batch):
|
||||
size = bootstrap(batch)
|
||||
@batch_and_split_params
|
||||
def test_insert_distributed_async_send_truncated_1(batch, split):
|
||||
size = bootstrap(batch, split)
|
||||
path = get_path_to_dist_batch()
|
||||
node = get_node(batch)
|
||||
node = get_node(batch, split)
|
||||
|
||||
new_size = size - 10
|
||||
# we cannot use truncate, due to hardlinks
|
||||
node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {new_size} /tmp/bin > {path}'])
|
||||
|
||||
check_dist_after_corruption(True, batch)
|
||||
check_dist_after_corruption(True, batch, split)
|
||||
|
||||
@batch_params
|
||||
def test_insert_distributed_async_send_truncated_2(batch):
|
||||
|
@ -307,7 +307,7 @@ def test_postgres_distributed(started_cluster):
|
||||
started_cluster.unpause_container('postgres1')
|
||||
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
|
||||
|
||||
|
||||
|
||||
def test_datetime_with_timezone(started_cluster):
|
||||
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
|
||||
cursor = conn.cursor()
|
||||
@ -323,6 +323,22 @@ def test_datetime_with_timezone(started_cluster):
|
||||
assert(node1.query("select * from test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
|
||||
|
||||
|
||||
def test_postgres_ndim(started_cluster):
|
||||
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('CREATE TABLE arr1 (a Integer[])')
|
||||
cursor.execute("INSERT INTO arr1 SELECT '{{1}, {2}}'")
|
||||
|
||||
# The point is in creating a table via 'as select *', in postgres att_ndim will not be correct in this case.
|
||||
cursor.execute('CREATE TABLE arr2 AS SELECT * FROM arr1')
|
||||
cursor.execute("SELECT attndims AS dims FROM pg_attribute WHERE attrelid = 'arr2'::regclass; ")
|
||||
result = cursor.fetchall()[0]
|
||||
assert(int(result[0]) == 0)
|
||||
|
||||
result = node1.query('''SELECT toTypeName(a) FROM postgresql('postgres1:5432', 'clickhouse', 'arr2', 'postgres', 'mysecretpassword')''')
|
||||
assert(result.strip() == "Array(Array(Nullable(Int32)))")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
@ -79,7 +79,6 @@ def test_upgrade_while_mutation(start_cluster):
|
||||
|
||||
node3.restart_with_latest_version(signal=9)
|
||||
|
||||
# wait replica became active
|
||||
exec_query_with_retry(node3, "SYSTEM RESTART REPLICA mt1")
|
||||
|
||||
node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
|
||||
|
@ -1,13 +0,0 @@
|
||||
<test>
|
||||
|
||||
|
||||
|
||||
<preconditions>
|
||||
<table_exists>trips_mergetree</table_exists>
|
||||
</preconditions>
|
||||
|
||||
<query>SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type</query>
|
||||
<query>SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count</query>
|
||||
<query>SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year</query>
|
||||
<query>SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC</query>
|
||||
</test>
|
@ -1,6 +1,7 @@
|
||||
<test>
|
||||
<preconditions>
|
||||
<table_exists>hits_100m_single</table_exists>
|
||||
<table_exists>hits_10m_single</table_exists>
|
||||
</preconditions>
|
||||
|
||||
<substitutions>
|
||||
@ -10,9 +11,17 @@
|
||||
<value>SearchEngineID</value>
|
||||
<value>RegionID</value>
|
||||
<value>SearchPhrase</value>
|
||||
</values>
|
||||
</substitution>
|
||||
|
||||
<!-- For some keys, the query is slower, so we choose smaller dataset. -->
|
||||
<substitution>
|
||||
<name>key_slow</name>
|
||||
<values>
|
||||
<value>ClientIP</value>
|
||||
</values>
|
||||
</substitution>
|
||||
|
||||
<substitution>
|
||||
<name>func</name>
|
||||
<values>
|
||||
@ -26,4 +35,5 @@
|
||||
</substitutions>
|
||||
|
||||
<query>SELECT {key} AS k, {func}(ResolutionWidth) FROM hits_100m_single GROUP BY k FORMAT Null</query>
|
||||
<query>SELECT {key_slow} AS k, {func}(ResolutionWidth) FROM hits_10m_single GROUP BY k FORMAT Null</query>
|
||||
</test>
|
||||
|
@ -1,6 +1,7 @@
|
||||
<test>
|
||||
<preconditions>
|
||||
<table_exists>hits_100m_single</table_exists>
|
||||
<table_exists>hits_10m_single</table_exists>
|
||||
<ram_size>30000000000</ram_size>
|
||||
</preconditions>
|
||||
|
||||
@ -58,4 +59,5 @@
|
||||
</substitutions>
|
||||
|
||||
<query>SELECT {key} AS k, {func}(UserID) FROM hits_100m_single GROUP BY k FORMAT Null</query>
|
||||
<query>SELECT {key} AS k, uniqTheta(UserID) FROM hits_10m_single GROUP BY k FORMAT Null</query>
|
||||
</test>
|
||||
|
@ -44,6 +44,7 @@ LIFETIME(0);
|
||||
SELECT * FROM 01913_db.test_dictionary;
|
||||
|
||||
DROP DICTIONARY 01913_db.test_dictionary;
|
||||
|
||||
DROP TABLE 01913_db.test_source_table_1;
|
||||
DROP TABLE 01913_db.test_source_table_2;
|
||||
|
||||
|
@ -1,34 +1,39 @@
|
||||
DROP TABLE IF EXISTS table_1;
|
||||
CREATE TABLE table_1 (id UInt64, value String) ENGINE=TinyLog;
|
||||
DROP DATABASE IF EXISTS 01914_db;
|
||||
CREATE DATABASE 01914_db ENGINE=Atomic;
|
||||
|
||||
DROP TABLE IF EXISTS table_2;
|
||||
CREATE TABLE table_2 (id UInt64, value String) ENGINE=TinyLog;
|
||||
DROP TABLE IF EXISTS 01914_db.table_1;
|
||||
CREATE TABLE 01914_db.table_1 (id UInt64, value String) ENGINE=TinyLog;
|
||||
|
||||
INSERT INTO table_1 VALUES (1, 'Table1');
|
||||
INSERT INTO table_2 VALUES (2, 'Table2');
|
||||
DROP TABLE IF EXISTS 01914_db.table_2;
|
||||
CREATE TABLE 01914_db.table_2 (id UInt64, value String) ENGINE=TinyLog;
|
||||
|
||||
DROP DICTIONARY IF EXISTS dictionary_1;
|
||||
CREATE DICTIONARY dictionary_1 (id UInt64, value String)
|
||||
INSERT INTO 01914_db.table_1 VALUES (1, 'Table1');
|
||||
INSERT INTO 01914_db.table_2 VALUES (2, 'Table2');
|
||||
|
||||
DROP DICTIONARY IF EXISTS 01914_db.dictionary_1;
|
||||
CREATE DICTIONARY 01914_db.dictionary_1 (id UInt64, value String)
|
||||
PRIMARY KEY id
|
||||
LAYOUT(DIRECT())
|
||||
SOURCE(CLICKHOUSE(TABLE 'table_1'));
|
||||
SOURCE(CLICKHOUSE(DB '01914_db' TABLE 'table_1'));
|
||||
|
||||
DROP DICTIONARY IF EXISTS dictionary_2;
|
||||
CREATE DICTIONARY dictionary_2 (id UInt64, value String)
|
||||
DROP DICTIONARY IF EXISTS 01914_db.dictionary_2;
|
||||
CREATE DICTIONARY 01914_db.dictionary_2 (id UInt64, value String)
|
||||
PRIMARY KEY id
|
||||
LAYOUT(DIRECT())
|
||||
SOURCE(CLICKHOUSE(TABLE 'table_2'));
|
||||
SOURCE(CLICKHOUSE(DB '01914_db' TABLE 'table_2'));
|
||||
|
||||
SELECT * FROM dictionary_1;
|
||||
SELECT * FROM dictionary_2;
|
||||
SELECT * FROM 01914_db.dictionary_1;
|
||||
SELECT * FROM 01914_db.dictionary_2;
|
||||
|
||||
EXCHANGE DICTIONARIES dictionary_1 AND dictionary_2;
|
||||
EXCHANGE DICTIONARIES 01914_db.dictionary_1 AND 01914_db.dictionary_2;
|
||||
|
||||
SELECT * FROM dictionary_1;
|
||||
SELECT * FROM dictionary_2;
|
||||
SELECT * FROM 01914_db.dictionary_1;
|
||||
SELECT * FROM 01914_db.dictionary_2;
|
||||
|
||||
DROP DICTIONARY dictionary_1;
|
||||
DROP DICTIONARY dictionary_2;
|
||||
DROP DICTIONARY 01914_db.dictionary_1;
|
||||
DROP DICTIONARY 01914_db.dictionary_2;
|
||||
|
||||
DROP TABLE table_1;
|
||||
DROP TABLE table_2;
|
||||
DROP TABLE 01914_db.table_1;
|
||||
DROP TABLE 01914_db.table_2;
|
||||
|
||||
DROP DATABASE 01914_db;
|
||||
|
@ -44,6 +44,7 @@ LIFETIME(0);
|
||||
SELECT * FROM 01915_db.test_dictionary;
|
||||
|
||||
DROP DICTIONARY 01915_db.test_dictionary;
|
||||
|
||||
DROP TABLE 01915_db.test_source_table_1;
|
||||
DROP TABLE 01915_db.test_source_table_2;
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
111
|
16
tests/queries/0_stateless/01917_prewhere_column_type.sql
Normal file
16
tests/queries/0_stateless/01917_prewhere_column_type.sql
Normal file
@ -0,0 +1,16 @@
|
||||
DROP TABLE IF EXISTS t1;
|
||||
|
||||
CREATE TABLE t1 ( s String, f Float32, e UInt16 ) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = '100G';
|
||||
|
||||
INSERT INTO t1 VALUES ('111', 1, 1);
|
||||
|
||||
SELECT s FROM t1 WHERE f AND (e = 1); -- { serverError 59 }
|
||||
SELECT s FROM t1 PREWHERE f; -- { serverError 59 }
|
||||
SELECT s FROM t1 PREWHERE f WHERE (e = 1); -- { serverError 59 }
|
||||
SELECT s FROM t1 PREWHERE f WHERE f AND (e = 1); -- { serverError 59 }
|
||||
|
||||
SELECT s FROM t1 WHERE e AND (e = 1);
|
||||
SELECT s FROM t1 PREWHERE e; -- { serverError 59 }
|
||||
SELECT s FROM t1 PREWHERE e WHERE (e = 1); -- { serverError 59 }
|
||||
SELECT s FROM t1 PREWHERE e WHERE f AND (e = 1); -- { serverError 59 }
|
||||
|
@ -0,0 +1,10 @@
|
||||
default data_01917 d1_idx minmax d1 1
|
||||
default data_01917 d1_null_idx minmax assumeNotNull(d1_null) 1
|
||||
default data_01917_2 memory set frequency * length(name) 5
|
||||
default data_01917_2 sample_index1 minmax length(name), name 4
|
||||
default data_01917_2 sample_index2 ngrambf_v1 lower(name), name 4
|
||||
2
|
||||
3
|
||||
d1_idx
|
||||
d1_null_idx
|
||||
sample_index1
|
@ -0,0 +1,35 @@
|
||||
DROP TABLE IF EXISTS data_01917;
|
||||
DROP TABLE IF EXISTS data_01917_2;
|
||||
|
||||
CREATE TABLE data_01917
|
||||
(
|
||||
key Int,
|
||||
d1 Int,
|
||||
d1_null Nullable(Int),
|
||||
INDEX d1_idx d1 TYPE minmax GRANULARITY 1,
|
||||
INDEX d1_null_idx assumeNotNull(d1_null) TYPE minmax GRANULARITY 1
|
||||
)
|
||||
Engine=MergeTree()
|
||||
ORDER BY key;
|
||||
|
||||
CREATE TABLE data_01917_2
|
||||
(
|
||||
name String,
|
||||
frequency UInt64,
|
||||
INDEX memory (frequency * length(name)) TYPE set(1000) GRANULARITY 5,
|
||||
INDEX sample_index1 (length(name), name) TYPE minmax GRANULARITY 4,
|
||||
INDEX sample_index2 (lower(name), name) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
|
||||
)
|
||||
Engine=MergeTree()
|
||||
ORDER BY name;
|
||||
|
||||
SELECT * FROM system.data_skipping_indices WHERE database = currentDatabase();
|
||||
|
||||
SELECT count(*) FROM system.data_skipping_indices WHERE table = 'data_01917' AND database = currentDatabase();
|
||||
SELECT count(*) FROM system.data_skipping_indices WHERE table = 'data_01917_2' AND database = currentDatabase();
|
||||
|
||||
SELECT name FROM system.data_skipping_indices WHERE type = 'minmax' AND database = currentDatabase();
|
||||
|
||||
DROP TABLE data_01917;
|
||||
DROP TABLE data_01917_2;
|
||||
|
@ -0,0 +1,2 @@
|
||||
x_1 10
|
||||
x_2 10
|
13
tests/queries/0_stateless/01925_merge_prewhere_table.sql
Normal file
13
tests/queries/0_stateless/01925_merge_prewhere_table.sql
Normal file
@ -0,0 +1,13 @@
|
||||
DROP TABLE IF EXISTS x_1;
|
||||
DROP TABLE IF EXISTS x_2;
|
||||
DROP TABLE IF EXISTS x;
|
||||
|
||||
create table x_1 engine=Log as select * from numbers(10);
|
||||
create table x_2 engine=Log as select * from numbers(10);
|
||||
create table x engine=Merge(currentDatabase(), '^x_(1|2)$') as x_1;
|
||||
|
||||
select _table, count() from x group by _table order by _table;
|
||||
|
||||
DROP TABLE x_1;
|
||||
DROP TABLE x_2;
|
||||
DROP TABLE x;
|
@ -249,3 +249,5 @@
|
||||
01824_prefer_global_in_and_join
|
||||
01576_alias_column_rewrite
|
||||
01924_argmax_bitmap_state
|
||||
01914_exchange_dictionaries
|
||||
01923_different_expression_name_alias
|
||||
|
@ -112,7 +112,8 @@
|
||||
"00738_lock_for_inner_table",
|
||||
"01153_attach_mv_uuid",
|
||||
/// Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database.
|
||||
"rocksdb"
|
||||
"rocksdb",
|
||||
"01914_exchange_dictionaries" /// Requires Atomic database
|
||||
],
|
||||
"database-replicated": [
|
||||
/// Unclassified
|
||||
@ -519,7 +520,8 @@
|
||||
"01924_argmax_bitmap_state",
|
||||
"01913_replace_dictionary",
|
||||
"01914_exchange_dictionaries",
|
||||
"01915_create_or_replace_dictionary"
|
||||
"01915_create_or_replace_dictionary",
|
||||
"01913_names_of_tuple_literal"
|
||||
],
|
||||
"parallel":
|
||||
[
|
||||
@ -846,6 +848,9 @@
|
||||
"01870_buffer_flush", // creates database
|
||||
"01889_postgresql_protocol_null_fields",
|
||||
"01889_check_row_policy_defined_using_user_function",
|
||||
"01921_concurrent_ttl_and_normal_merges_zookeeper_long" // heavy test, better to run sequentially
|
||||
"01921_concurrent_ttl_and_normal_merges_zookeeper_long", // heavy test, better to run sequentially
|
||||
"01913_replace_dictionary",
|
||||
"01914_exchange_dictionaries",
|
||||
"01915_create_or_replace_dictionary"
|
||||
]
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user