mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge branch 'master' of github.com:ClickHouse/ClickHouse into divanik/generate_series_function
This commit is contained in:
commit
c5d35bb187
1
.github/workflows/pull_request.yml
vendored
1
.github/workflows/pull_request.yml
vendored
@ -172,6 +172,7 @@ jobs:
|
||||
run: |
|
||||
cd "$GITHUB_WORKSPACE/tests/ci"
|
||||
python3 finish_check.py
|
||||
python3 merge_pr.py --check-approved
|
||||
|
||||
|
||||
#############################################################################################
|
||||
|
3
.github/workflows/reusable_build.yml
vendored
3
.github/workflows/reusable_build.yml
vendored
@ -43,8 +43,7 @@ jobs:
|
||||
runs-on: [self-hosted, '${{inputs.runner_type}}']
|
||||
steps:
|
||||
- name: Check out repository code
|
||||
# WIP: temporary try commit with limited perallelization of checkout
|
||||
uses: ClickHouse/checkout@0be3f7b3098bae494d3ef5d29d2e0676fb606232
|
||||
uses: ClickHouse/checkout@v1
|
||||
with:
|
||||
clear-repository: true
|
||||
ref: ${{ fromJson(inputs.data).git_ref }}
|
||||
|
@ -110,11 +110,6 @@ endif()
|
||||
# - sanitize.cmake
|
||||
add_library(global-libs INTERFACE)
|
||||
|
||||
# We don't want to instrument everything with fuzzer, but only specific targets (see below),
|
||||
# also, since we build our own llvm, we specifically don't want to instrument
|
||||
# libFuzzer library itself - it would result in infinite recursion
|
||||
#include (cmake/fuzzer.cmake)
|
||||
|
||||
include (cmake/sanitize.cmake)
|
||||
|
||||
option(ENABLE_COLORED_BUILD "Enable colors in compiler output" ON)
|
||||
@ -554,7 +549,9 @@ if (ENABLE_RUST)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" AND NOT SANITIZE AND NOT SANITIZE_COVERAGE AND OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64))
|
||||
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
|
||||
AND NOT SANITIZE AND NOT SANITIZE_COVERAGE AND NOT ENABLE_FUZZING
|
||||
AND OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64))
|
||||
set(CHECK_LARGE_OBJECT_SIZES_DEFAULT ON)
|
||||
else ()
|
||||
set(CHECK_LARGE_OBJECT_SIZES_DEFAULT OFF)
|
||||
@ -577,10 +574,7 @@ if (FUZZER)
|
||||
if (NOT(target_type STREQUAL "INTERFACE_LIBRARY" OR target_type STREQUAL "UTILITY"))
|
||||
target_compile_options(${target} PRIVATE "-fsanitize=fuzzer-no-link")
|
||||
endif()
|
||||
# clickhouse fuzzer isn't working correctly
|
||||
# initial PR https://github.com/ClickHouse/ClickHouse/pull/27526
|
||||
#if (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse")
|
||||
if (target_type STREQUAL "EXECUTABLE" AND target MATCHES ".+_fuzzer")
|
||||
if (target_type STREQUAL "EXECUTABLE" AND (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse"))
|
||||
message(STATUS "${target} instrumented with fuzzer")
|
||||
target_link_libraries(${target} PUBLIC ch_contrib::fuzzer)
|
||||
# Add to fuzzers bundle
|
||||
|
@ -1,17 +0,0 @@
|
||||
# see ./CMakeLists.txt for variable declaration
|
||||
if (FUZZER)
|
||||
if (FUZZER STREQUAL "libfuzzer")
|
||||
# NOTE: Eldar Zaitov decided to name it "libfuzzer" instead of "fuzzer" to keep in mind another possible fuzzer backends.
|
||||
# NOTE: no-link means that all the targets are built with instrumentation for fuzzer, but only some of them
|
||||
# (tests) have entry point for fuzzer and it's not checked.
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=fuzzer-no-link -DFUZZER=1")
|
||||
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=fuzzer-no-link -DFUZZER=1")
|
||||
|
||||
# NOTE: oss-fuzz can change LIB_FUZZING_ENGINE variable
|
||||
if (NOT LIB_FUZZING_ENGINE)
|
||||
set (LIB_FUZZING_ENGINE "-fsanitize=fuzzer")
|
||||
endif ()
|
||||
else ()
|
||||
message (FATAL_ERROR "Unknown fuzzer type: ${FUZZER}")
|
||||
endif ()
|
||||
endif()
|
@ -21,3 +21,79 @@ When restarting a server, data disappears from the table and the table becomes e
|
||||
Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000).
|
||||
|
||||
The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”).
|
||||
|
||||
Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)).
|
||||
|
||||
## Engine Parameters {#engine-parameters}
|
||||
|
||||
- `min_bytes_to_keep` — Minimum bytes to keep when memory table is size-capped.
|
||||
- Default value: `0`
|
||||
- Requires `max_bytes_to_keep`
|
||||
- `max_bytes_to_keep` — Maximum bytes to keep within memory table where oldest rows are deleted on each insertion (i.e circular buffer). Max bytes can exceed the stated limit if the oldest batch of rows to remove falls under the `min_bytes_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
- `min_rows_to_keep` — Minimum rows to keep when memory table is size-capped.
|
||||
- Default value: `0`
|
||||
- Requires `max_rows_to_keep`
|
||||
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
|
||||
## Usage {#usage}
|
||||
|
||||
|
||||
**Initialize settings**
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
|
||||
```
|
||||
|
||||
**Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to.
|
||||
|
||||
## Examples {#examples}
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
|
||||
|
||||
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes
|
||||
|
||||
/* 4. checking a very large block overrides all */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes
|
||||
|
||||
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─total_bytes─┬─total_rows─┐
|
||||
│ 65536 │ 10000 │
|
||||
└─────────────┴────────────┘
|
||||
```
|
||||
|
||||
also, for rows:
|
||||
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000;
|
||||
|
||||
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows
|
||||
|
||||
/* 4. checking a very large block overrides all */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows
|
||||
|
||||
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─total_bytes─┬─total_rows─┐
|
||||
│ 65536 │ 10000 │
|
||||
└─────────────┴────────────┘
|
||||
```
|
||||
|
@ -55,7 +55,7 @@ CREATE TABLE criteo_log (
|
||||
) ENGINE = Log;
|
||||
```
|
||||
|
||||
Download the data:
|
||||
Insert the data:
|
||||
|
||||
``` bash
|
||||
$ for i in {00..23}; do echo $i; zcat datasets/criteo/day_${i#0}.gz | sed -r 's/^/2000-01-'${i/00/24}'\t/' | clickhouse-client --host=example-perftest01j --query="INSERT INTO criteo_log FORMAT TabSeparated"; done
|
||||
|
@ -95,9 +95,11 @@ which is equal to
|
||||
|
||||
## Substituting Configuration {#substitution}
|
||||
|
||||
The config can also define “substitutions”. If an element has the `incl` attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include_from](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element in the server config. The substitution values are specified in `/clickhouse/substitution_name` elements in this file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, settings for [macros](../operations/server-configuration-parameters/settings.md#macros)).
|
||||
The config can define substitutions. There are two types of substitutions:
|
||||
|
||||
If you want to replace an entire element with a substitution use `include` as the element name.
|
||||
- If an element has the `incl` attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include_from](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element in the server config. The substitution values are specified in `/clickhouse/substitution_name` elements in this file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, settings for [macros](../operations/server-configuration-parameters/settings.md#macros)).
|
||||
|
||||
- If you want to replace an entire element with a substitution, use `include` as the element name. Substitutions can also be performed from ZooKeeper by specifying attribute `from_zk = "/path/to/node"`. In this case, the element value is replaced with the contents of the Zookeeper node at `/path/to/node`. This also works with you store an entire XML subtree as a Zookeeper node, it will be fully inserted into the source element.
|
||||
|
||||
XML substitution example:
|
||||
|
||||
@ -114,7 +116,7 @@ XML substitution example:
|
||||
</clickhouse>
|
||||
```
|
||||
|
||||
Substitutions can also be performed from ZooKeeper. To do this, specify the attribute `from_zk = "/path/to/node"`. The element value is replaced with the contents of the node at `/path/to/node` in ZooKeeper. You can also put an entire XML subtree on the ZooKeeper node, and it will be fully inserted into the source element.
|
||||
If you want to merge the substituting content with the existing configuration instead of appending you can use attribute `merge="true"`, for example: `<include from_zk="/some_path" merge="true">`. In this case, the existing configuration will be merged with the content from the substitution and the existing configuration settings will be replaced with values from substitution.
|
||||
|
||||
## Encrypting and Hiding Configuration {#encryption}
|
||||
|
||||
|
@ -933,9 +933,9 @@ Hard limit is configured via system tools
|
||||
|
||||
## database_atomic_delay_before_drop_table_sec {#database_atomic_delay_before_drop_table_sec}
|
||||
|
||||
Sets the delay before remove table data in seconds. If the query has `SYNC` modifier, this setting is ignored.
|
||||
The delay before a table data is dropped in seconds. If the `DROP TABLE` query has a `SYNC` modifier, this setting is ignored.
|
||||
|
||||
Default value: `480` (8 minute).
|
||||
Default value: `480` (8 minutes).
|
||||
|
||||
## database_catalog_unused_dir_hide_timeout_sec {#database_catalog_unused_dir_hide_timeout_sec}
|
||||
|
||||
|
@ -4337,6 +4337,18 @@ Possible values:
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
|
||||
## function_locate_has_mysql_compatible_argument_order {#function-locate-has-mysql-compatible-argument-order}
|
||||
|
||||
Controls the order of arguments in function [locate](../../sql-reference/functions/string-search-functions.md#locate).
|
||||
|
||||
Possible values:
|
||||
|
||||
- 0 — Function `locate` accepts arguments `(haystack, needle[, start_pos])`.
|
||||
- 1 — Function `locate` accepts arguments `(needle, haystack, [, start_pos])` (MySQL-compatible behavior)
|
||||
|
||||
Default value: `1`.
|
||||
|
||||
## date_time_overflow_behavior {#date_time_overflow_behavior}
|
||||
|
||||
Defines the behavior when [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md), [DateTime64](../../sql-reference/data-types/datetime64.md) or integers are converted into Date, Date32, DateTime or DateTime64 but the value cannot be represented in the result type.
|
||||
|
@ -26,7 +26,9 @@ priority: 0
|
||||
is_active: 0
|
||||
active_children: 0
|
||||
dequeued_requests: 67
|
||||
canceled_requests: 0
|
||||
dequeued_cost: 4692272
|
||||
canceled_cost: 0
|
||||
busy_periods: 63
|
||||
vruntime: 938454.1999999989
|
||||
system_vruntime: ᴺᵁᴸᴸ
|
||||
@ -54,7 +56,9 @@ Columns:
|
||||
- `is_active` (`UInt8`) - Whether this node is currently active - has resource requests to be dequeued and constraints satisfied.
|
||||
- `active_children` (`UInt64`) - The number of children in active state.
|
||||
- `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node.
|
||||
- `canceled_requests` (`UInt64`) - The total number of resource requests canceled from this node.
|
||||
- `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node.
|
||||
- `canceled_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests canceled from this node.
|
||||
- `busy_periods` (`UInt64`) - The total number of deactivations of this node.
|
||||
- `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner.
|
||||
- `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`.
|
||||
|
@ -36,9 +36,9 @@ You can explicitly set a time zone for `DateTime`-type columns when creating a t
|
||||
|
||||
The [clickhouse-client](../../interfaces/cli.md) applies the server time zone by default if a time zone isn’t explicitly set when initializing the data type. To use the client time zone, run `clickhouse-client` with the `--use_client_time_zone` parameter.
|
||||
|
||||
ClickHouse outputs values depending on the value of the [date_time_output_format](../../operations/settings/settings-formats.md#date_time_output_format) setting. `YYYY-MM-DD hh:mm:ss` text format by default. Additionally, you can change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
|
||||
ClickHouse outputs values depending on the value of the [date_time_output_format](../../operations/settings/settings.md#settings-date_time_output_format) setting. `YYYY-MM-DD hh:mm:ss` text format by default. Additionally, you can change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
|
||||
|
||||
When inserting data into ClickHouse, you can use different formats of date and time strings, depending on the value of the [date_time_input_format](../../operations/settings/settings-formats.md#date_time_input_format) setting.
|
||||
When inserting data into ClickHouse, you can use different formats of date and time strings, depending on the value of the [date_time_input_format](../../operations/settings/settings.md#settings-date_time_input_format) setting.
|
||||
|
||||
## Examples
|
||||
|
||||
@ -147,8 +147,8 @@ Time shifts for multiple days. Some pacific islands changed their timezone offse
|
||||
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
|
||||
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
|
||||
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
|
||||
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#date_time_input_format)
|
||||
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#date_time_output_format)
|
||||
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#settings-date_time_input_format)
|
||||
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#settings-date_time_output_format)
|
||||
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
|
||||
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
|
||||
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)
|
||||
|
@ -30,7 +30,6 @@ position(haystack, needle[, start_pos])
|
||||
|
||||
Alias:
|
||||
- `position(needle IN haystack)`
|
||||
- `locate(haystack, needle[, start_pos])`.
|
||||
|
||||
**Arguments**
|
||||
|
||||
@ -49,7 +48,7 @@ If substring `needle` is empty, these rules apply:
|
||||
- if `start_pos >= 1` and `start_pos <= length(haystack) + 1`: return `start_pos`
|
||||
- otherwise: return `0`
|
||||
|
||||
The same rules also apply to functions `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`
|
||||
The same rules also apply to functions `locate`, `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`.
|
||||
|
||||
Type: `Integer`.
|
||||
|
||||
@ -114,6 +113,21 @@ SELECT
|
||||
└─────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┘
|
||||
```
|
||||
|
||||
## locate
|
||||
|
||||
Like [position](#position) but with arguments `haystack` and `locate` switched.
|
||||
|
||||
The behavior of this function depends on the ClickHouse version:
|
||||
- in versions < v24.3, `locate` was an alias of function `position` and accepted arguments `(haystack, needle[, start_pos])`.
|
||||
- in versions >= 24.3,, `locate` is an individual function (for better compatibility with MySQL) and accepts arguments `(needle, haystack[, start_pos])`. The previous behavior
|
||||
can be restored using setting [function_locate_has_mysql_compatible_argument_order = false](../../operations/settings/settings.md#function-locate-has-mysql-compatible-argument-order);
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
locate(needle, haystack[, start_pos])
|
||||
```
|
||||
|
||||
## positionCaseInsensitive
|
||||
|
||||
Like [position](#position) but searches case-insensitively.
|
||||
|
@ -13,13 +13,6 @@ a system table called `system.dropped_tables`.
|
||||
|
||||
If you have a materialized view without a `TO` clause associated with the dropped table, then you will also have to UNDROP the inner table of that view.
|
||||
|
||||
:::note
|
||||
UNDROP TABLE is experimental. To use it add this setting:
|
||||
```sql
|
||||
set allow_experimental_undrop_table_query = 1;
|
||||
```
|
||||
:::
|
||||
|
||||
:::tip
|
||||
Also see [DROP TABLE](/docs/en/sql-reference/statements/drop.md)
|
||||
:::
|
||||
@ -32,60 +25,53 @@ UNDROP TABLE [db.]name [UUID '<uuid>'] [ON CLUSTER cluster]
|
||||
|
||||
**Example**
|
||||
|
||||
``` sql
|
||||
set allow_experimental_undrop_table_query = 1;
|
||||
```
|
||||
|
||||
```sql
|
||||
CREATE TABLE undropMe
|
||||
CREATE TABLE tab
|
||||
(
|
||||
`id` UInt8
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id
|
||||
```
|
||||
ORDER BY id;
|
||||
|
||||
DROP TABLE tab;
|
||||
|
||||
```sql
|
||||
DROP TABLE undropMe
|
||||
```
|
||||
```sql
|
||||
SELECT *
|
||||
FROM system.dropped_tables
|
||||
FORMAT Vertical
|
||||
FORMAT Vertical;
|
||||
```
|
||||
|
||||
```response
|
||||
Row 1:
|
||||
──────
|
||||
index: 0
|
||||
database: default
|
||||
table: undropMe
|
||||
table: tab
|
||||
uuid: aa696a1a-1d70-4e60-a841-4c80827706cc
|
||||
engine: MergeTree
|
||||
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.undropMe.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
|
||||
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.tab.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
|
||||
table_dropped_time: 2023-04-05 14:12:12
|
||||
|
||||
1 row in set. Elapsed: 0.001 sec.
|
||||
```
|
||||
|
||||
```sql
|
||||
UNDROP TABLE undropMe
|
||||
```
|
||||
```response
|
||||
Ok.
|
||||
```
|
||||
```sql
|
||||
UNDROP TABLE tab;
|
||||
|
||||
SELECT *
|
||||
FROM system.dropped_tables
|
||||
FORMAT Vertical
|
||||
```
|
||||
FORMAT Vertical;
|
||||
|
||||
```response
|
||||
Ok.
|
||||
|
||||
0 rows in set. Elapsed: 0.001 sec.
|
||||
```
|
||||
|
||||
```sql
|
||||
DESCRIBE TABLE undropMe
|
||||
FORMAT Vertical
|
||||
DESCRIBE TABLE tab
|
||||
FORMAT Vertical;
|
||||
```
|
||||
|
||||
```response
|
||||
Row 1:
|
||||
──────
|
||||
|
@ -27,9 +27,9 @@ DateTime([timezone])
|
||||
|
||||
Консольный клиент ClickHouse по умолчанию использует часовой пояс сервера, если для значения `DateTime` часовой пояс не был задан в явном виде при инициализации типа данных. Чтобы использовать часовой пояс клиента, запустите [clickhouse-client](../../interfaces/cli.md) с параметром `--use_client_time_zone`.
|
||||
|
||||
ClickHouse отображает значения в зависимости от значения параметра [date\_time\_output\_format](../../operations/settings/settings-formats.md#date_time_output_format). Текстовый формат по умолчанию `YYYY-MM-DD hh:mm:ss`. Кроме того, вы можете поменять отображение с помощью функции [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime).
|
||||
ClickHouse отображает значения в зависимости от значения параметра [date\_time\_output\_format](../../operations/settings/index.md#settings-date_time_output_format). Текстовый формат по умолчанию `YYYY-MM-DD hh:mm:ss`. Кроме того, вы можете поменять отображение с помощью функции [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime).
|
||||
|
||||
При вставке данных в ClickHouse, можно использовать различные форматы даты и времени в зависимости от значения настройки [date_time_input_format](../../operations/settings/settings-formats.md#date_time_input_format).
|
||||
При вставке данных в ClickHouse, можно использовать различные форматы даты и времени в зависимости от значения настройки [date_time_input_format](../../operations/settings/index.md#settings-date_time_input_format).
|
||||
|
||||
## Примеры {#primery}
|
||||
|
||||
@ -119,8 +119,8 @@ FROM dt
|
||||
- [Функции преобразования типов](../../sql-reference/functions/type-conversion-functions.md)
|
||||
- [Функции для работы с датой и временем](../../sql-reference/functions/date-time-functions.md)
|
||||
- [Функции для работы с массивами](../../sql-reference/functions/array-functions.md)
|
||||
- [Настройка `date_time_input_format`](../../operations/settings/settings-formats.md#date_time_input_format)
|
||||
- [Настройка `date_time_output_format`](../../operations/settings/settings-formats.md#date_time_output_format)
|
||||
- [Настройка `date_time_input_format`](../../operations/settings/index.md#settings-date_time_input_format)
|
||||
- [Настройка `date_time_output_format`](../../operations/settings/index.md)
|
||||
- [Конфигурационный параметр сервера `timezone`](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
|
||||
- [Параметр `session_timezone`](../../operations/settings/settings.md#session_timezone)
|
||||
- [Операторы для работы с датой и временем](../../sql-reference/operators/index.md#operators-datetime)
|
||||
|
@ -143,7 +143,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
|
||||
ParserCodec codec_parser;
|
||||
|
||||
std::string codecs_line = boost::algorithm::join(codecs, ",");
|
||||
auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
codec = CompressionCodecFactory::instance().get(ast, nullptr);
|
||||
}
|
||||
else
|
||||
|
@ -234,7 +234,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
|
||||
size_t approx_query_length = multiple ? find_first_symbols<';'>(pos, end) - pos : end - pos;
|
||||
|
||||
ASTPtr res = parseQueryAndMovePosition(
|
||||
parser, pos, end, "query", multiple, cmd_settings.max_query_size, cmd_settings.max_parser_depth);
|
||||
parser, pos, end, "query", multiple, cmd_settings.max_query_size, cmd_settings.max_parser_depth, cmd_settings.max_parser_backtracks);
|
||||
|
||||
std::unique_ptr<ReadBuffer> insert_query_payload = nullptr;
|
||||
/// If the query is INSERT ... VALUES, then we will try to parse the data.
|
||||
|
@ -44,7 +44,7 @@ String KeeperClient::executeFourLetterCommand(const String & command)
|
||||
std::vector<String> KeeperClient::getCompletions(const String & prefix) const
|
||||
{
|
||||
Tokens tokens(prefix.data(), prefix.data() + prefix.size(), 0, false);
|
||||
IParser::Pos pos(tokens, 0);
|
||||
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
|
||||
if (pos->type != TokenType::BareWord)
|
||||
return registered_commands_and_four_letter_words;
|
||||
@ -278,6 +278,7 @@ bool KeeperClient::processQueryText(const String & text)
|
||||
/* allow_multi_statements = */ true,
|
||||
/* max_query_size = */ 0,
|
||||
/* max_parser_depth = */ 0,
|
||||
/* max_parser_backtracks = */ 0,
|
||||
/* skip_insignificant = */ false);
|
||||
|
||||
if (!res)
|
||||
|
@ -11,6 +11,7 @@ set (CLICKHOUSE_LIBRARY_BRIDGE_SOURCES
|
||||
LibraryBridgeHandlers.cpp
|
||||
SharedLibrary.cpp
|
||||
library-bridge.cpp
|
||||
createFunctionBaseCast.cpp
|
||||
)
|
||||
|
||||
clickhouse_add_executable(clickhouse-library-bridge ${CLICKHOUSE_LIBRARY_BRIDGE_SOURCES})
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Bridge/IBridge.h>
|
||||
#include "LibraryBridgeHandlerFactory.h"
|
||||
|
||||
|
23
programs/library-bridge/createFunctionBaseCast.cpp
Normal file
23
programs/library-bridge/createFunctionBaseCast.cpp
Normal file
@ -0,0 +1,23 @@
|
||||
#include <memory>
|
||||
#include <Functions/CastOverloadResolver.h>
|
||||
#include <Core/ColumnsWithTypeAndName.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
class IFunctionBase;
|
||||
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
|
||||
|
||||
FunctionBasePtr createFunctionBaseCast(
|
||||
ContextPtr, const char *, const ColumnsWithTypeAndName &, const DataTypePtr &, std::optional<CastDiagnostic>, CastType)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Type conversions are not implemented for Library Bridge");
|
||||
}
|
||||
|
||||
}
|
@ -1000,12 +1000,6 @@ extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
|
||||
{
|
||||
std::vector<char *> argv(*pargv, *pargv + (*pargc + 1));
|
||||
|
||||
if (!isClickhouseApp("local", argv))
|
||||
{
|
||||
std::cerr << "\033[31m" << "ClickHouse compiled in fuzzing mode, only clickhouse local is available." << "\033[0m" << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/// As a user you can add flags to clickhouse binary in fuzzing mode as follows
|
||||
/// clickhouse local <set of clickhouse-local specific flag> -- <set of libfuzzer flags>
|
||||
|
||||
@ -1013,13 +1007,16 @@ extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
|
||||
|
||||
auto it = argv.begin() + 1;
|
||||
for (; *it; ++it)
|
||||
{
|
||||
if (strcmp(*it, "--") == 0)
|
||||
{
|
||||
++it;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while (*it)
|
||||
{
|
||||
if (strncmp(*it, "--", 2) != 0)
|
||||
{
|
||||
*(p++) = *it;
|
||||
@ -1027,6 +1024,7 @@ extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
|
||||
*pargc = static_cast<int>(p - &(*pargv)[0]);
|
||||
*p = nullptr;
|
||||
|
@ -68,7 +68,6 @@ namespace
|
||||
using MainFunc = int (*)(int, char**);
|
||||
|
||||
#if !defined(FUZZING_MODE)
|
||||
|
||||
/// Add an item here to register new application
|
||||
std::pair<std::string_view, MainFunc> clickhouse_applications[] =
|
||||
{
|
||||
@ -105,13 +104,6 @@ std::pair<std::string_view, MainFunc> clickhouse_applications[] =
|
||||
{"restart", mainEntryClickHouseRestart},
|
||||
};
|
||||
|
||||
/// Add an item here to register a new short name
|
||||
std::pair<std::string_view, std::string_view> clickhouse_short_names[] =
|
||||
{
|
||||
{"chl", "local"},
|
||||
{"chc", "client"},
|
||||
};
|
||||
|
||||
int printHelp(int, char **)
|
||||
{
|
||||
std::cerr << "Use one of the following commands:" << std::endl;
|
||||
@ -121,6 +113,13 @@ int printHelp(int, char **)
|
||||
}
|
||||
#endif
|
||||
|
||||
/// Add an item here to register a new short name
|
||||
std::pair<std::string_view, std::string_view> clickhouse_short_names[] =
|
||||
{
|
||||
{"chl", "local"},
|
||||
{"chc", "client"},
|
||||
};
|
||||
|
||||
|
||||
enum class InstructionFail
|
||||
{
|
||||
|
@ -13,6 +13,7 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
|
||||
getIdentifierQuote.cpp
|
||||
odbc-bridge.cpp
|
||||
validateODBCConnectionString.cpp
|
||||
createFunctionBaseCast.cpp
|
||||
)
|
||||
|
||||
clickhouse_add_executable(clickhouse-odbc-bridge ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
|
||||
|
23
programs/odbc-bridge/createFunctionBaseCast.cpp
Normal file
23
programs/odbc-bridge/createFunctionBaseCast.cpp
Normal file
@ -0,0 +1,23 @@
|
||||
#include <memory>
|
||||
#include <Functions/CastOverloadResolver.h>
|
||||
#include <Core/ColumnsWithTypeAndName.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
class IFunctionBase;
|
||||
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
|
||||
|
||||
FunctionBasePtr createFunctionBaseCast(
|
||||
ContextPtr, const char *, const ColumnsWithTypeAndName &, const DataTypePtr &, std::optional<CastDiagnostic>, CastType)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Type conversions are not implemented for Library Bridge");
|
||||
}
|
||||
|
||||
}
|
@ -733,8 +733,6 @@ try
|
||||
LOG_INFO(log, "Available CPU instruction sets: {}", cpu_info);
|
||||
#endif
|
||||
|
||||
sanityChecks(*this);
|
||||
|
||||
// Initialize global thread pool. Do it before we fetch configs from zookeeper
|
||||
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
|
||||
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
|
||||
@ -904,6 +902,7 @@ try
|
||||
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
|
||||
config().removeConfiguration(old_configuration.get());
|
||||
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
|
||||
global_context->setConfig(loaded_config.configuration);
|
||||
}
|
||||
|
||||
Settings::checkNoSettingNamesAtTopLevel(config(), config_path);
|
||||
@ -911,6 +910,9 @@ try
|
||||
/// We need to reload server settings because config could be updated via zookeeper.
|
||||
server_settings.loadSettingsFromConfig(config());
|
||||
|
||||
/// NOTE: Do sanity checks after we loaded all possible substitutions (for the configuration) from ZK
|
||||
sanityChecks(*this);
|
||||
|
||||
#if defined(OS_LINUX)
|
||||
std::string executable_path = getExecutablePath();
|
||||
|
||||
|
@ -62,7 +62,7 @@ AccessEntityPtr deserializeAccessEntityImpl(const String & definition)
|
||||
const char * end = begin + definition.size();
|
||||
while (pos < end)
|
||||
{
|
||||
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH));
|
||||
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS));
|
||||
while (isWhitespaceASCII(*pos) || *pos == ';')
|
||||
++pos;
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ void RowPolicyCache::PolicyInfo::setPolicy(const RowPolicyPtr & policy_)
|
||||
try
|
||||
{
|
||||
ParserExpression parser;
|
||||
parsed_filters[filter_type_i] = parseQuery(parser, filter, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
parsed_filters[filter_type_i] = parseQuery(parser, filter, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -66,7 +66,7 @@ namespace
|
||||
|
||||
String error_message;
|
||||
const char * pos = string_query.data();
|
||||
auto ast = tryParseQuery(parser, pos, pos + string_query.size(), error_message, false, "", false, 0, 0);
|
||||
auto ast = tryParseQuery(parser, pos, pos + string_query.size(), error_message, false, "", false, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
|
||||
|
||||
if (!ast)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to parse grant query. Error: {}", error_message);
|
||||
|
@ -483,6 +483,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
|
||||
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
|
||||
{
|
||||
@ -576,6 +577,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
|
||||
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
|
||||
{
|
||||
|
@ -142,6 +142,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
|
||||
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
|
||||
{
|
||||
|
@ -165,6 +165,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
|
||||
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
|
||||
{
|
||||
|
@ -111,6 +111,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
|
||||
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
|
||||
{
|
||||
|
@ -152,6 +152,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); }
|
||||
bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
|
||||
{
|
||||
|
@ -92,6 +92,7 @@ public:
|
||||
}
|
||||
|
||||
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
|
||||
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
|
||||
{
|
||||
|
@ -162,6 +162,10 @@ public:
|
||||
/// Tells if merge() with thread pool parameter could be used.
|
||||
virtual bool isAbleToParallelizeMerge() const { return false; }
|
||||
|
||||
/// Return true if it is allowed to replace call of `addBatch`
|
||||
/// to `addBatchSinglePlace` for ranges of consecutive equal keys.
|
||||
virtual bool canOptimizeEqualKeysRanges() const { return true; }
|
||||
|
||||
/// Should be used only if isAbleToParallelizeMerge() returned true.
|
||||
virtual void
|
||||
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const
|
||||
|
@ -27,6 +27,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
|
||||
|
||||
auto initialize = [&]() mutable
|
||||
{
|
||||
if (context)
|
||||
return true;
|
||||
|
||||
shared_context = Context::createShared();
|
||||
context = Context::createGlobal(shared_context.get());
|
||||
context->makeGlobalContext();
|
||||
|
@ -81,7 +81,8 @@ void getAggregateFunctionNameAndParametersArray(
|
||||
ParserExpressionList params_parser(false);
|
||||
ASTPtr args_ast = parseQuery(params_parser,
|
||||
parameters_str.data(), parameters_str.data() + parameters_str.size(),
|
||||
"parameters of aggregate function in " + error_context, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
"parameters of aggregate function in " + error_context,
|
||||
0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
|
||||
if (args_ast->children.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect list of parameters to aggregate function {}",
|
||||
|
@ -54,10 +54,10 @@ public:
|
||||
if (!constant_node)
|
||||
return;
|
||||
|
||||
const auto & constant_value_literal = constant_node->getValue();
|
||||
if (!isInt64OrUInt64FieldType(constant_value_literal.getType()))
|
||||
if (auto constant_type = constant_node->getResultType(); !isNativeInteger(constant_type))
|
||||
return;
|
||||
|
||||
const auto & constant_value_literal = constant_node->getValue();
|
||||
if (getSettings().aggregate_functions_null_for_empty)
|
||||
return;
|
||||
|
||||
|
@ -25,7 +25,7 @@ String BackupInfo::toString() const
|
||||
BackupInfo BackupInfo::fromString(const String & str)
|
||||
{
|
||||
ParserIdentifierWithOptionalParameters parser;
|
||||
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
return fromAST(*ast);
|
||||
}
|
||||
|
||||
|
@ -101,10 +101,12 @@ RestorerFromBackup::RestorerFromBackup(
|
||||
|
||||
RestorerFromBackup::~RestorerFromBackup()
|
||||
{
|
||||
if (!futures.empty())
|
||||
/// If an exception occurs we can come here to the destructor having some tasks still unfinished.
|
||||
/// We have to wait until they finish.
|
||||
if (getNumFutures() > 0)
|
||||
{
|
||||
LOG_ERROR(log, "RestorerFromBackup must not be destroyed while {} tasks are still running", futures.size());
|
||||
chassert(false && "RestorerFromBackup must not be destroyed while some tasks are still running");
|
||||
LOG_INFO(log, "Waiting for {} tasks to finish", getNumFutures());
|
||||
waitFutures();
|
||||
}
|
||||
}
|
||||
|
||||
@ -422,7 +424,7 @@ void RestorerFromBackup::findTableInBackupImpl(const QualifiedTableName & table_
|
||||
readStringUntilEOF(create_query_str, *read_buffer);
|
||||
read_buffer.reset();
|
||||
ParserCreateQuery create_parser;
|
||||
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
applyCustomStoragePolicy(create_table_query);
|
||||
renameDatabaseAndTableNameInCreateQuery(create_table_query, renaming_map, context->getGlobalContext());
|
||||
String create_table_query_str = serializeAST(*create_table_query);
|
||||
@ -532,7 +534,7 @@ void RestorerFromBackup::findDatabaseInBackupImpl(const String & database_name_i
|
||||
readStringUntilEOF(create_query_str, *read_buffer);
|
||||
read_buffer.reset();
|
||||
ParserCreateQuery create_parser;
|
||||
ASTPtr create_database_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
ASTPtr create_database_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
renameDatabaseAndTableNameInCreateQuery(create_database_query, renaming_map, context->getGlobalContext());
|
||||
String create_database_query_str = serializeAST(*create_database_query);
|
||||
|
||||
|
@ -345,7 +345,7 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
|
||||
if (dialect == Dialect::kusto)
|
||||
parser = std::make_unique<ParserKQLStatement>(end, global_context->getSettings().allow_settings_after_format_in_insert);
|
||||
else if (dialect == Dialect::prql)
|
||||
parser = std::make_unique<ParserPRQLQuery>(max_length, settings.max_parser_depth);
|
||||
parser = std::make_unique<ParserPRQLQuery>(max_length, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
else
|
||||
parser = std::make_unique<ParserQuery>(end, global_context->getSettings().allow_settings_after_format_in_insert);
|
||||
|
||||
@ -353,9 +353,9 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
|
||||
{
|
||||
String message;
|
||||
if (dialect == Dialect::kusto)
|
||||
res = tryParseKQLQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
||||
res = tryParseKQLQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks, true);
|
||||
else
|
||||
res = tryParseQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
||||
res = tryParseQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks, true);
|
||||
|
||||
if (!res)
|
||||
{
|
||||
@ -366,9 +366,9 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
|
||||
else
|
||||
{
|
||||
if (dialect == Dialect::kusto)
|
||||
res = parseKQLQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
||||
res = parseKQLQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
else
|
||||
res = parseQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
||||
res = parseQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
}
|
||||
|
||||
if (is_interactive)
|
||||
@ -385,12 +385,12 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
|
||||
|
||||
|
||||
/// Consumes trailing semicolons and tries to consume the same-line trailing comment.
|
||||
void ClientBase::adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth)
|
||||
void ClientBase::adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks)
|
||||
{
|
||||
// We have to skip the trailing semicolon that might be left
|
||||
// after VALUES parsing or just after a normal semicolon-terminated query.
|
||||
Tokens after_query_tokens(this_query_end, all_queries_end);
|
||||
IParser::Pos after_query_iterator(after_query_tokens, max_parser_depth);
|
||||
IParser::Pos after_query_iterator(after_query_tokens, max_parser_depth, max_parser_backtracks);
|
||||
while (after_query_iterator.isValid() && after_query_iterator->type == TokenType::Semicolon)
|
||||
{
|
||||
this_query_end = after_query_iterator->end;
|
||||
@ -1984,6 +1984,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
return MultiQueryProcessingStage::QUERIES_END;
|
||||
|
||||
unsigned max_parser_depth = static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth);
|
||||
unsigned max_parser_backtracks = static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks);
|
||||
|
||||
// If there are only comments left until the end of file, we just
|
||||
// stop. The parser can't handle this situation because it always
|
||||
@ -1994,7 +1995,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
// and it makes more sense to treat them as such.
|
||||
{
|
||||
Tokens tokens(this_query_begin, all_queries_end);
|
||||
IParser::Pos token_iterator(tokens, max_parser_depth);
|
||||
IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks);
|
||||
if (!token_iterator.isValid())
|
||||
return MultiQueryProcessingStage::QUERIES_END;
|
||||
}
|
||||
@ -2015,7 +2016,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
if (ignore_error)
|
||||
{
|
||||
Tokens tokens(this_query_begin, all_queries_end);
|
||||
IParser::Pos token_iterator(tokens, max_parser_depth);
|
||||
IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks);
|
||||
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
||||
++token_iterator;
|
||||
this_query_begin = token_iterator->end;
|
||||
@ -2055,7 +2056,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
// after we have processed the query. But even this guess is
|
||||
// beneficial so that we see proper trailing comments in "echo" and
|
||||
// server log.
|
||||
adjustQueryEnd(this_query_end, all_queries_end, max_parser_depth);
|
||||
adjustQueryEnd(this_query_end, all_queries_end, max_parser_depth, max_parser_backtracks);
|
||||
return MultiQueryProcessingStage::EXECUTE_QUERY;
|
||||
}
|
||||
|
||||
@ -2251,7 +2252,8 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
|
||||
this_query_end = insert_ast->end;
|
||||
adjustQueryEnd(
|
||||
this_query_end, all_queries_end,
|
||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth));
|
||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth),
|
||||
static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks));
|
||||
}
|
||||
|
||||
// Report error.
|
||||
|
@ -94,7 +94,7 @@ protected:
|
||||
void processParsedSingleQuery(const String & full_query, const String & query_to_execute,
|
||||
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
|
||||
|
||||
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth);
|
||||
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks);
|
||||
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
|
||||
static void setupSignalHandler();
|
||||
|
||||
|
@ -569,7 +569,8 @@ void QueryFuzzer::fuzzColumnDeclaration(ASTColumnDeclaration & column)
|
||||
auto data_type = fuzzDataType(DataTypeFactory::instance().get(column.type));
|
||||
|
||||
ParserDataType parser;
|
||||
column.type = parseQuery(parser, data_type->getName(), DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
column.type = parseQuery(parser, data_type->getName(),
|
||||
DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -821,7 +822,8 @@ static ASTPtr tryParseInsertQuery(const String & full_query)
|
||||
ParserInsertQuery parser(end, false);
|
||||
String message;
|
||||
|
||||
return tryParseQuery(parser, pos, end, message, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||
return tryParseQuery(parser, pos, end, message, false, "", false,
|
||||
DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
|
||||
}
|
||||
|
||||
ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query)
|
||||
@ -914,6 +916,38 @@ ASTPtr QueryFuzzer::fuzzLiteralUnderExpressionList(ASTPtr child)
|
||||
child = makeASTFunction(
|
||||
"toFixedString", std::make_shared<ASTLiteral>(value), std::make_shared<ASTLiteral>(static_cast<UInt64>(value.size())));
|
||||
}
|
||||
else if (type == Field::Types::Which::UInt64 && fuzz_rand() % 7 == 0)
|
||||
{
|
||||
child = makeASTFunction(fuzz_rand() % 2 == 0 ? "toUInt128" : "toUInt256", std::make_shared<ASTLiteral>(l->value.get<UInt64>()));
|
||||
}
|
||||
else if (type == Field::Types::Which::Int64 && fuzz_rand() % 7 == 0)
|
||||
{
|
||||
child = makeASTFunction(fuzz_rand() % 2 == 0 ? "toInt128" : "toInt256", std::make_shared<ASTLiteral>(l->value.get<Int64>()));
|
||||
}
|
||||
else if (type == Field::Types::Which::Float64 && fuzz_rand() % 7 == 0)
|
||||
{
|
||||
int decimal = fuzz_rand() % 4;
|
||||
if (decimal == 0)
|
||||
child = makeASTFunction(
|
||||
"toDecimal32",
|
||||
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
|
||||
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 9)));
|
||||
else if (decimal == 1)
|
||||
child = makeASTFunction(
|
||||
"toDecimal64",
|
||||
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
|
||||
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 18)));
|
||||
else if (decimal == 2)
|
||||
child = makeASTFunction(
|
||||
"toDecimal128",
|
||||
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
|
||||
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 38)));
|
||||
else
|
||||
child = makeASTFunction(
|
||||
"toDecimal256",
|
||||
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
|
||||
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 76)));
|
||||
}
|
||||
|
||||
if (fuzz_rand() % 7 == 0)
|
||||
child = makeASTFunction("toNullable", child);
|
||||
@ -933,7 +967,19 @@ ASTPtr QueryFuzzer::reverseLiteralFuzzing(ASTPtr child)
|
||||
{
|
||||
if (auto * function = child.get()->as<ASTFunction>())
|
||||
{
|
||||
std::unordered_set<String> can_be_reverted{"toNullable", "toLowCardinality", "materialize"};
|
||||
const std::unordered_set<String> can_be_reverted{
|
||||
"materialize",
|
||||
"toDecimal32", /// Keeping the first parameter only should be ok (valid query most of the time)
|
||||
"toDecimal64",
|
||||
"toDecimal128",
|
||||
"toDecimal256",
|
||||
"toFixedString", /// Same as toDecimal
|
||||
"toInt128",
|
||||
"toInt256",
|
||||
"toLowCardinality",
|
||||
"toNullable",
|
||||
"toUInt128",
|
||||
"toUInt256"};
|
||||
if (can_be_reverted.contains(function->name) && function->children.size() == 1)
|
||||
{
|
||||
if (fuzz_rand() % 7 == 0)
|
||||
|
@ -39,7 +39,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
|
||||
{
|
||||
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
|
||||
{
|
||||
LOG_INFO(log, "Processed: {}%", static_cast<Int64>(processed * 1000.0 / total) * 0.1);
|
||||
LOG_INFO(log, "Processed: {:.1f}%", static_cast<double>(processed) * 100.0 / total);
|
||||
watch.restart();
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +62,6 @@ struct LastElementCache
|
||||
bool check(const Key & key) const { return value.first == key; }
|
||||
|
||||
bool hasOnlyOneValue() const { return found && misses == 1; }
|
||||
UInt64 getMisses() const { return misses; }
|
||||
};
|
||||
|
||||
template <typename Data>
|
||||
@ -232,7 +231,7 @@ public:
|
||||
ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const
|
||||
{
|
||||
if constexpr (consecutive_keys_optimization)
|
||||
return cache.getMisses();
|
||||
return cache.misses;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -427,6 +427,8 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
|
||||
/// Replace the original contents, not add to it.
|
||||
bool replace = attributes->getNamedItem("replace");
|
||||
/// Merge with the original contents
|
||||
bool merge = attributes->getNamedItem("merge");
|
||||
|
||||
bool included_something = false;
|
||||
|
||||
@ -450,7 +452,6 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Replace the whole node not just contents.
|
||||
if (node->nodeName() == "include")
|
||||
{
|
||||
const NodeListPtr children = node_to_include->childNodes();
|
||||
@ -458,8 +459,18 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
for (Node * child = children->item(0); child; child = next_child)
|
||||
{
|
||||
next_child = child->nextSibling();
|
||||
NodePtr new_node = config->importNode(child, true);
|
||||
node->parentNode()->insertBefore(new_node, node);
|
||||
|
||||
/// Recursively replace existing nodes in merge mode
|
||||
if (merge)
|
||||
{
|
||||
NodePtr new_node = config->importNode(child->parentNode(), true);
|
||||
mergeRecursive(config, node->parentNode(), new_node);
|
||||
}
|
||||
else /// Append to existing node by default
|
||||
{
|
||||
NodePtr new_node = config->importNode(child, true);
|
||||
node->parentNode()->insertBefore(new_node, node);
|
||||
}
|
||||
}
|
||||
|
||||
node->parentNode()->removeChild(node);
|
||||
@ -777,9 +788,9 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfig(bool allow_zk_includes
|
||||
}
|
||||
|
||||
ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
|
||||
zkutil::ZooKeeperNodeCache & zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
bool fallback_to_preprocessed)
|
||||
zkutil::ZooKeeperNodeCache & zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
bool fallback_to_preprocessed)
|
||||
{
|
||||
XMLDocumentPtr config_xml;
|
||||
bool has_zk_includes;
|
||||
|
@ -584,10 +584,6 @@
|
||||
M(703, INVALID_IDENTIFIER) \
|
||||
M(704, QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS) \
|
||||
M(705, TABLE_NOT_EMPTY) \
|
||||
\
|
||||
M(900, DISTRIBUTED_CACHE_ERROR) \
|
||||
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
|
||||
\
|
||||
M(706, LIBSSH_ERROR) \
|
||||
M(707, GCP_ERROR) \
|
||||
M(708, ILLEGAL_STATISTIC) \
|
||||
@ -599,6 +595,10 @@
|
||||
M(715, CANNOT_DETECT_FORMAT) \
|
||||
M(716, CANNOT_FORGET_PARTITION) \
|
||||
M(717, EXPERIMENTAL_FEATURE_ERROR) \
|
||||
M(718, TOO_SLOW_PARSING) \
|
||||
\
|
||||
M(900, DISTRIBUTED_CACHE_ERROR) \
|
||||
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
@ -25,6 +25,18 @@ inline bool isFinite(T x)
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool canConvertTo(Float64 x)
|
||||
{
|
||||
if constexpr (std::is_floating_point_v<T>)
|
||||
return true;
|
||||
if (!isFinite(x))
|
||||
return false;
|
||||
if (x > Float64(std::numeric_limits<T>::max()) || x < Float64(std::numeric_limits<T>::lowest()))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T NaNOrZero()
|
||||
|
@ -302,7 +302,7 @@ private:
|
||||
readStringUntilEOF(query, in);
|
||||
|
||||
ParserCreateNamedCollectionQuery parser;
|
||||
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth);
|
||||
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
|
||||
return create_query;
|
||||
}
|
||||
|
@ -534,6 +534,7 @@ The server successfully detected this situation and will download merged part fr
|
||||
\
|
||||
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
|
||||
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
|
||||
M(AggregationOptimizedEqualRangesOfKeys, "For how many blocks optimization of equal ranges of keys was applied") \
|
||||
\
|
||||
M(MetadataFromKeeperCacheHit, "Number of times an object storage metadata request was answered from cache without making request to Keeper") \
|
||||
M(MetadataFromKeeperCacheMiss, "Number of times an object storage metadata request had to be answered from Keeper") \
|
||||
|
@ -387,7 +387,9 @@ public:
|
||||
|
||||
/// Introspection
|
||||
std::atomic<UInt64> dequeued_requests{0};
|
||||
std::atomic<UInt64> canceled_requests{0};
|
||||
std::atomic<ResourceCost> dequeued_cost{0};
|
||||
std::atomic<ResourceCost> canceled_cost{0};
|
||||
std::atomic<UInt64> busy_periods{0};
|
||||
};
|
||||
|
||||
|
@ -50,6 +50,12 @@ public:
|
||||
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
|
||||
virtual void enqueueRequest(ResourceRequest * request) = 0;
|
||||
|
||||
/// Cancel previously enqueued request.
|
||||
/// Returns `false` and does nothing given unknown or already executed request.
|
||||
/// Returns `true` if requests has been found and canceled.
|
||||
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
|
||||
virtual bool cancelRequest(ResourceRequest * request) = 0;
|
||||
|
||||
/// For introspection
|
||||
ResourceCost getBudget() const
|
||||
{
|
||||
|
@ -134,56 +134,65 @@ public:
|
||||
|
||||
std::pair<ResourceRequest *, bool> dequeueRequest() override
|
||||
{
|
||||
if (heap_size == 0)
|
||||
return {nullptr, false};
|
||||
|
||||
// Recursively pull request from child
|
||||
auto [request, child_active] = items.front().child->dequeueRequest();
|
||||
assert(request != nullptr);
|
||||
std::pop_heap(items.begin(), items.begin() + heap_size);
|
||||
Item & current = items[heap_size - 1];
|
||||
|
||||
// SFQ fairness invariant: system vruntime equals last served request start-time
|
||||
assert(current.vruntime >= system_vruntime);
|
||||
system_vruntime = current.vruntime;
|
||||
|
||||
// By definition vruntime is amount of consumed resource (cost) divided by weight
|
||||
current.vruntime += double(request->cost) / current.child->info.weight;
|
||||
max_vruntime = std::max(max_vruntime, current.vruntime);
|
||||
|
||||
if (child_active) // Put active child back in heap after vruntime update
|
||||
// Cycle is required to do deactivations in the case of canceled requests, when dequeueRequest returns `nullptr`
|
||||
while (true)
|
||||
{
|
||||
std::push_heap(items.begin(), items.begin() + heap_size);
|
||||
}
|
||||
else // Deactivate child if it is empty, but remember it's vruntime for latter activations
|
||||
{
|
||||
heap_size--;
|
||||
if (heap_size == 0)
|
||||
return {nullptr, false};
|
||||
|
||||
// Store index of this inactive child in `parent.idx`
|
||||
// This enables O(1) search of inactive children instead of O(n)
|
||||
current.child->info.parent.idx = heap_size;
|
||||
}
|
||||
// Recursively pull request from child
|
||||
auto [request, child_active] = items.front().child->dequeueRequest();
|
||||
std::pop_heap(items.begin(), items.begin() + heap_size);
|
||||
Item & current = items[heap_size - 1];
|
||||
|
||||
// Reset any difference between children on busy period end
|
||||
if (heap_size == 0)
|
||||
{
|
||||
// Reset vtime to zero to avoid floating-point error accumulation,
|
||||
// but do not reset too often, because it's O(N)
|
||||
UInt64 ns = clock_gettime_ns();
|
||||
if (last_reset_ns + 1000000000 < ns)
|
||||
if (request)
|
||||
{
|
||||
last_reset_ns = ns;
|
||||
for (Item & item : items)
|
||||
item.vruntime = 0;
|
||||
max_vruntime = 0;
|
||||
}
|
||||
system_vruntime = max_vruntime;
|
||||
busy_periods++;
|
||||
}
|
||||
// SFQ fairness invariant: system vruntime equals last served request start-time
|
||||
assert(current.vruntime >= system_vruntime);
|
||||
system_vruntime = current.vruntime;
|
||||
|
||||
dequeued_requests++;
|
||||
dequeued_cost += request->cost;
|
||||
return {request, heap_size > 0};
|
||||
// By definition vruntime is amount of consumed resource (cost) divided by weight
|
||||
current.vruntime += double(request->cost) / current.child->info.weight;
|
||||
max_vruntime = std::max(max_vruntime, current.vruntime);
|
||||
}
|
||||
|
||||
if (child_active) // Put active child back in heap after vruntime update
|
||||
{
|
||||
std::push_heap(items.begin(), items.begin() + heap_size);
|
||||
}
|
||||
else // Deactivate child if it is empty, but remember it's vruntime for latter activations
|
||||
{
|
||||
heap_size--;
|
||||
|
||||
// Store index of this inactive child in `parent.idx`
|
||||
// This enables O(1) search of inactive children instead of O(n)
|
||||
current.child->info.parent.idx = heap_size;
|
||||
}
|
||||
|
||||
// Reset any difference between children on busy period end
|
||||
if (heap_size == 0)
|
||||
{
|
||||
// Reset vtime to zero to avoid floating-point error accumulation,
|
||||
// but do not reset too often, because it's O(N)
|
||||
UInt64 ns = clock_gettime_ns();
|
||||
if (last_reset_ns + 1000000000 < ns)
|
||||
{
|
||||
last_reset_ns = ns;
|
||||
for (Item & item : items)
|
||||
item.vruntime = 0;
|
||||
max_vruntime = 0;
|
||||
}
|
||||
system_vruntime = max_vruntime;
|
||||
busy_periods++;
|
||||
}
|
||||
|
||||
if (request)
|
||||
{
|
||||
dequeued_requests++;
|
||||
dequeued_cost += request->cost;
|
||||
return {request, heap_size > 0};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool isActive() override
|
||||
|
@ -39,8 +39,7 @@ public:
|
||||
|
||||
void enqueueRequest(ResourceRequest * request) override
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
request->enqueue_ns = clock_gettime_ns();
|
||||
std::lock_guard lock(mutex);
|
||||
queue_cost += request->cost;
|
||||
bool was_empty = requests.empty();
|
||||
requests.push_back(request);
|
||||
@ -50,7 +49,7 @@ public:
|
||||
|
||||
std::pair<ResourceRequest *, bool> dequeueRequest() override
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
if (requests.empty())
|
||||
return {nullptr, false};
|
||||
ResourceRequest * result = requests.front();
|
||||
@ -63,9 +62,29 @@ public:
|
||||
return {result, !requests.empty()};
|
||||
}
|
||||
|
||||
bool cancelRequest(ResourceRequest * request) override
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
// TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N)
|
||||
for (auto i = requests.begin(), e = requests.end(); i != e; ++i)
|
||||
{
|
||||
if (*i == request)
|
||||
{
|
||||
requests.erase(i);
|
||||
if (requests.empty())
|
||||
busy_periods++;
|
||||
queue_cost -= request->cost;
|
||||
canceled_requests++;
|
||||
canceled_cost += request->cost;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool isActive() override
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
return !requests.empty();
|
||||
}
|
||||
|
||||
@ -98,14 +117,14 @@ public:
|
||||
|
||||
std::pair<UInt64, Int64> getQueueLengthAndCost()
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
return {requests.size(), queue_cost};
|
||||
}
|
||||
|
||||
private:
|
||||
std::mutex mutex;
|
||||
Int64 queue_cost = 0;
|
||||
std::deque<ResourceRequest *> requests;
|
||||
std::deque<ResourceRequest *> requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -102,25 +102,31 @@ public:
|
||||
|
||||
std::pair<ResourceRequest *, bool> dequeueRequest() override
|
||||
{
|
||||
if (items.empty())
|
||||
return {nullptr, false};
|
||||
|
||||
// Recursively pull request from child
|
||||
auto [request, child_active] = items.front().child->dequeueRequest();
|
||||
assert(request != nullptr);
|
||||
|
||||
// Deactivate child if it is empty
|
||||
if (!child_active)
|
||||
// Cycle is required to do deactivations in the case of canceled requests, when dequeueRequest returns `nullptr`
|
||||
while (true)
|
||||
{
|
||||
std::pop_heap(items.begin(), items.end());
|
||||
items.pop_back();
|
||||
if (items.empty())
|
||||
busy_periods++;
|
||||
}
|
||||
return {nullptr, false};
|
||||
|
||||
dequeued_requests++;
|
||||
dequeued_cost += request->cost;
|
||||
return {request, !items.empty()};
|
||||
// Recursively pull request from child
|
||||
auto [request, child_active] = items.front().child->dequeueRequest();
|
||||
|
||||
// Deactivate child if it is empty
|
||||
if (!child_active)
|
||||
{
|
||||
std::pop_heap(items.begin(), items.end());
|
||||
items.pop_back();
|
||||
if (items.empty())
|
||||
busy_periods++;
|
||||
}
|
||||
|
||||
if (request)
|
||||
{
|
||||
dequeued_requests++;
|
||||
dequeued_cost += request->cost;
|
||||
return {request, !items.empty()};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool isActive() override
|
||||
|
@ -38,7 +38,6 @@ TEST(SchedulerDynamicResourceManager, Smoke)
|
||||
{
|
||||
ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
|
||||
gA.lock();
|
||||
gA.setFailure();
|
||||
gA.unlock();
|
||||
|
||||
ResourceGuard gB(cB->get("res1"));
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
|
||||
|
||||
#include <barrier>
|
||||
#include <future>
|
||||
|
||||
using namespace DB;
|
||||
@ -73,6 +74,22 @@ struct ResourceHolder
|
||||
}
|
||||
};
|
||||
|
||||
struct MyRequest : public ResourceRequest
|
||||
{
|
||||
std::function<void()> on_execute;
|
||||
|
||||
explicit MyRequest(ResourceCost cost_, std::function<void()> on_execute_)
|
||||
: ResourceRequest(cost_)
|
||||
, on_execute(on_execute_)
|
||||
{}
|
||||
|
||||
void execute() override
|
||||
{
|
||||
if (on_execute)
|
||||
on_execute();
|
||||
}
|
||||
};
|
||||
|
||||
TEST(SchedulerRoot, Smoke)
|
||||
{
|
||||
ResourceTest t;
|
||||
@ -111,3 +128,49 @@ TEST(SchedulerRoot, Smoke)
|
||||
EXPECT_TRUE(fc2->requests.contains(&rg.request));
|
||||
}
|
||||
}
|
||||
|
||||
TEST(SchedulerRoot, Cancel)
|
||||
{
|
||||
ResourceTest t;
|
||||
|
||||
ResourceHolder r1(t);
|
||||
auto * fc1 = r1.add<ConstraintTest>("/", "<max_requests>1</max_requests>");
|
||||
r1.add<PriorityPolicy>("/prio");
|
||||
auto a = r1.addQueue("/prio/A", "<priority>1</priority>");
|
||||
auto b = r1.addQueue("/prio/B", "<priority>2</priority>");
|
||||
r1.registerResource();
|
||||
|
||||
std::barrier destruct_sync(2);
|
||||
std::barrier sync(2);
|
||||
std::thread consumer1([&]
|
||||
{
|
||||
MyRequest request(1,[&]
|
||||
{
|
||||
sync.arrive_and_wait(); // (A)
|
||||
EXPECT_TRUE(fc1->requests.contains(&request));
|
||||
sync.arrive_and_wait(); // (B)
|
||||
request.finish();
|
||||
destruct_sync.arrive_and_wait(); // (C)
|
||||
});
|
||||
a.queue->enqueueRequest(&request);
|
||||
destruct_sync.arrive_and_wait(); // (C)
|
||||
});
|
||||
|
||||
std::thread consumer2([&]
|
||||
{
|
||||
MyRequest request(1,[&]
|
||||
{
|
||||
FAIL() << "This request must be canceled, but instead executes";
|
||||
});
|
||||
sync.arrive_and_wait(); // (A) wait for request of consumer1 to be inside execute, so that constraint is in violated state and our request will not be executed immediately
|
||||
b.queue->enqueueRequest(&request);
|
||||
bool canceled = b.queue->cancelRequest(&request);
|
||||
EXPECT_TRUE(canceled);
|
||||
sync.arrive_and_wait(); // (B) release request of consumer1 to be finished
|
||||
});
|
||||
|
||||
consumer1.join();
|
||||
consumer2.join();
|
||||
|
||||
EXPECT_TRUE(fc1->requests.empty());
|
||||
}
|
||||
|
@ -71,8 +71,7 @@ public:
|
||||
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
|
||||
chassert(state == Dequeued);
|
||||
state = Finished;
|
||||
if (constraint)
|
||||
constraint->finishRequest(this);
|
||||
ResourceRequest::finish();
|
||||
}
|
||||
|
||||
static Request & local()
|
||||
@ -126,12 +125,6 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark request as unsuccessful; by default request is considered to be successful
|
||||
void setFailure()
|
||||
{
|
||||
request.successful = false;
|
||||
}
|
||||
|
||||
ResourceLink link;
|
||||
Request & request;
|
||||
};
|
||||
|
13
src/Common/Scheduler/ResourceRequest.cpp
Normal file
13
src/Common/Scheduler/ResourceRequest.cpp
Normal file
@ -0,0 +1,13 @@
|
||||
#include <Common/Scheduler/ResourceRequest.h>
|
||||
#include <Common/Scheduler/ISchedulerConstraint.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void ResourceRequest::finish()
|
||||
{
|
||||
if (constraint)
|
||||
constraint->finishRequest(this);
|
||||
}
|
||||
|
||||
}
|
@ -14,9 +14,6 @@ class ISchedulerConstraint;
|
||||
using ResourceCost = Int64;
|
||||
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
|
||||
|
||||
/// Timestamps (nanoseconds since epoch)
|
||||
using ResourceNs = UInt64;
|
||||
|
||||
/*
|
||||
* Request for a resource consumption. The main moving part of the scheduling subsystem.
|
||||
* Resource requests processing workflow:
|
||||
@ -31,7 +28,7 @@ using ResourceNs = UInt64;
|
||||
* 3) Scheduler calls ISchedulerNode::dequeueRequest() that returns the request.
|
||||
* 4) Callback ResourceRequest::execute() is called to provide access to the resource.
|
||||
* 5) The resource consumption is happening outside of the scheduling subsystem.
|
||||
* 6) request->constraint->finishRequest() is called when consumption is finished.
|
||||
* 6) ResourceRequest::finish() is called when consumption is finished.
|
||||
*
|
||||
* Steps (5) and (6) can be omitted if constraint is not used by the resource.
|
||||
*
|
||||
@ -39,7 +36,10 @@ using ResourceNs = UInt64;
|
||||
* Request ownership is done outside of the scheduling subsystem.
|
||||
* After (6) request can be destructed safely.
|
||||
*
|
||||
* Request cancelling is not supported yet.
|
||||
* Request can also be canceled before (3) using ISchedulerQueue::cancelRequest().
|
||||
* Returning false means it is too late for request to be canceled. It should be processed in a regular way.
|
||||
* Returning true means successful cancel and therefore steps (4) and (5) are not going to happen
|
||||
* and step (6) MUST be omitted.
|
||||
*/
|
||||
class ResourceRequest
|
||||
{
|
||||
@ -48,32 +48,20 @@ public:
|
||||
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
|
||||
ResourceCost cost;
|
||||
|
||||
/// Request outcome
|
||||
/// Should be filled during resource consumption
|
||||
bool successful;
|
||||
|
||||
/// Scheduler node to be notified on consumption finish
|
||||
/// Auto-filled during request enqueue/dequeue
|
||||
ISchedulerConstraint * constraint;
|
||||
|
||||
/// Timestamps for introspection
|
||||
ResourceNs enqueue_ns;
|
||||
ResourceNs execute_ns;
|
||||
ResourceNs finish_ns;
|
||||
|
||||
explicit ResourceRequest(ResourceCost cost_ = 1)
|
||||
{
|
||||
reset(cost_);
|
||||
}
|
||||
|
||||
/// ResourceRequest object may be reused again after reset()
|
||||
void reset(ResourceCost cost_)
|
||||
{
|
||||
cost = cost_;
|
||||
successful = true;
|
||||
constraint = nullptr;
|
||||
enqueue_ns = 0;
|
||||
execute_ns = 0;
|
||||
finish_ns = 0;
|
||||
}
|
||||
|
||||
virtual ~ResourceRequest() = default;
|
||||
@ -83,6 +71,12 @@ public:
|
||||
/// just triggering start of a consumption, not doing the consumption itself
|
||||
/// (e.g. setting an std::promise or creating a job in a thread pool)
|
||||
virtual void execute() = 0;
|
||||
|
||||
/// Stop resource consumption and notify resource scheduler.
|
||||
/// Should be called when resource consumption is finished by consumer.
|
||||
/// ResourceRequest should not be destructed or reset before calling to `finish()`.
|
||||
/// WARNING: this function MUST not be called if request was canceled.
|
||||
void finish();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -145,22 +145,27 @@ public:
|
||||
|
||||
std::pair<ResourceRequest *, bool> dequeueRequest() override
|
||||
{
|
||||
if (current == nullptr) // No active resources
|
||||
return {nullptr, false};
|
||||
while (true)
|
||||
{
|
||||
if (current == nullptr) // No active resources
|
||||
return {nullptr, false};
|
||||
|
||||
// Dequeue request from current resource
|
||||
auto [request, resource_active] = current->root->dequeueRequest();
|
||||
assert(request != nullptr);
|
||||
// Dequeue request from current resource
|
||||
auto [request, resource_active] = current->root->dequeueRequest();
|
||||
|
||||
// Deactivate resource if required
|
||||
if (!resource_active)
|
||||
deactivate(current);
|
||||
else
|
||||
current = current->next; // Just move round-robin pointer
|
||||
// Deactivate resource if required
|
||||
if (!resource_active)
|
||||
deactivate(current);
|
||||
else
|
||||
current = current->next; // Just move round-robin pointer
|
||||
|
||||
dequeued_requests++;
|
||||
dequeued_cost += request->cost;
|
||||
return {request, current != nullptr};
|
||||
if (request == nullptr) // Possible in case of request cancel, just retry
|
||||
continue;
|
||||
|
||||
dequeued_requests++;
|
||||
dequeued_cost += request->cost;
|
||||
return {request, current != nullptr};
|
||||
}
|
||||
}
|
||||
|
||||
bool isActive() override
|
||||
@ -245,7 +250,6 @@ private:
|
||||
|
||||
void execute(ResourceRequest * request)
|
||||
{
|
||||
request->execute_ns = clock_gettime_ns();
|
||||
request->execute();
|
||||
}
|
||||
|
||||
|
@ -10,9 +10,3 @@ ContextHolder & getMutableContext()
|
||||
static ContextHolder holder;
|
||||
return holder;
|
||||
}
|
||||
|
||||
void destroyContext()
|
||||
{
|
||||
auto & holder = getMutableContext();
|
||||
return holder.destroy();
|
||||
}
|
||||
|
@ -28,5 +28,3 @@ struct ContextHolder
|
||||
const ContextHolder & getContext();
|
||||
|
||||
ContextHolder & getMutableContext();
|
||||
|
||||
void destroyContext();
|
||||
|
@ -1,5 +1,2 @@
|
||||
clickhouse_add_executable (compressed_buffer compressed_buffer.cpp)
|
||||
target_link_libraries (compressed_buffer PRIVATE dbms)
|
||||
|
||||
clickhouse_add_executable (cached_compressed_read_buffer cached_compressed_read_buffer.cpp)
|
||||
target_link_libraries (cached_compressed_read_buffer PRIVATE dbms)
|
||||
target_link_libraries (compressed_buffer PRIVATE clickhouse_common_io clickhouse_compression)
|
||||
|
@ -1,79 +0,0 @@
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <limits>
|
||||
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <Compression/CachedCompressedReadBuffer.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <Disks/IO/createReadBufferFromFileBase.h>
|
||||
#include <IO/copyData.h>
|
||||
|
||||
#include <Common/Stopwatch.h>
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
if (argc < 2)
|
||||
{
|
||||
std::cerr << "Usage: program path\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
UncompressedCache cache("SLRU", 1024, 0.5);
|
||||
std::string path = argv[1];
|
||||
|
||||
std::cerr << std::fixed << std::setprecision(3);
|
||||
|
||||
size_t hits = 0;
|
||||
size_t misses = 0;
|
||||
|
||||
{
|
||||
Stopwatch watch;
|
||||
CachedCompressedReadBuffer in(
|
||||
path,
|
||||
[&]()
|
||||
{
|
||||
return createReadBufferFromFileBase(path, {});
|
||||
},
|
||||
&cache
|
||||
);
|
||||
WriteBufferFromFile out("/dev/null");
|
||||
copyData(in, out);
|
||||
|
||||
std::cerr << "Elapsed: " << watch.elapsedSeconds() << std::endl;
|
||||
}
|
||||
|
||||
cache.getStats(hits, misses);
|
||||
std::cerr << "Hits: " << hits << ", misses: " << misses << std::endl;
|
||||
|
||||
{
|
||||
Stopwatch watch;
|
||||
CachedCompressedReadBuffer in(
|
||||
path,
|
||||
[&]()
|
||||
{
|
||||
return createReadBufferFromFileBase(path, {});
|
||||
},
|
||||
&cache
|
||||
);
|
||||
WriteBufferFromFile out("/dev/null");
|
||||
copyData(in, out);
|
||||
|
||||
std::cerr << "Elapsed: " << watch.elapsedSeconds() << std::endl;
|
||||
}
|
||||
|
||||
cache.getStats(hits, misses);
|
||||
std::cerr << "Hits: " << hits << ", misses: " << misses << std::endl;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
std::cerr << e.what() << ", " << e.displayText() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,7 +1,4 @@
|
||||
#include <string>
|
||||
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <iomanip>
|
||||
|
||||
#include <Common/Stopwatch.h>
|
||||
|
@ -442,7 +442,7 @@ CompressionCodecPtr makeCodec(const std::string & codec_string, const DataTypePt
|
||||
{
|
||||
const std::string codec_statement = "(" + codec_string + ")";
|
||||
Tokens tokens(codec_statement.begin().base(), codec_statement.end().base());
|
||||
IParser::Pos token_iterator(tokens, 0);
|
||||
IParser::Pos token_iterator(tokens, 0, 0);
|
||||
|
||||
Expected expected;
|
||||
ASTPtr codec_ast;
|
||||
|
@ -121,7 +121,8 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
auth_settings.use_insecure_imds_request.value_or(false),
|
||||
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
|
||||
auth_settings.no_sign_request.value_or(false),
|
||||
});
|
||||
},
|
||||
credentials.GetSessionToken());
|
||||
|
||||
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));
|
||||
|
||||
|
@ -41,13 +41,13 @@ BaseSettingsHelpers::Flags BaseSettingsHelpers::readFlags(ReadBuffer & in)
|
||||
|
||||
void BaseSettingsHelpers::throwSettingNotFound(std::string_view name)
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", String{name});
|
||||
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting '{}'", String{name});
|
||||
}
|
||||
|
||||
|
||||
void BaseSettingsHelpers::warningSettingNotFound(std::string_view name)
|
||||
{
|
||||
LOG_WARNING(getLogger("Settings"), "Unknown setting {}, skipping", name);
|
||||
LOG_WARNING(getLogger("Settings"), "Unknown setting '{}', skipping", name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -63,6 +63,8 @@ static constexpr auto DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC = 120;
|
||||
|
||||
/// Default limit on recursion depth of recursive descend parser.
|
||||
static constexpr auto DBMS_DEFAULT_MAX_PARSER_DEPTH = 1000;
|
||||
/// Default limit on the amount of backtracking of recursive descend parser.
|
||||
static constexpr auto DBMS_DEFAULT_MAX_PARSER_BACKTRACKS = 1000000;
|
||||
|
||||
/// Default limit on query size.
|
||||
static constexpr auto DBMS_DEFAULT_MAX_QUERY_SIZE = 262144;
|
||||
|
@ -175,6 +175,7 @@ class IColumn;
|
||||
M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
|
||||
M(Bool, enable_extended_results_for_datetime_functions, false, "Enable date functions like toLastDayOfMonth return Date32 results (instead of Date results) for Date32/DateTime64 arguments.", 0) \
|
||||
M(Bool, allow_nonconst_timezone_arguments, false, "Allow non-const timezone arguments in certain time-related functions like toTimeZone(), fromUnixTimestamp*(), snowflakeToDateTime*()", 0) \
|
||||
M(Bool, function_locate_has_mysql_compatible_argument_order, true, "Function locate() has arguments (needle, haystack[, start_pos]) like in MySQL instead of (haystack, needle[, start_pos]) like function position()", 0) \
|
||||
\
|
||||
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
|
||||
\
|
||||
@ -607,6 +608,7 @@ class IColumn;
|
||||
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
|
||||
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
|
||||
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \
|
||||
M(UInt64, max_parser_backtracks, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, "Maximum parser backtracking (how many times it tries different alternatives in the recursive descend parsing process).", 0) \
|
||||
M(Bool, allow_settings_after_format_in_insert, false, "Allow SETTINGS after FORMAT, but note, that this is not always safe (note: this is a compatibility setting).", 0) \
|
||||
M(Seconds, periodic_live_view_refresh, 60, "Interval after which periodically refreshed live view is forced to refresh.", 0) \
|
||||
M(Bool, transform_null_in, false, "If enabled, NULL values will be matched with 'IN' operator as if they are considered equal.", 0) \
|
||||
|
@ -94,7 +94,9 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
|
||||
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
|
||||
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
|
||||
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
|
||||
{"function_locate_has_mysql_compatible_argument_order", false, true, "Increase compatibility with MySQL's locate function."},
|
||||
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
|
||||
{"max_parser_backtracks", 0, 1000000, "Limiting the complexity of parsing"},
|
||||
}},
|
||||
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
|
||||
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},
|
||||
|
@ -6,6 +6,3 @@ target_link_libraries (field PRIVATE dbms)
|
||||
|
||||
clickhouse_add_executable (string_ref_hash string_ref_hash.cpp)
|
||||
target_link_libraries (string_ref_hash PRIVATE clickhouse_common_io)
|
||||
|
||||
clickhouse_add_executable (mysql_protocol mysql_protocol.cpp)
|
||||
target_link_libraries (mysql_protocol PRIVATE dbms)
|
||||
|
@ -1,390 +0,0 @@
|
||||
#include <string>
|
||||
|
||||
#include <Core/MySQL/Authentication.h>
|
||||
#include <Core/MySQL/MySQLClient.h>
|
||||
#include <Core/MySQL/PacketsConnection.h>
|
||||
#include <Core/MySQL/PacketsGeneric.h>
|
||||
#include <Core/MySQL/PacketsProtocolText.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteBufferFromOStream.h>
|
||||
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace MySQLProtocol;
|
||||
using namespace MySQLProtocol::Generic;
|
||||
using namespace MySQLProtocol::Authentication;
|
||||
using namespace MySQLProtocol::ConnectionPhase;
|
||||
using namespace MySQLProtocol::ProtocolText;
|
||||
|
||||
|
||||
uint8_t server_sequence_id = 1;
|
||||
uint8_t client_sequence_id = 1;
|
||||
String user = "default";
|
||||
String password = "123";
|
||||
String database;
|
||||
|
||||
UInt8 charset_utf8 = 33;
|
||||
UInt32 max_packet_size = MAX_PACKET_LENGTH;
|
||||
String mysql_native_password = "mysql_native_password";
|
||||
|
||||
UInt32 server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH
|
||||
| CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
|
||||
|
||||
UInt32 client_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION;
|
||||
|
||||
/// Handshake packet
|
||||
{
|
||||
/// 1. Greeting:
|
||||
/// 1.1 Server writes greeting to client
|
||||
std::string s0;
|
||||
WriteBufferFromString out0(s0);
|
||||
|
||||
Handshake server_handshake(
|
||||
server_capability_flags, -1, "ClickHouse", "mysql_native_password", "aaaaaaaaaaaaaaaaaaaaa", CharacterSet::utf8_general_ci);
|
||||
server_handshake.writePayload(out0, server_sequence_id);
|
||||
|
||||
/// 1.2 Client reads the greeting
|
||||
ReadBufferFromString in0(s0);
|
||||
Handshake client_handshake;
|
||||
client_handshake.readPayload(in0, client_sequence_id);
|
||||
|
||||
/// Check packet
|
||||
ASSERT(server_handshake.capability_flags == client_handshake.capability_flags)
|
||||
ASSERT(server_handshake.status_flags == client_handshake.status_flags)
|
||||
ASSERT(server_handshake.server_version == client_handshake.server_version)
|
||||
ASSERT(server_handshake.protocol_version == client_handshake.protocol_version)
|
||||
ASSERT(server_handshake.auth_plugin_data.substr(0, 20) == client_handshake.auth_plugin_data)
|
||||
ASSERT(server_handshake.auth_plugin_name == client_handshake.auth_plugin_name)
|
||||
|
||||
/// 2. Greeting Response:
|
||||
std::string s1;
|
||||
WriteBufferFromString out1(s1);
|
||||
|
||||
/// 2.1 Client writes to server
|
||||
Native41 native41(password, client_handshake.auth_plugin_data);
|
||||
String auth_plugin_data = native41.getAuthPluginData();
|
||||
HandshakeResponse client_handshake_response(
|
||||
client_capability_flags, max_packet_size, charset_utf8, user, database, auth_plugin_data, mysql_native_password);
|
||||
client_handshake_response.writePayload(out1, client_sequence_id);
|
||||
|
||||
/// 2.2 Server reads the response
|
||||
ReadBufferFromString in1(s1);
|
||||
HandshakeResponse server_handshake_response;
|
||||
server_handshake_response.readPayload(in1, server_sequence_id);
|
||||
|
||||
/// Check
|
||||
ASSERT(server_handshake_response.capability_flags == client_handshake_response.capability_flags)
|
||||
ASSERT(server_handshake_response.character_set == client_handshake_response.character_set)
|
||||
ASSERT(server_handshake_response.username == client_handshake_response.username)
|
||||
ASSERT(server_handshake_response.database == client_handshake_response.database)
|
||||
ASSERT(server_handshake_response.auth_response == client_handshake_response.auth_response)
|
||||
ASSERT(server_handshake_response.auth_plugin_name == client_handshake_response.auth_plugin_name)
|
||||
}
|
||||
|
||||
/// OK Packet
|
||||
{
|
||||
// 1. Server writes packet
|
||||
std::string s0;
|
||||
WriteBufferFromString out0(s0);
|
||||
OKPacket server(0x00, server_capability_flags, 0, 0, 0, "", "");
|
||||
server.writePayload(out0, server_sequence_id);
|
||||
|
||||
// 2. Client reads packet
|
||||
ReadBufferFromString in0(s0);
|
||||
ResponsePacket client(server_capability_flags);
|
||||
client.readPayload(in0, client_sequence_id);
|
||||
|
||||
// Check
|
||||
ASSERT(client.getType() == PACKET_OK)
|
||||
ASSERT(client.ok.header == server.header)
|
||||
ASSERT(client.ok.status_flags == server.status_flags)
|
||||
ASSERT(client.ok.capabilities == server.capabilities)
|
||||
}
|
||||
|
||||
/// ERR Packet
|
||||
{
|
||||
// 1. Server writes packet
|
||||
std::string s0;
|
||||
WriteBufferFromString out0(s0);
|
||||
ERRPacket server(123, "12345", "This is the error message");
|
||||
server.writePayload(out0, server_sequence_id);
|
||||
|
||||
// 2. Client reads packet
|
||||
ReadBufferFromString in0(s0);
|
||||
ResponsePacket client(server_capability_flags);
|
||||
client.readPayload(in0, client_sequence_id);
|
||||
|
||||
// Check
|
||||
ASSERT(client.getType() == PACKET_ERR)
|
||||
ASSERT(client.err.header == server.header)
|
||||
ASSERT(client.err.error_code == server.error_code)
|
||||
ASSERT(client.err.sql_state == server.sql_state)
|
||||
ASSERT(client.err.error_message == server.error_message)
|
||||
}
|
||||
|
||||
/// EOF Packet
|
||||
{
|
||||
// 1. Server writes packet
|
||||
std::string s0;
|
||||
WriteBufferFromString out0(s0);
|
||||
EOFPacket server(1, 1);
|
||||
server.writePayload(out0, server_sequence_id);
|
||||
|
||||
// 2. Client reads packet
|
||||
ReadBufferFromString in0(s0);
|
||||
ResponsePacket client(server_capability_flags);
|
||||
client.readPayload(in0, client_sequence_id);
|
||||
|
||||
// Check
|
||||
ASSERT(client.getType() == PACKET_EOF)
|
||||
ASSERT(client.eof.header == server.header)
|
||||
ASSERT(client.eof.warnings == server.warnings)
|
||||
ASSERT(client.eof.status_flags == server.status_flags)
|
||||
}
|
||||
|
||||
/// ColumnDefinition Packet
|
||||
{
|
||||
// 1. Server writes packet
|
||||
std::string s0;
|
||||
WriteBufferFromString out0(s0);
|
||||
ColumnDefinition server("schema", "tbl", "org_tbl", "name", "org_name", 33, 0x00, MYSQL_TYPE_STRING, 0x00, 0x00);
|
||||
server.writePayload(out0, server_sequence_id);
|
||||
|
||||
// 2. Client reads packet
|
||||
ReadBufferFromString in0(s0);
|
||||
ColumnDefinition client;
|
||||
client.readPayload(in0, client_sequence_id);
|
||||
|
||||
// Check
|
||||
ASSERT(client.column_type == server.column_type)
|
||||
ASSERT(client.column_length == server.column_length)
|
||||
ASSERT(client.next_length == server.next_length)
|
||||
ASSERT(client.character_set == server.character_set)
|
||||
ASSERT(client.decimals == server.decimals)
|
||||
ASSERT(client.name == server.name)
|
||||
ASSERT(client.org_name == server.org_name)
|
||||
ASSERT(client.table == server.table)
|
||||
ASSERT(client.org_table == server.org_table)
|
||||
ASSERT(client.schema == server.schema)
|
||||
}
|
||||
|
||||
/// GTID sets tests.
|
||||
{
|
||||
struct Testcase
|
||||
{
|
||||
String name;
|
||||
String sets;
|
||||
String want;
|
||||
};
|
||||
|
||||
Testcase cases[] = {
|
||||
{"gtid-sets-without-whitespace",
|
||||
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812,9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177-"
|
||||
"56061224:56061226-75201528:75201530-75201755:75201757-75201983:75201985-75407550:75407552-75407604:75407606-75407661:"
|
||||
"75407663-87889848:87889850-87889935:87889937-87890042:87890044-88391955:88391957-88392125:88392127-88392245:88392247-"
|
||||
"88755771:88755773-88755826:88755828-88755921:88755923-100279047:100279049-100279126:100279128-100279247:100279249-121672430:"
|
||||
"121672432-121672503:121672505-121672524:121672526-122946019:122946021-122946291:122946293-122946469:122946471-134313284:"
|
||||
"134313286-134313415:134313417-134313648:134313650-136492728:136492730-136492784:136492786-136492904:136492906-145582402:"
|
||||
"145582404-145582439:145582441-145582463:145582465-147455222:147455224-147455262:147455264-147455277:147455279-149319049:"
|
||||
"149319051-149319261:149319263-150635915,a6d83ff6-bfcf-11e7-8c93-246e96158550:1-126618302",
|
||||
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812,9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177-"
|
||||
"56061224:56061226-75201528:75201530-75201755:75201757-75201983:75201985-75407550:75407552-75407604:75407606-75407661:"
|
||||
"75407663-87889848:87889850-87889935:87889937-87890042:87890044-88391955:88391957-88392125:88392127-88392245:88392247-"
|
||||
"88755771:88755773-88755826:88755828-88755921:88755923-100279047:100279049-100279126:100279128-100279247:100279249-121672430:"
|
||||
"121672432-121672503:121672505-121672524:121672526-122946019:122946021-122946291:122946293-122946469:122946471-134313284:"
|
||||
"134313286-134313415:134313417-134313648:134313650-136492728:136492730-136492784:136492786-136492904:136492906-145582402:"
|
||||
"145582404-145582439:145582441-145582463:145582465-147455222:147455224-147455262:147455264-147455277:147455279-149319049:"
|
||||
"149319051-149319261:149319263-150635915,a6d83ff6-bfcf-11e7-8c93-246e96158550:1-126618302"},
|
||||
|
||||
{"gtid-sets-with-whitespace",
|
||||
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812, 9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177",
|
||||
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812,9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177"},
|
||||
|
||||
{"gtid-sets-single", "2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812", "2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812"}};
|
||||
|
||||
for (auto & tc : cases)
|
||||
{
|
||||
GTIDSets gtid_sets;
|
||||
gtid_sets.parse(tc.sets);
|
||||
|
||||
String want = tc.want;
|
||||
String got = gtid_sets.toString();
|
||||
ASSERT(want == got)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
struct Testcase
|
||||
{
|
||||
String name;
|
||||
String gtid_sets;
|
||||
String gtid_str;
|
||||
String want;
|
||||
};
|
||||
|
||||
Testcase cases[] = {
|
||||
{"merge",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4-7",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:3",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-7"},
|
||||
|
||||
{"merge-front",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:5-7",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:3",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:5-7"},
|
||||
|
||||
{"extend-interval",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:6-7",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:4",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4:6-7"},
|
||||
|
||||
{"extend-interval",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4:7-9",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:5",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4-5:7-9"},
|
||||
|
||||
{"extend-interval",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:4",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:4:6-7"},
|
||||
|
||||
{"extend-interval",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:9",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7:9"},
|
||||
|
||||
{"extend-interval",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
|
||||
"20662d71-9d91-11ea-bbc2-0242ac110003:9",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7,20662d71-9d91-11ea-bbc2-0242ac110003:9"},
|
||||
|
||||
{"shrink-sequence",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:4-5:7",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:6",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-7"},
|
||||
|
||||
{"shrink-sequence",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:4-5:10",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:8",
|
||||
"10662d71-9d91-11ea-bbc2-0242ac110003:1-5:8:10"
|
||||
}
|
||||
};
|
||||
|
||||
for (auto & tc : cases)
|
||||
{
|
||||
GTIDSets gtid_sets;
|
||||
gtid_sets.parse(tc.gtid_sets);
|
||||
ASSERT(tc.gtid_sets == gtid_sets.toString())
|
||||
|
||||
GTIDSets gtid_sets1;
|
||||
gtid_sets1.parse(tc.gtid_str);
|
||||
|
||||
GTID gtid;
|
||||
gtid.uuid = gtid_sets1.sets[0].uuid;
|
||||
gtid.seq_no = gtid_sets1.sets[0].intervals[0].start;
|
||||
gtid_sets.update(gtid);
|
||||
|
||||
String want = tc.want;
|
||||
String got = gtid_sets.toString();
|
||||
ASSERT(want == got)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
/// mysql_protocol --host=172.17.0.3 --user=root --password=123 --db=sbtest
|
||||
try
|
||||
{
|
||||
boost::program_options::options_description desc("Allowed options");
|
||||
desc.add_options()("host", boost::program_options::value<std::string>()->required(), "master host")(
|
||||
"port", boost::program_options::value<std::int32_t>()->default_value(3306), "master port")(
|
||||
"user", boost::program_options::value<std::string>()->default_value("root"), "master user")(
|
||||
"password", boost::program_options::value<std::string>()->required(), "master password")(
|
||||
"gtid", boost::program_options::value<std::string>()->default_value(""), "executed GTID sets")(
|
||||
"db", boost::program_options::value<std::string>()->required(), "replicate do db")(
|
||||
"binlog_checksum", boost::program_options::value<std::string>()->default_value("CRC32"), "master binlog_checksum");
|
||||
|
||||
boost::program_options::variables_map options;
|
||||
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
|
||||
if (argc == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
auto host = options.at("host").as<String>();
|
||||
auto port = options.at("port").as<Int32>();
|
||||
auto master_user = options.at("user").as<String>();
|
||||
auto master_password = options.at("password").as<String>();
|
||||
auto gtid_sets = options.at("gtid").as<String>();
|
||||
auto replicate_db = options.at("db").as<String>();
|
||||
auto binlog_checksum = options.at("binlog_checksum").as<String>();
|
||||
|
||||
std::cerr << "Master Host: " << host << ", Port: " << port << ", User: " << master_user << ", Password: " << master_password
|
||||
<< ", Replicate DB: " << replicate_db << ", GTID: " << gtid_sets << std::endl;
|
||||
|
||||
UInt32 slave_id = 9004;
|
||||
MySQLClient slave(host, port, master_user, master_password);
|
||||
|
||||
/// Connect to the master.
|
||||
slave.connect();
|
||||
slave.startBinlogDumpGTID(slave_id, replicate_db, {}, gtid_sets, binlog_checksum);
|
||||
|
||||
WriteBufferFromOStream cerr(std::cerr);
|
||||
|
||||
/// Read one binlog event on by one.
|
||||
while (true)
|
||||
{
|
||||
auto event = slave.readOneBinlogEvent();
|
||||
switch (event->type())
|
||||
{
|
||||
case MYSQL_QUERY_EVENT: {
|
||||
auto binlog_event = std::static_pointer_cast<QueryEvent>(event);
|
||||
binlog_event->dump(cerr);
|
||||
|
||||
Position pos = slave.getPosition();
|
||||
pos.dump(cerr);
|
||||
break;
|
||||
}
|
||||
case MYSQL_WRITE_ROWS_EVENT: {
|
||||
auto binlog_event = std::static_pointer_cast<WriteRowsEvent>(event);
|
||||
binlog_event->dump(cerr);
|
||||
|
||||
Position pos = slave.getPosition();
|
||||
pos.dump(cerr);
|
||||
break;
|
||||
}
|
||||
case MYSQL_UPDATE_ROWS_EVENT: {
|
||||
auto binlog_event = std::static_pointer_cast<UpdateRowsEvent>(event);
|
||||
binlog_event->dump(cerr);
|
||||
|
||||
Position pos = slave.getPosition();
|
||||
pos.dump(cerr);
|
||||
break;
|
||||
}
|
||||
case MYSQL_DELETE_ROWS_EVENT: {
|
||||
auto binlog_event = std::static_pointer_cast<DeleteRowsEvent>(event);
|
||||
binlog_event->dump(cerr);
|
||||
|
||||
Position pos = slave.getPosition();
|
||||
pos.dump(cerr);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
if (event->header.type != MySQLReplication::EventType::HEARTBEAT_EVENT)
|
||||
{
|
||||
event->dump(cerr);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
{
|
||||
std::cerr << "Error: " << ex.message() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
@ -56,13 +56,14 @@ DataTypePtr DataTypeFactory::getImpl(const String & full_name) const
|
||||
{
|
||||
String out_err;
|
||||
const char * start = full_name.data();
|
||||
ast = tryParseQuery(parser, start, start + full_name.size(), out_err, false, "data type", false, DBMS_DEFAULT_MAX_QUERY_SIZE, data_type_max_parse_depth);
|
||||
ast = tryParseQuery(parser, start, start + full_name.size(), out_err, false, "data type", false,
|
||||
DBMS_DEFAULT_MAX_QUERY_SIZE, data_type_max_parse_depth, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
|
||||
if (!ast)
|
||||
return nullptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
ast = parseQuery(parser, full_name.data(), full_name.data() + full_name.size(), "data type", false, data_type_max_parse_depth);
|
||||
ast = parseQuery(parser, full_name.data(), full_name.data() + full_name.size(), "data type", false, data_type_max_parse_depth, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
}
|
||||
|
||||
return getImpl<nullptr_on_error>(ast);
|
||||
|
@ -533,6 +533,8 @@ class DataTypeDateTime;
|
||||
class DataTypeDateTime64;
|
||||
|
||||
template <is_decimal T> constexpr bool IsDataTypeDecimal<DataTypeDecimal<T>> = true;
|
||||
|
||||
/// TODO: this is garbage, remove it.
|
||||
template <> inline constexpr bool IsDataTypeDecimal<DataTypeDateTime64> = true;
|
||||
|
||||
template <typename T> constexpr bool IsDataTypeNumber<DataTypeNumber<T>> = true;
|
||||
|
@ -5,13 +5,11 @@
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/ObjectUtils.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <Common/JSONParsers/SimdJSONParser.h>
|
||||
#include <Common/JSONParsers/RapidJSONParser.h>
|
||||
#include <Common/HashTable/HashSet.h>
|
||||
#include <Columns/ColumnObject.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Functions/FunctionsConversion.h>
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
@ -29,6 +27,7 @@ namespace ErrorCodes
|
||||
extern const int INCORRECT_DATA;
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
extern const int CANNOT_PARSE_TEXT;
|
||||
extern const int EXPERIMENTAL_FEATURE_ERROR;
|
||||
}
|
||||
|
||||
@ -344,7 +343,20 @@ void SerializationObject<Parser>::deserializeBinaryBulkFromString(
|
||||
state.nested_serialization->deserializeBinaryBulkWithMultipleStreams(
|
||||
column_string, limit, settings, state.nested_state, cache);
|
||||
|
||||
ConvertImplGenericFromString<ColumnString>::executeImpl(*column_string, column_object, *this, column_string->size());
|
||||
size_t input_rows_count = column_string->size();
|
||||
column_object.reserve(input_rows_count);
|
||||
|
||||
FormatSettings format_settings;
|
||||
for (size_t i = 0; i < input_rows_count; ++i)
|
||||
{
|
||||
const auto & val = column_string->getDataAt(i);
|
||||
ReadBufferFromMemory read_buffer(val.data, val.size);
|
||||
deserializeWholeText(column_object, read_buffer, format_settings);
|
||||
|
||||
if (!read_buffer.eof())
|
||||
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT,
|
||||
"Cannot parse string to column Object. Expected eof");
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Parser>
|
||||
|
@ -24,6 +24,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
|
||||
|
||||
auto initialize = [&]() mutable
|
||||
{
|
||||
if (context)
|
||||
return true;
|
||||
|
||||
shared_context = Context::createShared();
|
||||
context = Context::createGlobal(shared_context.get());
|
||||
context->makeGlobalContext();
|
||||
|
@ -444,8 +444,9 @@ namespace
|
||||
ParserSelectWithUnionQuery parser;
|
||||
String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name);
|
||||
String fixed_query = removeWhereConditionPlaceholder(query);
|
||||
const Settings & settings = data.context->getSettingsRef();
|
||||
ASTPtr select = parseQuery(parser, fixed_query, description,
|
||||
data.context->getSettingsRef().max_query_size, data.context->getSettingsRef().max_parser_depth);
|
||||
settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
|
||||
DDLDependencyVisitor::Visitor visitor{data};
|
||||
visitor.visit(select);
|
||||
|
@ -115,7 +115,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, Co
|
||||
const char * pos = query.data();
|
||||
std::string error_message;
|
||||
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message,
|
||||
/* hilite = */ false, "", /* allow_multi_statements = */ false, 0, settings.max_parser_depth);
|
||||
/* hilite = */ false, "", /* allow_multi_statements = */ false, 0, settings.max_parser_depth, settings.max_parser_backtracks, true);
|
||||
|
||||
if (!ast && throw_on_error)
|
||||
throw Exception::createDeprecated(error_message, ErrorCodes::SYNTAX_ERROR);
|
||||
@ -134,7 +134,7 @@ ASTPtr DatabaseDictionary::getCreateDatabaseQuery() const
|
||||
}
|
||||
auto settings = getContext()->getSettingsRef();
|
||||
ParserCreateQuery parser;
|
||||
return parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
|
||||
return parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
}
|
||||
|
||||
void DatabaseDictionary::shutdown()
|
||||
|
@ -187,7 +187,7 @@ ASTPtr DatabaseFilesystem::getCreateDatabaseQuery() const
|
||||
const String query = fmt::format("CREATE DATABASE {} ENGINE = Filesystem('{}')", backQuoteIfNeed(getDatabaseName()), path);
|
||||
|
||||
ParserCreateQuery parser;
|
||||
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
|
||||
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
|
||||
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
|
||||
{
|
||||
|
@ -183,7 +183,7 @@ ASTPtr DatabaseHDFS::getCreateDatabaseQuery() const
|
||||
ParserCreateQuery parser;
|
||||
|
||||
const String query = fmt::format("CREATE DATABASE {} ENGINE = HDFS('{}')", backQuoteIfNeed(getDatabaseName()), source);
|
||||
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
|
||||
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
|
||||
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
|
||||
{
|
||||
|
@ -526,7 +526,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
|
||||
/// If database.sql doesn't exist, then engine is Ordinary
|
||||
String query = "CREATE DATABASE " + backQuoteIfNeed(getDatabaseName()) + " ENGINE = Ordinary";
|
||||
ParserCreateQuery parser;
|
||||
ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
|
||||
ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
}
|
||||
|
||||
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
|
||||
@ -707,7 +707,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
|
||||
const char * pos = query.data();
|
||||
std::string error_message;
|
||||
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message, /* hilite = */ false,
|
||||
"in file " + metadata_file_path, /* allow_multi_statements = */ false, 0, settings.max_parser_depth);
|
||||
"in file " + metadata_file_path, /* allow_multi_statements = */ false, 0, settings.max_parser_depth, settings.max_parser_backtracks, true);
|
||||
|
||||
if (!ast && throw_on_error)
|
||||
throw Exception::createDeprecated(error_message, ErrorCodes::SYNTAX_ERROR);
|
||||
@ -765,12 +765,14 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromStorage(const String & table_name, cons
|
||||
auto ast_storage = std::make_shared<ASTStorage>();
|
||||
ast_storage->set(ast_storage->engine, ast_engine);
|
||||
|
||||
unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
|
||||
auto create_table_query = DB::getCreateQueryFromStorage(storage,
|
||||
ast_storage,
|
||||
false,
|
||||
max_parser_depth,
|
||||
throw_on_error);
|
||||
const Settings & settings = getContext()->getSettingsRef();
|
||||
auto create_table_query = DB::getCreateQueryFromStorage(
|
||||
storage,
|
||||
ast_storage,
|
||||
false,
|
||||
static_cast<unsigned>(settings.max_parser_depth),
|
||||
static_cast<unsigned>(settings.max_parser_backtracks),
|
||||
throw_on_error);
|
||||
|
||||
create_table_query->set(create_table_query->as<ASTCreateQuery>()->comment,
|
||||
std::make_shared<ASTLiteral>("SYSTEM TABLE is built on the fly."));
|
||||
|
@ -440,10 +440,22 @@ void DatabaseOrdinary::stopLoading()
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
|
||||
{
|
||||
auto result = DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
std::scoped_lock lock(mutex);
|
||||
typeid_cast<DatabaseTablesSnapshotIterator &>(*result).setLoadTasks(startup_table);
|
||||
return result;
|
||||
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
|
||||
// It is important, because otherwise table might be:
|
||||
// - not attached and thus will be missed in the snapshot;
|
||||
// - not started, which is not good for DDL operations.
|
||||
LoadTaskPtrs tasks_to_wait;
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!filter_by_table_name)
|
||||
tasks_to_wait.reserve(startup_table.size());
|
||||
for (const auto & [table_name, task] : startup_table)
|
||||
if (!filter_by_table_name || filter_by_table_name(table_name))
|
||||
tasks_to_wait.emplace_back(task);
|
||||
}
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
|
||||
|
||||
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
|
||||
@ -469,7 +481,7 @@ void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & ta
|
||||
statement.data() + statement.size(),
|
||||
"in file " + table_metadata_path,
|
||||
0,
|
||||
local_context->getSettingsRef().max_parser_depth);
|
||||
local_context->getSettingsRef().max_parser_depth, local_context->getSettingsRef().max_parser_backtracks);
|
||||
|
||||
applyMetadataChangesToCreateQuery(ast, metadata);
|
||||
|
||||
|
@ -812,7 +812,8 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context
|
||||
ParserCreateQuery parser;
|
||||
auto size = context->getSettingsRef().max_query_size;
|
||||
auto depth = context->getSettingsRef().max_parser_depth;
|
||||
ASTPtr query = parseQuery(parser, metadata, size, depth);
|
||||
auto backtracks = context->getSettingsRef().max_parser_backtracks;
|
||||
ASTPtr query = parseQuery(parser, metadata, size, depth, backtracks);
|
||||
const ASTCreateQuery & create = query->as<const ASTCreateQuery &>();
|
||||
if (!create.storage || !create.storage->engine)
|
||||
return UUIDHelpers::Nil;
|
||||
@ -1234,7 +1235,7 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
|
||||
{
|
||||
ParserCreateQuery parser;
|
||||
String description = "in ZooKeeper " + zookeeper_path + "/metadata/" + node_name;
|
||||
auto ast = parseQuery(parser, query, description, 0, getContext()->getSettingsRef().max_parser_depth);
|
||||
auto ast = parseQuery(parser, query, description, 0, getContext()->getSettingsRef().max_parser_depth, getContext()->getSettingsRef().max_parser_backtracks);
|
||||
|
||||
auto & create = ast->as<ASTCreateQuery &>();
|
||||
if (create.uuid == UUIDHelpers::Nil || create.getTable() != TABLE_WITH_UUID_NAME_PLACEHOLDER || create.database)
|
||||
@ -1559,7 +1560,7 @@ DatabaseReplicated::getTablesForBackup(const FilterByNameFunction & filter, cons
|
||||
for (const auto & [table_name, metadata] : snapshot)
|
||||
{
|
||||
ParserCreateQuery parser;
|
||||
auto create_table_query = parseQuery(parser, metadata, 0, getContext()->getSettingsRef().max_parser_depth);
|
||||
auto create_table_query = parseQuery(parser, metadata, 0, getContext()->getSettingsRef().max_parser_depth, getContext()->getSettingsRef().max_parser_backtracks);
|
||||
|
||||
auto & create = create_table_query->as<ASTCreateQuery &>();
|
||||
create.attach = false;
|
||||
|
@ -191,7 +191,7 @@ ASTPtr DatabaseS3::getCreateDatabaseQuery() const
|
||||
creation_args += fmt::format(", '{}', '{}'", config.access_key_id.value(), config.secret_access_key.value());
|
||||
|
||||
const String query = fmt::format("CREATE DATABASE {} ENGINE = S3({})", backQuoteIfNeed(getDatabaseName()), creation_args);
|
||||
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
|
||||
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
|
||||
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
|
||||
{
|
||||
|
@ -108,7 +108,8 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
|
||||
}
|
||||
|
||||
|
||||
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary, uint32_t max_parser_depth, bool throw_on_error)
|
||||
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary,
|
||||
uint32_t max_parser_depth, uint32_t max_parser_backtracks, bool throw_on_error)
|
||||
{
|
||||
auto table_id = storage->getStorageID();
|
||||
auto metadata_ptr = storage->getInMemoryMetadataPtr();
|
||||
@ -148,7 +149,7 @@ ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_
|
||||
Expected expected;
|
||||
expected.max_parsed_pos = string_end;
|
||||
Tokens tokens(type_name.c_str(), string_end);
|
||||
IParser::Pos pos(tokens, max_parser_depth);
|
||||
IParser::Pos pos(tokens, max_parser_depth, max_parser_backtracks);
|
||||
ParserDataType parser;
|
||||
if (!parser.parse(pos, ast_type, expected))
|
||||
{
|
||||
|
@ -13,7 +13,8 @@ namespace DB
|
||||
{
|
||||
|
||||
void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemoryMetadata & metadata);
|
||||
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary, uint32_t max_parser_depth, bool throw_on_error);
|
||||
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary,
|
||||
uint32_t max_parser_depth, uint32_t max_parser_backtracks, bool throw_on_error);
|
||||
|
||||
/// Cleans a CREATE QUERY from temporary flags like "IF NOT EXISTS", "OR REPLACE", "AS SELECT" (for non-views), etc.
|
||||
void cleanupObjectDefinitionFromTemporaryFlags(ASTCreateQuery & query);
|
||||
|
@ -77,17 +77,12 @@ private:
|
||||
Tables tables;
|
||||
Tables::iterator it;
|
||||
|
||||
// Tasks to wait before returning a table
|
||||
using Tasks = std::unordered_map<String, LoadTaskPtr>;
|
||||
Tasks tasks;
|
||||
|
||||
protected:
|
||||
DatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && other) noexcept
|
||||
: IDatabaseTablesIterator(std::move(other.database_name))
|
||||
{
|
||||
size_t idx = std::distance(other.tables.begin(), other.it);
|
||||
std::swap(tables, other.tables);
|
||||
std::swap(tasks, other.tasks);
|
||||
other.it = other.tables.end();
|
||||
it = tables.begin();
|
||||
std::advance(it, idx);
|
||||
@ -110,17 +105,7 @@ public:
|
||||
|
||||
const String & name() const override { return it->first; }
|
||||
|
||||
const StoragePtr & table() const override
|
||||
{
|
||||
if (auto task = tasks.find(it->first); task != tasks.end())
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task->second);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void setLoadTasks(const Tasks & tasks_)
|
||||
{
|
||||
tasks = tasks_;
|
||||
}
|
||||
const StoragePtr & table() const override { return it->second; }
|
||||
};
|
||||
|
||||
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
|
||||
|
@ -174,12 +174,14 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context
|
||||
ast_storage->settings = nullptr;
|
||||
}
|
||||
|
||||
unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
|
||||
auto create_table_query = DB::getCreateQueryFromStorage(storage,
|
||||
table_storage_define,
|
||||
true,
|
||||
max_parser_depth,
|
||||
throw_on_error);
|
||||
const Settings & settings = getContext()->getSettingsRef();
|
||||
auto create_table_query = DB::getCreateQueryFromStorage(
|
||||
storage,
|
||||
table_storage_define,
|
||||
true,
|
||||
static_cast<unsigned>(settings.max_parser_depth),
|
||||
static_cast<unsigned>(settings.max_parser_backtracks),
|
||||
throw_on_error);
|
||||
return create_table_query;
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ static bool tryReadCharset(
|
||||
bool tryConvertStringLiterals(String & query)
|
||||
{
|
||||
Tokens tokens(query.data(), query.data() + query.size());
|
||||
IParser::Pos pos(tokens, 0);
|
||||
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
Expected expected;
|
||||
String rewritten_query;
|
||||
rewritten_query.reserve(query.size());
|
||||
|
@ -10,7 +10,7 @@ StorageID tryParseTableIDFromDDL(const String & query, const String & default_da
|
||||
{
|
||||
bool is_ddl = false;
|
||||
Tokens tokens(query.data(), query.data() + query.size());
|
||||
IParser::Pos pos(tokens, 0);
|
||||
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
Expected expected;
|
||||
if (ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos, expected) || ParserKeyword("CREATE TABLE").ignore(pos, expected))
|
||||
{
|
||||
|
@ -37,7 +37,7 @@ static void quoteLiteral(
|
||||
bool tryQuoteUnrecognizedTokens(String & query)
|
||||
{
|
||||
Tokens tokens(query.data(), query.data() + query.size());
|
||||
IParser::Pos pos(tokens, 0);
|
||||
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
|
||||
Expected expected;
|
||||
String rewritten_query;
|
||||
const char * copy_from = query.data();
|
||||
|
@ -194,10 +194,10 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex
|
||||
/// Add table_name to engine arguments
|
||||
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 1, std::make_shared<ASTLiteral>(table_id.table_name));
|
||||
|
||||
unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
|
||||
const Settings & settings = getContext()->getSettingsRef();
|
||||
|
||||
auto create_table_query = DB::getCreateQueryFromStorage(storage, table_storage_define, true,
|
||||
max_parser_depth,
|
||||
throw_on_error);
|
||||
static_cast<uint32_t>(settings.max_parser_depth), static_cast<uint32_t>(settings.max_parser_backtracks), throw_on_error);
|
||||
|
||||
return create_table_query;
|
||||
}
|
||||
|
@ -9,7 +9,7 @@
|
||||
#include <Common/ProfilingScopedRWLock.h>
|
||||
|
||||
#include <Dictionaries/DictionarySource.h>
|
||||
#include <Dictionaries/DictionarySourceHelpers.h>
|
||||
#include <Dictionaries/DictionaryPipelineExecutor.h>
|
||||
#include <Dictionaries/HierarchyDictionariesUtils.h>
|
||||
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
|
42
src/Dictionaries/DictionaryPipelineExecutor.cpp
Normal file
42
src/Dictionaries/DictionaryPipelineExecutor.cpp
Normal file
@ -0,0 +1,42 @@
|
||||
#include <Dictionaries/DictionaryPipelineExecutor.h>
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <QueryPipeline/QueryPipeline.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
DictionaryPipelineExecutor::DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async)
|
||||
: async_executor(async ? std::make_unique<PullingAsyncPipelineExecutor>(pipeline_) : nullptr)
|
||||
, executor(async ? nullptr : std::make_unique<PullingPipelineExecutor>(pipeline_))
|
||||
{
|
||||
}
|
||||
|
||||
bool DictionaryPipelineExecutor::pull(Block & block)
|
||||
{
|
||||
if (async_executor)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
bool has_data = async_executor->pull(block);
|
||||
if (has_data && !block)
|
||||
continue;
|
||||
return has_data;
|
||||
}
|
||||
}
|
||||
else if (executor)
|
||||
return executor->pull(block);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryPipelineExecutor is not initialized");
|
||||
}
|
||||
|
||||
DictionaryPipelineExecutor::~DictionaryPipelineExecutor() = default;
|
||||
|
||||
}
|
27
src/Dictionaries/DictionaryPipelineExecutor.h
Normal file
27
src/Dictionaries/DictionaryPipelineExecutor.h
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Block;
|
||||
class QueryPipeline;
|
||||
class PullingAsyncPipelineExecutor;
|
||||
class PullingPipelineExecutor;
|
||||
|
||||
/// Wrapper for `Pulling(Async)PipelineExecutor` to dynamically dispatch calls to the right executor
|
||||
class DictionaryPipelineExecutor
|
||||
{
|
||||
public:
|
||||
DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async);
|
||||
bool pull(Block & block);
|
||||
|
||||
~DictionaryPipelineExecutor();
|
||||
|
||||
private:
|
||||
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor;
|
||||
std::unique_ptr<PullingPipelineExecutor> executor;
|
||||
};
|
||||
|
||||
}
|
@ -9,15 +9,11 @@
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
@ -135,29 +131,4 @@ String TransformWithAdditionalColumns::getName() const
|
||||
return "TransformWithAdditionalColumns";
|
||||
}
|
||||
|
||||
DictionaryPipelineExecutor::DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async)
|
||||
: async_executor(async ? std::make_unique<PullingAsyncPipelineExecutor>(pipeline_) : nullptr)
|
||||
, executor(async ? nullptr : std::make_unique<PullingPipelineExecutor>(pipeline_))
|
||||
{}
|
||||
|
||||
bool DictionaryPipelineExecutor::pull(Block & block)
|
||||
{
|
||||
if (async_executor)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
bool has_data = async_executor->pull(block);
|
||||
if (has_data && !block)
|
||||
continue;
|
||||
return has_data;
|
||||
}
|
||||
}
|
||||
else if (executor)
|
||||
return executor->pull(block);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryPipelineExecutor is not initialized");
|
||||
}
|
||||
|
||||
DictionaryPipelineExecutor::~DictionaryPipelineExecutor() = default;
|
||||
|
||||
}
|
||||
|
@ -16,10 +16,6 @@ namespace DB
|
||||
struct DictionaryStructure;
|
||||
class SettingsChanges;
|
||||
|
||||
class PullingPipelineExecutor;
|
||||
class PullingAsyncPipelineExecutor;
|
||||
class QueryPipeline;
|
||||
|
||||
/// For simple key
|
||||
|
||||
Block blockForIds(
|
||||
@ -55,17 +51,4 @@ private:
|
||||
size_t current_range_index = 0;
|
||||
};
|
||||
|
||||
/// Wrapper for `Pulling(Async)PipelineExecutor` to dynamically dispatch calls to the right executor
|
||||
class DictionaryPipelineExecutor
|
||||
{
|
||||
public:
|
||||
DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async);
|
||||
bool pull(Block & block);
|
||||
|
||||
~DictionaryPipelineExecutor();
|
||||
private:
|
||||
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor;
|
||||
std::unique_ptr<PullingPipelineExecutor> executor;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -41,6 +41,33 @@ enum class AttributeUnderlyingType : TypeIndexUnderlying
|
||||
|
||||
#undef map_item
|
||||
|
||||
|
||||
#define CALL_FOR_ALL_DICTIONARY_ATTRIBUTE_TYPES(M) \
|
||||
M(UInt8) \
|
||||
M(UInt16) \
|
||||
M(UInt32) \
|
||||
M(UInt64) \
|
||||
M(UInt128) \
|
||||
M(UInt256) \
|
||||
M(Int8) \
|
||||
M(Int16) \
|
||||
M(Int32) \
|
||||
M(Int64) \
|
||||
M(Int128) \
|
||||
M(Int256) \
|
||||
M(Decimal32) \
|
||||
M(Decimal64) \
|
||||
M(Decimal128) \
|
||||
M(Decimal256) \
|
||||
M(DateTime64) \
|
||||
M(Float32) \
|
||||
M(Float64) \
|
||||
M(UUID) \
|
||||
M(IPv4) \
|
||||
M(IPv6) \
|
||||
M(String) \
|
||||
M(Array)
|
||||
|
||||
/// Min and max lifetimes for a dictionary or its entry
|
||||
using DictionaryLifetime = ExternalLoadableLifetime;
|
||||
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user