Merge remote-tracking branch 'blessed/master' into materialization_log

This commit is contained in:
Raúl Marín 2021-08-05 17:24:10 +02:00
commit 479f053f2e
185 changed files with 1866 additions and 810 deletions

View File

@ -7,6 +7,6 @@ assignees: ''
---
Make sure to check documentation https://clickhouse.yandex/docs/en/ first. If the question is concise and probably has a short answer, asking it in Telegram chat https://telegram.me/clickhouse_en is probably the fastest way to find the answer. For more complicated questions, consider asking them on StackOverflow with "clickhouse" tag https://stackoverflow.com/questions/tagged/clickhouse
> Make sure to check documentation https://clickhouse.yandex/docs/en/ first. If the question is concise and probably has a short answer, asking it in Telegram chat https://telegram.me/clickhouse_en is probably the fastest way to find the answer. For more complicated questions, consider asking them on StackOverflow with "clickhouse" tag https://stackoverflow.com/questions/tagged/clickhouse
If you still prefer GitHub issues, remove all this text and ask your question here.
> If you still prefer GitHub issues, remove all this text and ask your question here.

View File

@ -7,16 +7,20 @@ assignees: ''
---
(you don't have to strictly follow this form)
> (you don't have to strictly follow this form)
**Use case**
A clear and concise description of what the intended usage scenario is.
> A clear and concise description of what the intended usage scenario is.
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
> A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
> A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
> Add any other context or screenshots about the feature request here.

View File

@ -7,11 +7,11 @@ assignees: ''
---
You have to provide the following information whenever possible.
> You have to provide the following information whenever possible.
**Describe the bug**
A clear and concise description of what does not work as it is supposed to.
> A clear and concise description of what does not work as it is supposed to.
**Does it reproduce on recent release?**
@ -19,7 +19,7 @@ A clear and concise description of what works not as it is supposed to.
**Enable crash reporting**
If possible, change "enabled" to true in "send_crash_reports" section in `config.xml`:
> If possible, change "enabled" to true in "send_crash_reports" section in `config.xml`:
```
<send_crash_reports>
@ -39,12 +39,12 @@ If possible, change "enabled" to true in "send_crash_reports" section in `config
**Expected behavior**
A clear and concise description of what you expected to happen.
> A clear and concise description of what you expected to happen.
**Error message and/or stacktrace**
If applicable, add screenshots to help explain your problem.
> If applicable, add screenshots to help explain your problem.
**Additional context**
Add any other context about the problem here.
> Add any other context about the problem here.

View File

@ -7,10 +7,11 @@ assignees: ''
---
Make sure that `git diff` result is empty and you've just pulled fresh master. Try cleaning up cmake cache. Just in case, official build instructions are published here: https://clickhouse.yandex/docs/en/development/build/
> Make sure that `git diff` result is empty and you've just pulled fresh master. Try cleaning up cmake cache. Just in case, official build instructions are published here: https://clickhouse.yandex/docs/en/development/build/
**Operating system**
OS kind or distribution, specific version/release, non-standard kernel if any. If you are trying to build inside virtual machine, please mention it too.
> OS kind or distribution, specific version/release, non-standard kernel if any. If you are trying to build inside virtual machine, please mention it too.
**Cmake version**

2
contrib/AMQP-CPP vendored

@ -1 +1 @@
Subproject commit 03781aaff0f10ef41f902b8cf865fe0067180c10
Subproject commit 1a6c51f4ac51ac56610fa95081bd2f349911375a

View File

@ -10,11 +10,12 @@ set (SRCS
"${LIBRARY_DIR}/src/deferredconsumer.cpp"
"${LIBRARY_DIR}/src/deferredextreceiver.cpp"
"${LIBRARY_DIR}/src/deferredget.cpp"
"${LIBRARY_DIR}/src/deferredpublisher.cpp"
"${LIBRARY_DIR}/src/deferredrecall.cpp"
"${LIBRARY_DIR}/src/deferredreceiver.cpp"
"${LIBRARY_DIR}/src/field.cpp"
"${LIBRARY_DIR}/src/flags.cpp"
"${LIBRARY_DIR}/src/linux_tcp/openssl.cpp"
"${LIBRARY_DIR}/src/linux_tcp/sslerrorprinter.cpp"
"${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp"
"${LIBRARY_DIR}/src/inbuffer.cpp"
"${LIBRARY_DIR}/src/receivedframe.cpp"

2
contrib/arrow vendored

@ -1 +1 @@
Subproject commit debf751a129bdda9ff4d1e895e08957ff77000a1
Subproject commit 078e21bad344747b7656ef2d7a4f7410a0a303eb

View File

@ -194,9 +194,18 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/cast.cc"
"${LIBRARY_DIR}/compute/exec.cc"
"${LIBRARY_DIR}/compute/function.cc"
"${LIBRARY_DIR}/compute/function_internal.cc"
"${LIBRARY_DIR}/compute/kernel.cc"
"${LIBRARY_DIR}/compute/registry.cc"
"${LIBRARY_DIR}/compute/exec/exec_plan.cc"
"${LIBRARY_DIR}/compute/exec/expression.cc"
"${LIBRARY_DIR}/compute/exec/key_compare.cc"
"${LIBRARY_DIR}/compute/exec/key_encode.cc"
"${LIBRARY_DIR}/compute/exec/key_hash.cc"
"${LIBRARY_DIR}/compute/exec/key_map.cc"
"${LIBRARY_DIR}/compute/exec/util.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_basic.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_mode.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_quantile.cc"
@ -207,6 +216,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/kernels/scalar_arithmetic.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_boolean.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_boolean.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_dictionary.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_internal.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_nested.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_numeric.cc"
@ -214,15 +224,18 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/kernels/scalar_cast_temporal.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_compare.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_fill_null.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_if_else.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_nested.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_set_lookup.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_string.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_temporal.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_validity.cc"
"${LIBRARY_DIR}/compute/kernels/util_internal.cc"
"${LIBRARY_DIR}/compute/kernels/vector_hash.cc"
"${LIBRARY_DIR}/compute/kernels/vector_nested.cc"
"${LIBRARY_DIR}/compute/kernels/vector_replace.cc"
"${LIBRARY_DIR}/compute/kernels/vector_selection.cc"
"${LIBRARY_DIR}/compute/kernels/vector_sort.cc"
"${LIBRARY_DIR}/compute/kernels/util_internal.cc"
"${LIBRARY_DIR}/csv/chunker.cc"
"${LIBRARY_DIR}/csv/column_builder.cc"
@ -231,6 +244,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/csv/options.cc"
"${LIBRARY_DIR}/csv/parser.cc"
"${LIBRARY_DIR}/csv/reader.cc"
"${LIBRARY_DIR}/csv/writer.cc"
"${LIBRARY_DIR}/ipc/dictionary.cc"
"${LIBRARY_DIR}/ipc/feather.cc"
@ -247,6 +261,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/io/interfaces.cc"
"${LIBRARY_DIR}/io/memory.cc"
"${LIBRARY_DIR}/io/slow.cc"
"${LIBRARY_DIR}/io/stdio.cc"
"${LIBRARY_DIR}/io/transform.cc"
"${LIBRARY_DIR}/tensor/coo_converter.cc"
@ -257,9 +272,9 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/util/bit_block_counter.cc"
"${LIBRARY_DIR}/util/bit_run_reader.cc"
"${LIBRARY_DIR}/util/bit_util.cc"
"${LIBRARY_DIR}/util/bitmap.cc"
"${LIBRARY_DIR}/util/bitmap_builders.cc"
"${LIBRARY_DIR}/util/bitmap_ops.cc"
"${LIBRARY_DIR}/util/bitmap.cc"
"${LIBRARY_DIR}/util/bpacking.cc"
"${LIBRARY_DIR}/util/cancel.cc"
"${LIBRARY_DIR}/util/compression.cc"

View File

@ -0,0 +1,13 @@
version: '2.3'
services:
mongo1:
image: mongo:3.6
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: clickhouse
volumes:
- ${MONGO_CONFIG_PATH}:/mongo/
ports:
- ${MONGO_EXTERNAL_PORT}:${MONGO_INTERNAL_PORT}
command: --config /mongo/mongo_secure.conf --profile=2 --verbose

View File

@ -183,6 +183,10 @@ for conn_index, c in enumerate(all_connections):
# requires clickhouse-driver >= 1.1.5 to accept arbitrary new settings
# (https://github.com/mymarilyn/clickhouse-driver/pull/142)
c.settings[s.tag] = s.text
# We have to perform a query to make sure the settings work. Otherwise an
# unknown setting will lead to failing precondition check, and we will skip
# the test, which is wrong.
c.execute("select 1")
reportStageEnd('settings')

View File

@ -15,7 +15,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
name1 [type1],
name2 [type2],
...
) ENGINE = MongoDB(host:port, database, collection, user, password);
) ENGINE = MongoDB(host:port, database, collection, user, password [, options]);
```
**Engine Parameters**
@ -30,9 +30,11 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
- `password` — User password.
- `options` — MongoDB connection string options (optional parameter).
## Usage Example {#usage-example}
Table in ClickHouse which allows to read data from MongoDB collection:
Create a table in ClickHouse that allows reading data from the MongoDB collection:
``` text
CREATE TABLE mongo_table
@ -42,6 +44,16 @@ CREATE TABLE mongo_table
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'testuser', 'clickhouse');
```
To read from an SSL secured MongoDB server:
``` text
CREATE TABLE mongo_table_ssl
(
key UInt64,
data String
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'clickhouse', 'ssl=true');
```
Query:
``` sql

View File

@ -84,6 +84,8 @@ Features:
- Table data preview.
- Full-text search.
By default, DBeaver does not connect using a session (the CLI, for example, does). If you require session support (for example, to set settings for your session), edit the driver connection properties and set `session_id` to a random string (it uses the HTTP connection under the hood). Then you can use any setting from the query window.
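For example, once a `session_id` is configured in the driver properties, session-level settings issued from the query window should apply to the queries that follow (an illustrative pair of statements; the setting and values are arbitrary):

``` sql
SET max_threads = 2;
SELECT count() FROM numbers(1000000);
```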
### clickhouse-cli {#clickhouse-cli}
[clickhouse-cli](https://github.com/hatarist/clickhouse-cli) is an alternative command-line client for ClickHouse, written in Python 3.

View File

@ -28,7 +28,7 @@ Structure of the `users` section:
<profile>profile_name</profile>
<quota>default</quota>
<default_database>default</default_database>
<databases>
<database_name>
<table_name>

View File

@ -82,6 +82,7 @@ The next 4 columns have a non-zero value only where there is an active session w
- `absolute_delay` (`UInt64`) - How big lag in seconds the current replica has.
- `total_replicas` (`UInt8`) - The total number of known replicas of this table.
- `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ZooKeeper (i.e., the number of functioning replicas).
- `replica_is_active` ([Map(String, UInt8)](../../sql-reference/data-types/map.md)) — Map between replica name and whether the replica is active.
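A quick way to inspect the new column (an illustrative query; the values depend on your cluster):

``` sql
SELECT database, table, replica_is_active
FROM system.replicas
FORMAT Vertical;
```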
If you request all the columns, the table may work a bit slowly, since several reads from ZooKeeper are made for each row.
If you do not request the last 4 columns (log_max_index, log_pointer, total_replicas, active_replicas), the table works quickly.

View File

@ -275,9 +275,13 @@ The dictionary is stored in a cache that has a fixed number of cells. These cell
When searching for a dictionary, the cache is searched first. For each block of data, all keys that are not found in the cache or are outdated are requested from the source using `SELECT attrs... FROM db.table WHERE id IN (k1, k2, ...)`. The received data is then written to the cache.
For cache dictionaries, the expiration [lifetime](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md) of data in the cache can be set. If more time than `lifetime` has passed since loading the data in a cell, the cells value is not used, and it is re-requested the next time it needs to be used.
If keys are not found in the dictionary, a cache update task is created and added to the update queue. Update queue behaviour can be controlled with the settings `max_update_queue_size`, `update_queue_push_timeout_milliseconds`, `query_wait_timeout_milliseconds`, and `max_threads_for_updates`.
For cache dictionaries, the expiration [lifetime](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md) of data in the cache can be set. If more time than `lifetime` has passed since loading the data in a cell, the cell's value is not used, the key becomes expired, and it is re-requested the next time it needs to be used. This behaviour can be configured with the `allow_read_expired_keys` setting.
This is the least effective of all the ways to store dictionaries. The speed of the cache depends strongly on correct settings and the usage scenario. A cache type dictionary performs well only when the hit rates are high enough (recommended 99% and higher). You can view the average hit rate in the `system.dictionaries` table.
If the `allow_read_expired_keys` setting is set to 1 (default is 0), the dictionary supports asynchronous updates. If a client requests keys and all of them are in the cache, but some of them are expired, the dictionary returns the expired keys to the client and requests them asynchronously from the source.
To improve cache performance, use a subquery with `LIMIT`, and call the function with the dictionary externally.
Supported [sources](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md): MySQL, ClickHouse, executable, HTTP.
@ -289,6 +293,16 @@ Example of settings:
<cache>
<!-- The size of the cache, in number of cells. Rounded up to a power of two. -->
<size_in_cells>1000000000</size_in_cells>
<!-- Allows to read expired keys. -->
<allow_read_expired_keys>0</allow_read_expired_keys>
<!-- Max size of update queue. -->
<max_update_queue_size>100000</max_update_queue_size>
<!-- Max timeout in milliseconds for push update task into queue. -->
<update_queue_push_timeout_milliseconds>10</update_queue_push_timeout_milliseconds>
<!-- Max wait timeout in milliseconds for update task to complete. -->
<query_wait_timeout_milliseconds>60000</query_wait_timeout_milliseconds>
<!-- Max threads for cache dictionary update. -->
<max_threads_for_updates>4</max_threads_for_updates>
</cache>
</layout>
```
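For dictionaries defined with DDL, roughly the same knobs can be passed in the layout clause. This is a sketch only: the dictionary name, source table, and the assumption that the update-queue keys map one-to-one to the XML settings above are illustrative.

``` sql
CREATE DICTIONARY cache_dict_sketch
(
    id UInt64,
    value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' PASSWORD '' DB 'default' TABLE 'dict_source'))
LIFETIME(MIN 300 MAX 600)
LAYOUT(CACHE(SIZE_IN_CELLS 1000000 ALLOW_READ_EXPIRED_KEYS 1 MAX_UPDATE_QUEUE_SIZE 100000));
```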
@ -315,7 +329,7 @@ This type of storage is for use with composite [keys](../../../sql-reference/dic
### ssd_cache {#ssd-cache}
Similar to `cache`, but stores data on SSD and index in RAM.
Similar to `cache`, but stores data on SSD and the index in RAM. All cache dictionary settings related to the update queue can also be applied to SSD cache dictionaries.
``` xml
<layout>

View File

@ -3,11 +3,14 @@ toc_priority: 67
toc_title: NLP
---
# Natural Language Processing functions {#nlp-functions}
# [experimental] Natural Language Processing functions {#nlp-functions}
!!! warning "Warning"
This is an experimental feature that is currently in development and is not ready for general use. It will change in unpredictable backwards-incompatible ways in future releases. Set `allow_experimental_nlp_functions = 1` to enable it.
## stem {#stem}
Performs stemming on a previously tokenized text.
Performs stemming on a given word.
**Syntax**
@ -38,7 +41,7 @@ Result:
## lemmatize {#lemmatize}
Performs lemmatization on a given word.
Performs lemmatization on a given word. Needs dictionaries to operate, which can be obtained [here](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
**Syntax**
@ -79,7 +82,11 @@ Configuration:
## synonyms {#synonyms}
Finds synonyms to a given word.
Finds synonyms for a given word. There are two types of synonym extensions: `plain` and `wordnet`.
With the `plain` extension type, we need to provide a path to a simple text file, where each line corresponds to a certain synonym set. Words on this line must be separated with space or tab characters.
With the `wordnet` extension type, we need to provide a path to a directory containing a WordNet thesaurus. The thesaurus must contain a WordNet sense index.
**Syntax**
@ -89,7 +96,7 @@ synonyms('extension_name', word)
**Arguments**
- `extension_name` — Name of the extention in which search will be performed. [String](../../sql-reference/data-types/string.md#string).
- `extension_name` — Name of the extension in which search will be performed. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that will be searched in extension. [String](../../sql-reference/data-types/string.md#string).
**Examples**
@ -122,4 +129,4 @@ Configuration:
<path>en/</path>
</extension>
</synonyms_extensions>
```
```

View File

@ -15,6 +15,7 @@ CREATE USER [IF NOT EXISTS | OR REPLACE] name1 [ON CLUSTER cluster_name1]
[NOT IDENTIFIED | IDENTIFIED {[WITH {no_password | plaintext_password | sha256_password | sha256_hash | double_sha1_password | double_sha1_hash}] BY {'password' | 'hash'}} | {WITH ldap SERVER 'server_name'} | {WITH kerberos [REALM 'realm']}]
[HOST {LOCAL | NAME 'name' | REGEXP 'name_regexp' | IP 'address' | LIKE 'pattern'} [,...] | ANY | NONE]
[DEFAULT ROLE role [,...]]
[DEFAULT DATABASE database | NONE]
[GRANTEES {user | role | ANY | NONE} [,...] [EXCEPT {user | role} [,...]]]
[SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY | WRITABLE] | PROFILE 'profile_name'] [,...]
```
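For instance, a user whose new sessions start in a particular database could be created like this (user, password, and database names are illustrative):

``` sql
CREATE USER analyst IDENTIFIED WITH sha256_password BY 'secret' DEFAULT DATABASE analytics;
```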

View File

@ -274,28 +274,28 @@ This modifier also can be combined with [LIMIT … WITH TIES modifier](../../../
`WITH FILL` modifier can be set after `ORDER BY expr` with optional `FROM expr`, `TO expr` and `STEP expr` parameters.
All missing values of the `expr` column will be filled sequentially and other columns will be filled with defaults.
Use following syntax for filling multiple columns add `WITH FILL` modifier with optional parameters after each field name in `ORDER BY` section.
To fill multiple columns, add the `WITH FILL` modifier with optional parameters after each field name in the `ORDER BY` section.
``` sql
ORDER BY expr [WITH FILL] [FROM const_expr] [TO const_expr] [STEP const_numeric_expr], ... exprN [WITH FILL] [FROM expr] [TO expr] [STEP numeric_expr]
```
`WITH FILL` can be applied only for fields with Numeric (all kind of float, decimal, int) or Date/DateTime types.
`WITH FILL` can be applied to fields with Numeric (all kinds of float, decimal, int) or Date/DateTime types. When applied to `String` fields, missing values are filled with empty strings.
When `FROM const_expr` is not defined, the sequence of filling uses the minimal `expr` field value from `ORDER BY`.
When `TO const_expr` is not defined, the sequence of filling uses the maximum `expr` field value from `ORDER BY`.
When `STEP const_numeric_expr` is defined, `const_numeric_expr` is interpreted as is for numeric types, as days for the Date type, and as seconds for the DateTime type.
When `STEP const_numeric_expr` is omitted, the sequence of filling uses `1.0` for numeric types, `1 day` for the Date type, and `1 second` for the DateTime type.
For example, the following query
Example of a query without `WITH FILL`:
``` sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n
) ORDER BY n;
```
returns
Result:
``` text
┌─n─┬─source───┐
@ -305,16 +305,16 @@ returns
└───┴──────────┘
```
but after apply `WITH FILL` modifier
Same query after applying `WITH FILL` modifier:
``` sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n WITH FILL FROM 0 TO 5.51 STEP 0.5
) ORDER BY n WITH FILL FROM 0 TO 5.51 STEP 0.5;
```
returns
Result:
``` text
┌───n─┬─source───┐
@ -334,7 +334,7 @@ returns
└─────┴──────────┘
```
For the case when we have multiple fields `ORDER BY field2 WITH FILL, field1 WITH FILL` order of filling will follow the order of fields in `ORDER BY` clause.
In the case of multiple fields, `ORDER BY field2 WITH FILL, field1 WITH FILL`, the order of filling follows the order of fields in the `ORDER BY` clause.
Example:
@ -350,7 +350,7 @@ ORDER BY
d1 WITH FILL STEP 5;
```
returns
Result:
``` text
┌───d1───────┬───d2───────┬─source───┐
@ -364,9 +364,9 @@ returns
└────────────┴────────────┴──────────┘
```
Field `d1` does not fill and use default value cause we do not have repeated values for `d2` value, and sequence for `d1` cant be properly calculated.
Field `d1` is not filled in and uses the default value, because we do not have repeated values for `d2`, so the sequence for `d1` can't be properly calculated.
The following query with a changed field in `ORDER BY`
The following query with the changed field in `ORDER BY`:
``` sql
SELECT
@ -380,7 +380,7 @@ ORDER BY
d2 WITH FILL;
```
returns
Result:
``` text
┌───d1───────┬───d2───────┬─source───┐

View File

@ -5,9 +5,6 @@ toc_title: Window Functions
# [experimental] Window Functions
!!! warning "Warning"
This is an experimental feature that is currently in development and is not ready for general use. It will change in unpredictable backwards-incompatible ways in the future releases. Set `allow_experimental_window_functions = 1` to enable it.
ClickHouse supports the standard grammar for defining windows and window functions. The following features are currently supported:
| Feature | Support or workaround |

View File

@ -87,7 +87,7 @@ toc_title: "Введение"
Виртуальный столбец — это неотъемлемый атрибут движка таблиц, определенный в исходном коде движка.
Виртуальные столбцы не надо указывать в запросе `CREATE TABLE` и их не отображаются в результатах запросов `SHOW CREATE TABLE` и `DESCRIBE TABLE`. Также виртуальные столбцы доступны только для чтения, поэтому вы не можете вставлять в них данные.
Виртуальные столбцы не надо указывать в запросе `CREATE TABLE` и они не отображаются в результатах запросов `SHOW CREATE TABLE` и `DESCRIBE TABLE`. Также виртуальные столбцы доступны только для чтения, поэтому вы не можете вставлять в них данные.
Чтобы получить данные из виртуального столбца, необходимо указать его название в запросе `SELECT`. `SELECT *` не отображает данные из виртуальных столбцов.

View File

@ -111,7 +111,7 @@ toc_title: "Визуальные интерфейсы от сторонних р
### DataGrip {#datagrip}
[DataGrip](https://www.jetbrains.com/datagrip/) — это IDE для баз данных о JetBrains с выделенной поддержкой ClickHouse. Он также встроен в другие инструменты на основе IntelliJ: PyCharm, IntelliJ IDEA, GoLand, PhpStorm и другие.
[DataGrip](https://www.jetbrains.com/datagrip/) — это IDE для баз данных от JetBrains с выделенной поддержкой ClickHouse. Он также встроен в другие инструменты на основе IntelliJ: PyCharm, IntelliJ IDEA, GoLand, PhpStorm и другие.
Основные возможности:

View File

@ -3,7 +3,10 @@ toc_priority: 67
toc_title: NLP
---
# Функции для работы с ествественным языком {#nlp-functions}
# [экспериментально] Функции для работы с естественным языком {#nlp-functions}
!!! warning "Предупреждение"
Сейчас использование функций для работы с естественным языком является экспериментальной возможностью. Чтобы использовать данные функции, включите настройку `allow_experimental_nlp_functions = 1`.
## stem {#stem}
@ -38,7 +41,7 @@ Result:
## lemmatize {#lemmatize}
Данная функция проводит лемматизацию для заданного слова.
Данная функция проводит лемматизацию для заданного слова. Для работы лемматизатора необходимы словари, которые можно найти [здесь](https://github.com/vpodpecan/lemmagen3/tree/master/src/lemmagen3/models).
**Синтаксис**
@ -79,7 +82,11 @@ SELECT lemmatize('en', 'wolves');
## synonyms {#synonyms}
Находит синонимы к заданному слову.
Находит синонимы к заданному слову. Представлены два типа расширений словарей: `plain` и `wordnet`.
Для работы расширения типа `plain` необходимо указать путь до простого текстового файла, где каждая строка соответствует одному набору синонимов. Слова в данной строке должны быть разделены с помощью пробела или знака табуляции.
Для работы расширения типа `wordnet` необходимо указать путь до директории с WordNet тезаурусом. Тезаурус должен содержать WordNet sense index.
**Синтаксис**

View File

@ -271,8 +271,8 @@ SELECT * FROM collate_test ORDER BY s ASC COLLATE 'en';
Этот модификатор также может быть скомбинирован с модификатором [LIMIT ... WITH TIES](../../../sql-reference/statements/select/limit.md#limit-with-ties).
`WITH FILL` модификатор может быть установлен после `ORDER BY expr` с опциональными параметрами `FROM expr`, `TO expr` и `STEP expr`.
Все пропущенные значнеия для колонки `expr` будут заполненые значениями соответсвующими предполагаемой последовательности значений колонки, другие колонки будут заполнены значенями по умолчанию.
Модификатор `WITH FILL` может быть установлен после `ORDER BY expr` с опциональными параметрами `FROM expr`, `TO expr` и `STEP expr`.
Все пропущенные значения для колонки `expr` будут заполнены значениями, соответствующими предполагаемой последовательности значений колонки, другие колонки будут заполнены значениями по умолчанию.
Используйте следующую конструкцию для заполнения нескольких колонок с модификатором `WITH FILL` с необязательными параметрами после каждого имени поля в секции `ORDER BY`.
@ -280,22 +280,22 @@ SELECT * FROM collate_test ORDER BY s ASC COLLATE 'en';
ORDER BY expr [WITH FILL] [FROM const_expr] [TO const_expr] [STEP const_numeric_expr], ... exprN [WITH FILL] [FROM expr] [TO expr] [STEP numeric_expr]
```
`WITH FILL` может быть применене только к полям с числовыми (все разновидности float, int, decimal) или временными (все разновидности Date, DateTime) типами.
`WITH FILL` может быть применен к полям с числовыми (все разновидности float, int, decimal) или временными (все разновидности Date, DateTime) типами. В случае применения к полям типа `String` недостающие значения заполняются пустой строкой.
Когда не определен `FROM const_expr`, последовательность заполнения использует минимальное значение поля `expr` из `ORDER BY`.
Когда не определен `TO const_expr`, последовательность заполнения использует максимальное значение поля `expr` из `ORDER BY`.
Когда `STEP const_numeric_expr` определен, тогда `const_numeric_expr` интерпретируется `как есть` для числовых типов, как `дни` для типа Date и как `секунды` для типа DateTime.
Когда `STEP const_numeric_expr` определен, `const_numeric_expr` интерпретируется "как есть" для числовых типов, как "дни" для типа `Date` и как "секунды" для типа `DateTime`.
Когда `STEP const_numeric_expr` не указан, тогда используется `1.0` для числовых типов, `1 день` для типа Date и `1 секунда` для типа DateTime.
Для примера, следующий запрос
Пример запроса без использования `WITH FILL`:
```sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n
) ORDER BY n;
```
возвращает
Результат:
```text
┌─n─┬─source───┐
│ 1 │ original │
@ -304,7 +304,7 @@ SELECT n, source FROM (
└───┴──────────┘
```
но после применения модификатора `WITH FILL`
Тот же запрос после применения модификатора `WITH FILL`:
```sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
@ -312,7 +312,8 @@ SELECT n, source FROM (
) ORDER BY n WITH FILL FROM 0 TO 5.51 STEP 0.5
```
возвращает
Результат:
```text
┌───n─┬─source───┐
│ 0 │ │
@ -331,13 +332,13 @@ SELECT n, source FROM (
└─────┴──────────┘
```
Для случая когда у нас есть несколько полей `ORDER BY field2 WITH FILL, field1 WITH FILL` порядок заполнения будет следовать порядку полей в секции `ORDER BY`.
Для случая с несколькими полями `ORDER BY field2 WITH FILL, field1 WITH FILL` порядок заполнения будет соответствовать порядку полей в секции `ORDER BY`.
Пример:
```sql
SELECT
toDate((number * 10) * 86400) AS d1,
toDate(number * 86400) AS d2,
```sql
SELECT
toDate((number * 10) * 86400) AS d1,
toDate(number * 86400) AS d2,
'original' AS source
FROM numbers(10)
WHERE (number % 3) = 1
@ -346,7 +347,7 @@ ORDER BY
d1 WITH FILL STEP 5;
```
возвращает
Результат:
```text
┌───d1───────┬───d2───────┬─source───┐
│ 1970-01-11 │ 1970-01-02 │ original │
@ -359,9 +360,9 @@ ORDER BY
└────────────┴────────────┴──────────┘
```
Поле `d1` не заполняет и используется значение по умолчанию поскольку у нас нет повторяющихся значения для `d2` поэтому мы не можем правильно рассчитать последователность заполнения для`d1`.
Поле `d1` не заполняется и использует значение по умолчанию. Поскольку у нас нет повторяющихся значений для `d2`, мы не можем правильно рассчитать последовательность заполнения для `d1`.
Следующий запрос (с измененным порядком в ORDER BY)
Следующий запрос (с измененным порядком в ORDER BY):
```sql
SELECT
toDate((number * 10) * 86400) AS d1,
@ -374,7 +375,7 @@ ORDER BY
d2 WITH FILL;
```
возвращает
Результат:
```text
┌───d1───────┬───d2───────┬─source───┐
│ 1970-01-11 │ 1970-01-02 │ original │

View File

@ -197,7 +197,7 @@ private:
std::unique_ptr<ShellCommand> pager_cmd;
/// The user can specify to redirect query output to a file.
std::optional<WriteBufferFromFile> out_file_buf;
std::unique_ptr<WriteBuffer> out_file_buf;
BlockOutputStreamPtr block_out_stream;
/// The user could specify special file for server logs (stderr by default)
@ -2238,8 +2238,11 @@ private:
const auto & out_file_node = query_with_output->out_file->as<ASTLiteral &>();
const auto & out_file = out_file_node.value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
out_file_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, ""),
/* compression level = */ 3
);
// We are writing to file, so default format is the same as in non-interactive mode.
if (is_interactive && is_default_format)
@ -2259,9 +2262,9 @@ private:
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
if (!need_render_progress)
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
else
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
block_out_stream = context->getOutputStream(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
block_out_stream->writePrefix();
}
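In terms of user-visible behaviour, this change means the client chooses a compression codec from the output file extension when writing `INTO OUTFILE`. A rough illustration (the file name is just an example; a `.gz` suffix would select gzip):

``` sql
SELECT * FROM numbers(10) INTO OUTFILE 'numbers.tsv.gz' FORMAT TSV;
```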

View File

@ -12,8 +12,8 @@ namespace DB
Poco::URI uri{request.getURI()};
LOG_DEBUG(log, "Request URI: {}", uri.toString());
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<PingHandler>(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<LibraryExistsHandler>(keep_alive_timeout, getContext());
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());

View File

@ -17,8 +17,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_REQUEST_PARAMETER;
}
namespace
{
void processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), message);
}
std::shared_ptr<Block> parseColumns(std::string && column_string)
{
auto sample_block = std::make_shared<Block>();
@ -30,9 +46,8 @@ namespace
return sample_block;
}
std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
std::vector<uint64_t> parseIdsFromBinary(ReadBuffer & buf)
{
ReadBufferFromString buf(ids_string);
std::vector<uint64_t> ids;
readVectorBinary(ids, buf);
return ids;
@ -67,13 +82,36 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
std::string method = params.get("method");
std::string dictionary_id = params.get("dictionary_id");
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
{
if (method == "libNew")
bool lib_new = (method == "libNew");
if (method == "libClone")
{
if (!params.has("from_dictionary_id"))
{
processError(response, "No 'from_dictionary_id' in request URL");
return;
}
std::string from_dictionary_id = params.get("from_dictionary_id");
bool cloned = false;
cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
if (cloned)
{
writeStringBinary("1", out);
}
else
{
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead");
lib_new = true;
}
}
if (lib_new)
{
auto & read_buf = request.getStream();
params.read(read_buf);
@ -92,6 +130,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
std::string library_path = params.get("library_path");
const auto & settings_string = params.get("library_settings");
LOG_DEBUG(log, "Parsing library settings from binary string");
std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);
/// Needed for library dictionary
@ -102,6 +142,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
const auto & attributes_string = params.get("attributes_names");
LOG_DEBUG(log, "Parsing attributes names from binary string");
std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);
/// Needed to parse block from binary string format
@ -140,54 +182,63 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
writeStringBinary("1", out);
}
else if (method == "libClone")
{
if (!params.has("from_dictionary_id"))
{
processError(response, "No 'from_dictionary_id' in request URL");
return;
}
std::string from_dictionary_id = params.get("from_dictionary_id");
LOG_TRACE(log, "Calling libClone from {} to {}", from_dictionary_id, dictionary_id);
SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
writeStringBinary("1", out);
}
else if (method == "libDelete")
{
SharedLibraryHandlerFactory::instance().remove(dictionary_id);
auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);
/// Do not throw, a warning is ok.
if (!deleted)
LOG_WARNING(log, "Cannot delete library for with dictionary id: {}, because such id was not found.", dictionary_id);
writeStringBinary("1", out);
}
else if (method == "isModified")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
bool res = library_handler->isModified();
writeStringBinary(std::to_string(res), out);
}
else if (method == "supportsSelectiveLoad")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
bool res = library_handler->supportsSelectiveLoad();
writeStringBinary(std::to_string(res), out);
}
else if (method == "loadAll")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadAll();
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
copyData(*input, *output);
}
else if (method == "loadIds")
{
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
String ids_string;
readString(ids_string, request.getStream());
std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadIds(ids);
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
copyData(*input, *output);
}
@ -219,8 +270,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto block = reader->read();
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadKeys(block.getColumns());
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
copyData(*input, *output);
}
@ -228,8 +285,9 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
LOG_ERROR(log, "Failed to process request for dictionary_id: {}. Error: {}", dictionary_id, message);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
try
{
writeStringBinary(message, out);
@ -239,8 +297,6 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
{
tryLogCurrentException(log);
}
tryLogCurrentException(log);
}
try
@ -254,24 +310,30 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
void LibraryRequestHandler::processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_WARNING(log, message);
}
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
if (!params.has("dictionary_id"))
{
processError(response, "No 'dictionary_id' in request URL");
return;
}
std::string dictionary_id = params.get("dictionary_id");
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
String res;
if (library_handler)
res = "1";
else
res = "0";
setResponseDefaultHeaders(response, keep_alive_timeout);
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
LOG_TRACE(log, "Senging ping response: {} (dictionary id: {})", res, dictionary_id);
response.sendBuffer(res.data(), res.size());
}
catch (...)
{

View File

@ -22,8 +22,7 @@ class LibraryRequestHandler : public HTTPRequestHandler, WithContext
public:
LibraryRequestHandler(
size_t keep_alive_timeout_,
ContextPtr context_)
size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get("LibraryRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
@ -35,18 +34,18 @@ public:
private:
static constexpr inline auto FORMAT = "RowBinary";
void processError(HTTPServerResponse & response, const std::string & message);
Poco::Logger * log;
size_t keep_alive_timeout;
};
class PingHandler : public HTTPRequestHandler
class LibraryExistsHandler : public HTTPRequestHandler, WithContext
{
public:
explicit PingHandler(size_t keep_alive_timeout_)
: keep_alive_timeout(keep_alive_timeout_)
explicit LibraryExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, keep_alive_timeout(keep_alive_timeout_)
, log(&Poco::Logger::get("LibraryRequestHandler"))
{
}
@ -54,6 +53,8 @@ public:
private:
const size_t keep_alive_timeout;
Poco::Logger * log;
};
}

View File

@ -4,12 +4,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
@ -18,7 +12,7 @@ SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dic
if (library_handler != library_handlers.end())
return library_handler->second;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found dictionary with id: {}", dictionary_id);
return nullptr;
}
@ -30,32 +24,32 @@ void SharedLibraryHandlerFactory::create(
const std::vector<std::string> & attributes_names)
{
std::lock_guard lock(mutex);
library_handlers[dictionary_id] = std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names);
if (!library_handlers.count(dictionary_id))
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
else
LOG_WARNING(&Poco::Logger::get("SharedLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
}
void SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
bool SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
{
std::lock_guard lock(mutex);
auto from_library_handler = library_handlers.find(from_dictionary_id);
/// This is not supposed to happen as libClone is called from copy constructor of LibraryDictionarySource
/// object, and shared library handler of from_dictionary is removed only in its destructor.
/// And if for from_dictionary there was no shared library handler, it would have received an exception in
/// its constructor, so no libClone would be made from it.
if (from_library_handler == library_handlers.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No shared library handler found");
return false;
/// libClone method will be called in copy constructor
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
return true;
}
void SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
bool SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
/// libDelete is called in destructor.
library_handlers.erase(dictionary_id);
return library_handlers.erase(dictionary_id);
}

View File

@ -24,9 +24,9 @@ public:
const Block & sample_block,
const std::vector<std::string> & attributes_names);
void clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
bool clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
void remove(const std::string & dictionary_id);
bool remove(const std::string & dictionary_id);
private:
/// map: dict_id -> sharedLibraryHandler

View File

@ -361,23 +361,22 @@
function postImpl(posted_request_num, query)
{
/// TODO: Check if URL already contains query string (append parameters).
const user = document.getElementById('user').value;
const password = document.getElementById('password').value;
let user = document.getElementById('user').value;
let password = document.getElementById('password').value;
const server_address = document.getElementById('url').value;
let server_address = document.getElementById('url').value;
let url = server_address +
const url = server_address +
(server_address.indexOf('?') >= 0 ? '&' : '?') +
/// Ask server to allow cross-domain requests.
'?add_http_cors_header=1' +
'add_http_cors_header=1' +
'&user=' + encodeURIComponent(user) +
'&password=' + encodeURIComponent(password) +
'&default_format=JSONCompact' +
/// Safety settings to prevent results that browser cannot display.
'&max_result_rows=1000&max_result_bytes=10000000&result_overflow_mode=break';
let xhr = new XMLHttpRequest;
const xhr = new XMLHttpRequest;
xhr.open('POST', url, true);
@ -391,12 +390,12 @@
/// The query is saved in browser history (in state JSON object)
/// as well as in URL fragment identifier.
if (query != previous_query) {
let state = {
const state = {
query: query,
status: this.status,
response: this.response.length > 100000 ? null : this.response /// Lower than the browser's limit.
};
let title = "ClickHouse Query: " + query;
const title = "ClickHouse Query: " + query;
let history_url = window.location.pathname + '?user=' + encodeURIComponent(user);
if (server_address != location.origin) {

View File

@ -33,24 +33,9 @@ Poco::URI IBridgeHelper::getPingURI() const
}
bool IBridgeHelper::checkBridgeIsRunning() const
void IBridgeHelper::startBridgeSync()
{
try
{
ReadWriteBufferFromHTTP buf(
getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void IBridgeHelper::startBridgeSync() const
{
if (!checkBridgeIsRunning())
if (!bridgeHandShake())
{
LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
startBridge(startBridgeCommand());
@ -64,7 +49,7 @@ void IBridgeHelper::startBridgeSync() const
++counter;
LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
if (checkBridgeIsRunning())
if (bridgeHandShake())
{
started = true;
break;
@ -81,7 +66,7 @@ void IBridgeHelper::startBridgeSync() const
}
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand() const
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand()
{
if (startBridgeManually())
throw Exception(serviceAlias() + " is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);

View File

@ -28,16 +28,19 @@ public:
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
explicit IBridgeHelper(ContextPtr context_) : WithContext(context_) {}
virtual ~IBridgeHelper() = default;
void startBridgeSync() const;
virtual ~IBridgeHelper() = default;
Poco::URI getMainURI() const;
Poco::URI getPingURI() const;
void startBridgeSync();
protected:
/// Check bridge is running. Can also check something else in the mean time.
virtual bool bridgeHandShake() = 0;
/// clickhouse-odbc-bridge, clickhouse-library-bridge
virtual String serviceAlias() const = 0;
@ -61,9 +64,7 @@ protected:
private:
bool checkBridgeIsRunning() const;
std::unique_ptr<ShellCommand> startBridgeCommand() const;
std::unique_ptr<ShellCommand> startBridgeCommand();
};
}

View File

@ -1,6 +1,5 @@
#include "LibraryBridgeHelper.h"
#include <IO/ReadHelpers.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/formatBlock.h>
@ -8,6 +7,8 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
@ -20,16 +21,25 @@
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int LOGICAL_ERROR;
}
LibraryBridgeHelper::LibraryBridgeHelper(
ContextPtr context_,
const Block & sample_block_,
const Field & dictionary_id_)
const Field & dictionary_id_,
const LibraryInitData & library_data_)
: IBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get("LibraryBridgeHelper"))
, sample_block(sample_block_)
, config(context_->getConfigRef())
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, library_data(library_data_)
, dictionary_id(dictionary_id_)
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
{
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
@ -61,26 +71,91 @@ void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
}
bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names)
bool LibraryBridgeHelper::bridgeHandShake()
{
startBridgeSync();
auto uri = createRequestURI(LIB_NEW_METHOD);
String result;
try
{
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts);
readString(result, buf);
}
catch (...)
{
return false;
}
/*
* When pinging the bridge we also pass the current dictionary_id. The bridge will check if there is such
* a dictionary. It is possible that such a dictionary_id is missing only in two cases:
* 1. It is dictionary source creation and initialization of library handler on bridge side did not happen yet.
* 2. Bridge crashed or restarted for some reason while server did not.
**/
if (result.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check bridge and server have the same version.", result);
UInt8 dictionary_id_exists;
auto parsed = tryParse<UInt8>(dictionary_id_exists, result);
if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
toString(dictionary_id), toString(dictionary_id_exists), library_initialized);
if (dictionary_id_exists && !library_initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Library was not initialized, but bridge responded to already have dictionary id: {}", dictionary_id);
/// Here we want to say bridge to recreate a new library handler for current dictionary,
/// because it responded to have lost it, but we know that it has already been created. (It is a direct result of bridge crash).
if (!dictionary_id_exists && library_initialized)
{
LOG_WARNING(log, "Library bridge does not have library handler with dictionaty id: {}. It will be reinitialized.", dictionary_id);
bool reinitialized = false;
try
{
auto uri = createRequestURI(LIB_NEW_METHOD);
reinitialized = executeRequest(uri, getInitLibraryCallback());
}
catch (...)
{
tryLogCurrentException(log);
return false;
}
if (!reinitialized)
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR,
"Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
}
return true;
}
ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCallback() const
{
/// Sample block must contain null values
WriteBufferFromOwnString out;
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
formatBlock(output_stream, sample_block);
auto block_string = out.str();
auto out_stream_callback = [library_path, library_settings, attributes_names, block_string, this](std::ostream & os)
return [block_string, this](std::ostream & os)
{
os << "library_path=" << escapeForFileName(library_path) << "&";
os << "library_settings=" << escapeForFileName(library_settings) << "&";
os << "attributes_names=" << escapeForFileName(attributes_names) << "&";
os << "library_path=" << escapeForFileName(library_data.library_path) << "&";
os << "library_settings=" << escapeForFileName(library_data.library_settings) << "&";
os << "attributes_names=" << escapeForFileName(library_data.dict_attributes) << "&";
os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString()) << "&";
os << "null_values=" << escapeForFileName(block_string);
};
return executeRequest(uri, out_stream_callback);
}
bool LibraryBridgeHelper::initLibrary()
{
startBridgeSync();
auto uri = createRequestURI(LIB_NEW_METHOD);
library_initialized = executeRequest(uri, getInitLibraryCallback());
return library_initialized;
}
@ -89,15 +164,23 @@ bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
startBridgeSync();
auto uri = createRequestURI(LIB_CLONE_METHOD);
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
return executeRequest(uri);
/// We also pass initialization settings in order to create a library handler
/// in case from_dictionary_id does not exist on the bridge side (possible in case of a bridge crash).
library_initialized = executeRequest(uri, getInitLibraryCallback());
return library_initialized;
}
bool LibraryBridgeHelper::removeLibrary()
{
startBridgeSync();
auto uri = createRequestURI(LIB_DELETE_METHOD);
return executeRequest(uri);
/// Do not force bridge restart if it is not running in case of removeLibrary
/// because in this case after restart it will not have this dictionary id in memory anyway.
if (bridgeHandShake())
{
auto uri = createRequestURI(LIB_DELETE_METHOD);
return executeRequest(uri);
}
return true;
}
@ -125,10 +208,12 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
}
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
auto ids_string = getDictIdsString(ids);
return loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; });
}
@ -149,13 +234,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block)
}
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
{
ReadWriteBufferFromHTTP buf(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
ConnectionTimeouts::getHTTPTimeouts(getContext()));
http_timeouts);
bool res;
readBoolText(res, buf);
@ -169,7 +254,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
ConnectionTimeouts::getHTTPTimeouts(getContext()),
http_timeouts,
0,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
@ -179,4 +264,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr));
}
String LibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
}

View File

@ -15,11 +15,18 @@ class LibraryBridgeHelper : public IBridgeHelper
{
public:
struct LibraryInitData
{
String library_path;
String library_settings;
String dict_attributes;
};
static constexpr inline size_t DEFAULT_PORT = 9012;
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_);
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_, const LibraryInitData & library_data_);
bool initLibrary(const std::string & library_path, std::string library_settings, std::string attributes_names);
bool initLibrary();
bool cloneLibrary(const Field & other_dictionary_id);
@ -31,16 +38,19 @@ public:
BlockInputStreamPtr loadAll();
BlockInputStreamPtr loadIds(std::string ids_string);
BlockInputStreamPtr loadIds(const std::vector<uint64_t> & ids);
BlockInputStreamPtr loadKeys(const Block & requested_block);
BlockInputStreamPtr loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
LibraryInitData getLibraryData() const { return library_data; }
protected:
bool bridgeHandShake() override;
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
String serviceAlias() const override { return "clickhouse-library-bridge"; }
@ -61,6 +71,8 @@ protected:
Poco::URI createBaseURI() const override;
ReadWriteBufferFromHTTP::OutStreamCallback getInitLibraryCallback() const;
private:
static constexpr inline auto LIB_NEW_METHOD = "libNew";
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
@ -69,18 +81,24 @@ private:
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
static constexpr inline auto PING = "ping";
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
Poco::URI createRequestURI(const String & method) const;
static String getDictIdsString(const std::vector<UInt64> & ids);
Poco::Logger * log;
const Block sample_block;
const Poco::Util::AbstractConfiguration & config;
const Poco::Timespan http_timeout;
LibraryInitData library_data;
Field dictionary_id;
std::string bridge_host;
size_t bridge_port;
bool library_initialized = false;
ConnectionTimeouts http_timeouts;
};
}

View File

@ -60,20 +60,33 @@ public:
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
XDBCBridgeHelper(
ContextPtr context_,
Poco::Timespan http_timeout_,
const std::string & connection_string_)
: IXDBCBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_)
, http_timeout(http_timeout_)
, config(context_->getGlobalContext()->getConfigRef())
{
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
}
ContextPtr context_,
Poco::Timespan http_timeout_,
const std::string & connection_string_)
: IXDBCBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_)
, http_timeout(http_timeout_)
, config(context_->getGlobalContext()->getConfigRef())
{
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
}
protected:
bool bridgeHandShake() override
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
auto getConnectionString() const { return connection_string; }
String getName() const override { return BridgeHelperMixin::getName(); }

View File

@ -450,7 +450,6 @@ class IColumn;
M(Bool, optimize_skip_merged_partitions, false, "Skip partitions with one part with level > 0 in optimize final", 0) \
M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
M(Bool, allow_experimental_map_type, true, "Obsolete setting, does nothing.", 0) \
M(Bool, allow_experimental_window_functions, false, "Allow experimental window functions", 0) \
M(Bool, allow_experimental_projection_optimization, false, "Enable projection optimization when processing SELECT queries", 0) \
M(Bool, force_optimize_projection, false, "If projection optimization is enabled, SELECT queries need to use projection", 0) \
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));
@ -128,7 +128,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_QUERY;
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int NO_ACTIVE_REPLICAS;
}
static constexpr const char * DROPPED_MARK = "DROPPED";
@ -137,7 +138,9 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
Coordination::Stat stat;
hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
if (hosts.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No hosts found");
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", database_name);
Int32 cversion = stat.cversion;
std::sort(hosts.begin(), hosts.end());
@ -514,6 +517,19 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}
}
auto make_query_context = [this, current_zookeeper]()
{
auto query_context = Context::createCopy(getContext());
query_context->makeQueryContext();
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setCurrentDatabase(database_name);
query_context->setCurrentQueryId("");
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false);
query_context->initZooKeeperMetadataTransaction(txn);
return query_context;
};
String db_name = getDatabaseName();
String to_db_name = getDatabaseName() + BROKEN_TABLES_SUFFIX;
if (total_tables * db_settings.max_broken_tables_ratio < tables_to_detach.size())
@ -548,7 +564,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
dropped_dictionaries += table->isDictionary();
table->flushAndShutdown();
DatabaseAtomic::dropTable(getContext(), table_name, true);
DatabaseAtomic::dropTable(make_query_context(), table_name, true);
}
else
{
@ -558,7 +574,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
assert(db_name < to_db_name);
DDLGuardPtr to_table_guard = DatabaseCatalog::instance().getDDLGuard(to_db_name, to_name);
auto to_db_ptr = DatabaseCatalog::instance().getDatabase(to_db_name);
DatabaseAtomic::renameTable(getContext(), table_name, *to_db_ptr, to_name, false, false);
DatabaseAtomic::renameTable(make_query_context(), table_name, *to_db_ptr, to_name, false, false);
++moved_tables;
}
}
@ -577,7 +593,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
/// TODO Maybe we should do it in two steps: rename all tables to temporary names and then rename them to actual names?
DDLGuardPtr table_guard = DatabaseCatalog::instance().getDDLGuard(db_name, std::min(from, to));
DDLGuardPtr to_table_guard = DatabaseCatalog::instance().getDDLGuard(db_name, std::max(from, to));
DatabaseAtomic::renameTable(getContext(), from, *this, to, false, false);
DatabaseAtomic::renameTable(make_query_context(), from, *this, to, false, false);
}
for (const auto & id : dropped_tables)
@ -592,15 +608,9 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}
auto query_ast = parseQueryFromMetadataInZooKeeper(name_and_meta.first, name_and_meta.second);
auto query_context = Context::createCopy(getContext());
query_context->makeQueryContext();
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentDatabase(database_name);
query_context->setCurrentQueryId(""); // generate random query_id
LOG_INFO(log, "Executing {}", serializeAST(*query_ast));
InterpreterCreateQuery(query_ast, query_context).execute();
auto create_query_context = make_query_context();
InterpreterCreateQuery(query_ast, create_query_context).execute();
}
current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr));

View File

@ -60,12 +60,13 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
/// Check if we need to recover replica.
/// Invariant: replica is lost if its log_ptr value is less than max_log_ptr - logs_to_keep.
String log_ptr_str = current_zookeeper->get(database->replica_path + "/log_ptr");
auto zookeeper = getAndSetZooKeeper();
String log_ptr_str = zookeeper->get(database->replica_path + "/log_ptr");
UInt32 our_log_ptr = parse<UInt32>(log_ptr_str);
UInt32 max_log_ptr = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
logs_to_keep = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
logs_to_keep = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
database->recoverLostReplica(current_zookeeper, our_log_ptr, max_log_ptr);
database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
else
last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr));
}
@ -198,7 +199,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
}
}
UInt32 our_log_ptr = parse<UInt32>(current_zookeeper->get(fs::path(database->replica_path) / "log_ptr"));
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(fs::path(database->replica_path) / "log_ptr"));
UInt32 entry_num = DatabaseReplicatedTask::getLogEntryNumber(entry_name);
if (entry_num <= our_log_ptr)

View File

@ -41,6 +41,9 @@ LibraryDictionarySource::LibraryDictionarySource(
, sample_block{sample_block_}
, context(Context::createCopy(context_))
{
if (fs::path(path).is_relative())
path = fs::canonical(path);
if (created_from_ddl && !pathStartsWith(path, context->getDictionariesLibPath()))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, context->getDictionariesLibPath());
@ -48,17 +51,32 @@ LibraryDictionarySource::LibraryDictionarySource(
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);
description.init(sample_block);
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
auto res = bridge_helper->initLibrary(path, getLibrarySettingsString(config, config_prefix + ".settings"), getDictAttributesString());
if (!res)
LibraryBridgeHelper::LibraryInitData library_data
{
.library_path = path,
.library_settings = getLibrarySettingsString(config, config_prefix + ".settings"),
.dict_attributes = getDictAttributesString()
};
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
if (!bridge_helper->initLibrary())
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
}
LibraryDictionarySource::~LibraryDictionarySource()
{
bridge_helper->removeLibrary();
try
{
bridge_helper->removeLibrary();
}
catch (...)
{
tryLogCurrentException("LibraryDictionarySource");
}
}
@ -72,8 +90,9 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
, context(other.context)
, description{other.description}
{
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
bridge_helper->cloneLibrary(other.dictionary_id);
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
if (!bridge_helper->cloneLibrary(other.dictionary_id))
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to clone library");
}
@ -99,7 +118,7 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
return bridge_helper->loadIds(getDictIdsString(ids));
return bridge_helper->loadIds(ids);
}
@ -147,14 +166,6 @@ String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::Abstr
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
String LibraryDictionarySource::getDictAttributesString()
{
std::vector<String> attributes_names(dict_struct.attributes.size());

View File

@ -70,8 +70,6 @@ public:
std::string toString() const override;
private:
static String getDictIdsString(const std::vector<UInt64> & ids);
String getDictAttributesString();
static String getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root);
@ -82,7 +80,7 @@ private:
const DictionaryStructure dict_struct;
const std::string config_prefix;
const std::string path;
std::string path;
const Field dictionary_id;
Block sample_block;

View File

@ -31,6 +31,56 @@ std::mutex DiskLocal::reservation_mutex;
using DiskLocalPtr = std::shared_ptr<DiskLocal>;
static void loadDiskLocalConfig(const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
String & path,
UInt64 & keep_free_space_bytes)
{
path = config.getString(config_prefix + ".path", "");
if (name == "default")
{
if (!path.empty())
throw Exception(
"\"default\" disk path should be provided in <path> not it <storage_configuration>",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
path = context->getPath();
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (path.back() != '/')
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!FS::canRead(path) || !FS::canWrite(path))
throw Exception("There is no RW access to the disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception(
"Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0);
if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
String tmp_path = path;
if (tmp_path.empty())
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
}
class DiskLocalReservation : public IReservation
{
public:
@ -317,6 +367,21 @@ SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
return std::make_unique<LocalDirectorySyncGuard>(fs::path(disk_path) / path);
}
void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &)
{
String new_disk_path;
UInt64 new_keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, new_disk_path, new_keep_free_space_bytes);
if (disk_path != new_disk_path)
throw Exception("Disk path can't be updated from config " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (keep_free_space_bytes != new_keep_free_space_bytes)
keep_free_space_bytes = new_keep_free_space_bytes;
}
DiskPtr DiskLocalReservation::getDisk(size_t i) const
{
if (i != 0)
@ -334,7 +399,6 @@ void DiskLocalReservation::update(UInt64 new_size)
disk->reserved_bytes += size;
}
DiskLocalReservation::~DiskLocalReservation()
{
try
@ -369,48 +433,9 @@ void registerDiskLocal(DiskFactory & factory)
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr {
String path = config.getString(config_prefix + ".path", "");
if (name == "default")
{
if (!path.empty())
throw Exception(
"\"default\" disk path should be provided in <path> not it <storage_configuration>",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
path = context->getPath();
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (path.back() != '/')
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!FS::canRead(path) || !FS::canWrite(path))
throw Exception("There is no RW access to the disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception(
"Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
UInt64 keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0);
if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
String tmp_path = path;
if (tmp_path.empty())
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
String path;
UInt64 keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, path, keep_free_space_bytes);
return std::make_shared<DiskLocal>(name, path, keep_free_space_bytes);
};
factory.registerDiskType("local", creator);
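
As a rough illustration of the keep_free_space_ratio handling that moved into loadDiskLocalConfig above: the ratio is validated to lie in [0, 1] and then multiplied by the disk's total capacity to obtain a byte threshold. The sketch below uses std::filesystem::space instead of constructing a temporary DiskLocal; the function name is illustrative.

```
// Sketch: turn a keep_free_space_ratio setting into an absolute byte threshold.
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <stdexcept>
#include <string>

static uint64_t keepFreeSpaceBytesFromRatio(const std::string & path, double ratio)
{
    if (ratio < 0 || ratio > 1)
        throw std::invalid_argument("'keep_free_space_ratio' has to be between 0 and 1");

    // The real code builds a temporary DiskLocal to query total space;
    // std::filesystem::space reports the same capacity figure here.
    auto info = std::filesystem::space(path);
    return static_cast<uint64_t>(info.capacity * ratio);
}

int main()
{
    std::cout << keepFreeSpaceBytesFromRatio("/", 0.1) << " bytes kept free\n";
}
```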

View File

@ -5,6 +5,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
@ -104,13 +105,15 @@ public:
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
private:
bool tryReserve(UInt64 bytes);
private:
const String name;
const String disk_path;
const UInt64 keep_free_space_bytes;
std::atomic<UInt64> keep_free_space_bytes;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
@ -120,4 +123,5 @@ private:
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
};
}

View File

@ -32,7 +32,7 @@ public:
/// Get all disks with names
const DisksMap & getDisksMap() const { return disks; }
void addToDiskMap(String name, DiskPtr disk)
void addToDiskMap(const String & name, DiskPtr disk)
{
disks.emplace(name, disk);
}

View File

@ -13,9 +13,9 @@
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include "Poco/Util/AbstractConfiguration.h"
#include <Poco/Timestamp.h>
#include <filesystem>
#include "Poco/Util/AbstractConfiguration.h"
namespace fs = std::filesystem;

View File

@ -363,7 +363,8 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
settings->client,
source_bucket,
source_path + SCHEMA_VERSION_OBJECT,
settings->s3_max_single_read_retries);
settings->s3_max_single_read_retries,
DBMS_DEFAULT_BUFFER_SIZE);
readIntText(version, buffer);

View File

@ -1218,17 +1218,36 @@ public:
{
return res;
}
else if ((isColumnedAsDecimal(left_type) || isColumnedAsDecimal(right_type))
// Comparing Date and DateTime64 requires implicit conversion,
// otherwise Date is treated as number.
&& !(date_and_datetime && (isDate(left_type) || isDate(right_type))))
else if ((isColumnedAsDecimal(left_type) || isColumnedAsDecimal(right_type)))
{
// compare
if (!allowDecimalComparison(left_type, right_type) && !date_and_datetime)
throw Exception("No operation " + getName() + " between " + left_type->getName() + " and " + right_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
// Comparing Date and DateTime64 requires an implicit conversion to a common type.
if (date_and_datetime && (isDate(left_type) || isDate(right_type)))
{
DataTypePtr common_type = getLeastSupertype({left_type, right_type});
ColumnPtr c0_converted = castColumn(col_with_type_and_name_left, common_type);
ColumnPtr c1_converted = castColumn(col_with_type_and_name_right, common_type);
return executeDecimal({c0_converted, common_type, "left"}, {c1_converted, common_type, "right"});
}
else
{
// compare
if (!allowDecimalComparison(left_type, right_type) && !date_and_datetime)
throw Exception(
"No operation " + getName() + " between " + left_type->getName() + " and " + right_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return executeDecimal(col_with_type_and_name_left, col_with_type_and_name_right);
}
return executeDecimal(col_with_type_and_name_left, col_with_type_and_name_right);
}
else if (date_and_datetime)
{
DataTypePtr common_type = getLeastSupertype({left_type, right_type});
ColumnPtr c0_converted = castColumn(col_with_type_and_name_left, common_type);
ColumnPtr c1_converted = castColumn(col_with_type_and_name_right, common_type);
if (!((res = executeNumLeftType<UInt32>(c0_converted.get(), c1_converted.get()))
|| (res = executeNumLeftType<UInt64>(c0_converted.get(), c1_converted.get()))))
throw Exception("Date related common types can only be UInt32 or UInt64", ErrorCodes::LOGICAL_ERROR);
return res;
}
else if (left_type->equals(*right_type))
{
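
The hunk above makes Date vs DateTime64 comparisons go through a common supertype (getLeastSupertype plus castColumn) instead of treating Date as a bare day number. A standalone sketch of the same idea, assuming Date counts days and DateTime counts seconds since the epoch: normalise both sides to one representation before comparing.

```
// Sketch: compare a Date (days since epoch) with a DateTime (seconds since
// epoch) by converting both to the wider representation first.
#include <cstdint>
#include <iostream>

struct Date     { uint32_t days;    };   // days since 1970-01-01
struct DateTime { uint32_t seconds; };   // seconds since 1970-01-01 00:00:00

static uint64_t toCommon(Date d)     { return static_cast<uint64_t>(d.days) * 86400; }
static uint64_t toCommon(DateTime t) { return t.seconds; }

template <typename L, typename R>
static bool lessThan(L lhs, R rhs)
{
    // Comparing the raw fields (days vs seconds) would be meaningless;
    // converting to a common type first is what the change above restores.
    return toCommon(lhs) < toCommon(rhs);
}

int main()
{
    Date d{18000};                   // a day in 2019
    DateTime t{18000u * 86400 + 1};  // one second into that same day
    std::cout << std::boolalpha << lessThan(d, t) << '\n';   // true
}
```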

View File

@ -113,12 +113,34 @@ namespace MultiRegexps
ScratchPtr scratch;
};
class RegexpsConstructor
{
public:
RegexpsConstructor() = default;
void setConstructor(std::function<Regexps()> constructor_) { constructor = std::move(constructor_); }
Regexps * operator()()
{
std::unique_lock lock(mutex);
if (regexp)
return &*regexp;
regexp = constructor();
return &*regexp;
}
private:
std::function<Regexps()> constructor;
std::optional<Regexps> regexp;
std::mutex mutex;
};
struct Pool
{
/// Mutex for finding in map.
std::mutex mutex;
/// Patterns + possible edit_distance to database and scratch.
std::map<std::pair<std::vector<String>, std::optional<UInt32>>, Regexps> storage;
std::map<std::pair<std::vector<String>, std::optional<UInt32>>, RegexpsConstructor> storage;
};
template <bool save_indices, bool CompileForEditDistance>
@ -250,15 +272,19 @@ namespace MultiRegexps
/// If not found, compile and let other threads wait.
if (known_regexps.storage.end() == it)
{
it = known_regexps.storage
.emplace(
std::pair{str_patterns, edit_distance},
constructRegexps<save_indices, CompileForEditDistance>(str_patterns, edit_distance))
.emplace(std::piecewise_construct, std::make_tuple(std::move(str_patterns), edit_distance), std::make_tuple())
.first;
/// If found, unlock and return the database.
lock.unlock();
it->second.setConstructor([&str_patterns = it->first.first, edit_distance]()
{
return constructRegexps<save_indices, CompileForEditDistance>(str_patterns, edit_distance);
});
}
return &it->second;
/// Unlock before possible construction.
lock.unlock();
return it->second();
}
}
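
The RegexpsConstructor change above moves the expensive regexp compilation out of the section that holds the pool-wide mutex: the map now stores a lazily constructed slot with its own mutex, so looking up other pattern sets is not blocked while one set compiles. A minimal sketch of that pattern with a stand-in "compile" step; the slot and pool names are illustrative, not the actual Hyperscan plumbing.

```
// Sketch: per-entry lazy construction so the global map mutex is held only
// for the emplace, never for the slow construction itself.
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <optional>
#include <string>

template <typename Value>
class LazySlot
{
public:
    void setConstructor(std::function<Value()> f) { constructor = std::move(f); }

    Value & get()
    {
        std::lock_guard lock(mutex);   // per-slot lock, not the pool lock
        if (!value)
            value = constructor();     // the slow work happens here
        return *value;
    }

private:
    std::function<Value()> constructor;
    std::optional<Value> value;
    std::mutex mutex;
};

struct Pool
{
    std::mutex mutex;   // protects only the map itself
    std::map<std::string, LazySlot<std::string>> storage;
};

static std::string & getOrCompile(Pool & pool, const std::string & pattern)
{
    std::unique_lock lock(pool.mutex);
    auto [it, inserted] = pool.storage.try_emplace(pattern);
    if (inserted)
        it->second.setConstructor([pattern] { return "compiled(" + pattern + ")"; });
    lock.unlock();   // unlock before possible construction
    return it->second.get();
}

int main()
{
    Pool pool;
    std::cout << getOrCompile(pool, "a.*b") << '\n';
}
```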

View File

@ -43,7 +43,7 @@ public:
const String & bucket_,
const String & key_,
UInt64 max_single_read_retries_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
size_t buffer_size_);
bool nextImpl() override;

View File

@ -403,7 +403,6 @@ bool tryReadIntText(T & x, ReadBuffer & buf) // -V1071
* Differs in following:
* - for numbers starting with zero, parsed only zero;
* - symbol '+' before number is not supported;
* - symbols :;<=>? are parsed as some numbers.
*/
template <typename T, bool throw_on_error = true>
void readIntTextUnsafe(T & x, ReadBuffer & buf)
@ -437,15 +436,12 @@ void readIntTextUnsafe(T & x, ReadBuffer & buf)
while (!buf.eof())
{
/// This check is suddenly faster than
/// unsigned char c = *buf.position() - '0';
/// if (c < 10)
/// for unknown reason on Xeon E5645.
unsigned char value = *buf.position() - '0';
if ((*buf.position() & 0xF0) == 0x30) /// It makes sense to have this condition inside loop.
if (value < 10)
{
res *= 10;
res += *buf.position() & 0x0F;
res += value;
++buf.position();
}
else
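
The readIntTextUnsafe hunk above replaces the mask-based digit test with the classic unsigned-subtraction check: subtracting '0' from the byte leaves a value below 10 only for the characters '0' through '9', so a single comparison covers both bounds. A tiny standalone sketch of that parsing loop.

```
// Sketch: parse an unsigned integer using the single-comparison digit check.
#include <cstdint>
#include <iostream>
#include <string_view>

static uint64_t parseUInt(std::string_view s)
{
    uint64_t res = 0;
    for (char c : s)
    {
        // Wraps around for bytes below '0'; bytes above '9' give values >= 10.
        unsigned char value = static_cast<unsigned char>(c) - '0';
        if (value < 10)
        {
            res = res * 10 + value;
            continue;
        }
        break;   // stop at the first non-digit, like the unsafe reader does
    }
    return res;
}

int main()
{
    std::cout << parseUInt("12345,rest") << '\n';   // 12345
}
```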

View File

@ -5,7 +5,7 @@ LIBRARY()
ADDINCL(
contrib/libs/zstd/include
contrib/restricted/fast_float
contrib/restricted/fast_float/include
)
PEERDIR(

View File

@ -4,7 +4,7 @@ LIBRARY()
ADDINCL(
contrib/libs/zstd/include
contrib/restricted/fast_float
contrib/restricted/fast_float/include
)
PEERDIR(

View File

@ -77,6 +77,7 @@ AsynchronousMetrics::AsynchronousMetrics(
, update_period(update_period_seconds)
, servers_to_start_before_tables(servers_to_start_before_tables_)
, servers(servers_)
, log(&Poco::Logger::get("AsynchronousMetrics"))
{
#if defined(OS_LINUX)
openFileIfExists("/proc/meminfo", meminfo);
@ -174,26 +175,39 @@ AsynchronousMetrics::AsynchronousMetrics(
edac.back().second = openFileIfExists(edac_uncorrectable_file);
}
if (std::filesystem::exists("/sys/block"))
{
for (const auto & device_dir : std::filesystem::directory_iterator("/sys/block"))
{
String device_name = device_dir.path().filename();
/// We are not interested in loopback devices.
if (device_name.starts_with("loop"))
continue;
std::unique_ptr<ReadBufferFromFilePRead> file = openFileIfExists(device_dir.path() / "stat");
if (!file)
continue;
block_devs[device_name] = std::move(file);
}
}
openBlockDevices();
#endif
}
#if defined(OS_LINUX)
void AsynchronousMetrics::openBlockDevices()
{
LOG_TRACE(log, "Scanning /sys/block");
if (!std::filesystem::exists("/sys/block"))
return;
block_devices_rescan_delay.restart();
block_devs.clear();
for (const auto & device_dir : std::filesystem::directory_iterator("/sys/block"))
{
String device_name = device_dir.path().filename();
/// We are not interested in loopback devices.
if (device_name.starts_with("loop"))
continue;
std::unique_ptr<ReadBufferFromFilePRead> file = openFileIfExists(device_dir.path() / "stat");
if (!file)
continue;
block_devs[device_name] = std::move(file);
}
}
#endif
void AsynchronousMetrics::start()
{
/// Update once right now, to make metrics available just after server start
@ -550,7 +564,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
/// Log only if difference is high. This is for convenience. The threshold is arbitrary.
if (difference >= 1048576 || difference <= -1048576)
LOG_TRACE(&Poco::Logger::get("AsynchronousMetrics"),
LOG_TRACE(log,
"MemoryTracking: was {}, peak {}, will set to {} (RSS), difference: {}",
ReadableSize(amount),
ReadableSize(peak),
@ -877,6 +891,11 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
}
}
/// Update the list of block devices periodically
/// (e.g. someone may add a new disk to a RAID array).
if (block_devices_rescan_delay.elapsedSeconds() >= 300)
openBlockDevices();
for (auto & [name, device] : block_devs)
{
try
@ -930,6 +949,16 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
}
catch (...)
{
/// Try to reopen block devices in case of error
/// (e.g. ENOENT means that some disk has been replaced, and it may appear under a new name).
try
{
openBlockDevices();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -1303,9 +1332,9 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
new_values["AsynchronousMetricsCalculationTimeSpent"] = watch.elapsedSeconds();
/// Log the new metrics.
if (auto log = getContext()->getAsynchronousMetricLog())
if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog())
{
log->addValues(new_values);
asynchronous_metric_log->addValues(new_values);
}
first_run = false;
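
A small self-contained sketch of the rescan policy introduced above: enumerate /sys/block, skip loopback devices, and redo the scan either after roughly 300 seconds or after a read error, since a device can disappear and come back under a new name. The 300-second interval mirrors the diff; everything else is simplified and the class name is made up for the sketch.

```
// Sketch: keep a map of block device stat files, rescanned periodically.
#include <chrono>
#include <filesystem>
#include <iostream>
#include <map>
#include <string>

namespace fs = std::filesystem;
using Clock = std::chrono::steady_clock;

struct BlockDeviceStats
{
    std::map<std::string, fs::path> stat_files;
    Clock::time_point last_scan{};

    void rescan()
    {
        stat_files.clear();
        last_scan = Clock::now();
        if (!fs::exists("/sys/block"))
            return;
        for (const auto & dir : fs::directory_iterator("/sys/block"))
        {
            std::string name = dir.path().filename();
            if (name.rfind("loop", 0) == 0)   // not interested in loopback devices
                continue;
            if (fs::exists(dir.path() / "stat"))
                stat_files[name] = dir.path() / "stat";
        }
    }

    void update()
    {
        if (Clock::now() - last_scan >= std::chrono::seconds(300))
            rescan();   // someone may have added a new disk to a RAID array
        // ... read each stat file; on error, rescan() and retry on the next tick ...
    }
};

int main()
{
    BlockDeviceStats devices;
    devices.rescan();
    std::cout << devices.stat_files.size() << " block devices found\n";
}
```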

View File

@ -3,6 +3,7 @@
#include <Interpreters/Context_fwd.h>
#include <Common/MemoryStatisticsOS.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <IO/ReadBufferFromFile.h>
#include <condition_variable>
@ -15,6 +16,11 @@
#include <unordered_map>
namespace Poco
{
class Logger;
}
namespace DB
{
@ -175,12 +181,17 @@ private:
std::unordered_map<String /* device name */, NetworkInterfaceStatValues> network_interface_stats;
Stopwatch block_devices_rescan_delay;
void openBlockDevices();
#endif
std::unique_ptr<ThreadFromGlobalPool> thread;
void run();
void update(std::chrono::system_clock::time_point update_time);
Poco::Logger * log;
};
}

View File

@ -100,6 +100,8 @@ public:
UInt64 distributed_depth = 0;
bool is_replicated_database_internal = false;
bool empty() const { return query_kind == QueryKind::NO_QUERY; }
/** Serialization and deserialization.

View File

@ -359,6 +359,7 @@ ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_conte
{
auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setCurrentDatabase(database->getDatabaseName());
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query);

View File

@ -196,7 +196,7 @@ public:
void commit();
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); }
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions() || ops.empty()); }
};
ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name);

View File

@ -613,18 +613,6 @@ void makeWindowDescriptionFromAST(const Context & context,
void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
{
// Convenient to check here because at least we have the Context.
if (!syntax->window_function_asts.empty() &&
!getContext()->getSettingsRef().allow_experimental_window_functions)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The support for window functions is experimental and will change"
" in backwards-incompatible ways in the future releases. Set"
" allow_experimental_window_functions = 1 to enable it."
" While processing '{}'",
syntax->window_function_asts[0]->formatForErrorMessage());
}
// Window definitions from the WINDOW clause
const auto * select_query = query->as<ASTSelectQuery>();
if (select_query && select_query->window())

View File

@ -63,7 +63,7 @@ public:
return;
bool is_table = false;
ASTPtr subquery_or_table_name = ast; /// ASTTableIdentifier | ASTSubquery | ASTTableExpression
ASTPtr subquery_or_table_name; /// ASTTableIdentifier | ASTSubquery | ASTTableExpression
if (const auto * ast_table_expr = ast->as<ASTTableExpression>())
{
@ -76,7 +76,14 @@ public:
}
}
else if (ast->as<ASTTableIdentifier>())
{
subquery_or_table_name = ast;
is_table = true;
}
else if (ast->as<ASTSubquery>())
{
subquery_or_table_name = ast;
}
if (!subquery_or_table_name)
throw Exception("Global subquery requires subquery or table name", ErrorCodes::WRONG_GLOBAL_SUBQUERY);

View File

@ -37,7 +37,7 @@ public:
virtual size_t getTotalRowCount() const = 0;
virtual size_t getTotalByteCount() const = 0;
virtual bool alwaysReturnsEmptySet() const { return false; }
virtual bool alwaysReturnsEmptySet() const = 0;
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// Different query plan is used for such joins.

View File

@ -54,7 +54,7 @@ BlockIO InterpreterAlterQuery::execute()
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
@ -100,7 +100,8 @@ BlockIO InterpreterAlterQuery::execute()
if (typeid_cast<DatabaseReplicated *>(database.get()))
{
int command_types_count = !mutation_commands.empty() + !partition_commands.empty() + !live_view_commands.empty() + !alter_commands.empty();
if (1 < command_types_count)
bool mixed_settings_and_metadata_alter = alter_commands.hasSettingsAlterCommand() && !alter_commands.isSettingsAlter();
if (1 < command_types_count || mixed_settings_and_metadata_alter)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "For Replicated databases it's not allowed "
"to execute ALTERs of different types in single query");
}

View File

@ -856,7 +856,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.table);
if (auto* ptr = typeid_cast<DatabaseReplicated *>(database.get());
ptr && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
ptr && !getContext()->getClientInfo().is_replicated_database_internal)
{
create.database = database_name;
guard->releaseTableLock();
@ -950,7 +950,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table);
if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get());
ptr && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
ptr && !getContext()->getClientInfo().is_replicated_database_internal)
{
assertOrSetUUID(create, database);
guard->releaseTableLock();

View File

@ -133,7 +133,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
/// Prevents recursive drop from drop database query. The original query must specify a table.
bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty();
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY &&
!getContext()->getClientInfo().is_replicated_database_internal &&
!is_drop_or_detach_database;
AccessFlags drop_storage;
@ -426,6 +426,7 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr
if (auto txn = current_context->getZooKeeperMetadataTransaction())
{
/// For Replicated database
drop_context->getClientInfo().is_replicated_database_internal = true;
drop_context->setQueryContext(std::const_pointer_cast<Context>(current_context));
drop_context->initZooKeeperMetadataTransaction(txn, true);
}

View File

@ -81,7 +81,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
if (1 < descriptions.size())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database {} is Replicated, "

View File

@ -1928,11 +1928,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
}
/// If we don't have filtration, we can push the limit down to the reading stage for optimization.
UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 0 : getLimitForSorting(query, context);
if (query_info.projection)
query_info.projection->input_order_info
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context);
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit);
else
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
}
StreamLocalLimits limits;
@ -2292,8 +2294,14 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
{
const Settings & settings = context->getSettingsRef();
const auto & query = getSelectQuery();
auto finish_sorting_step = std::make_unique<FinishSortingStep>(
query_plan.getCurrentDataStream(), input_sorting_info->order_key_prefix_descr, output_order_descr, settings.max_block_size, limit);
query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr,
output_order_descr,
settings.max_block_size,
limit,
query.hasFiltration());
query_plan.addStep(std::move(finish_sorting_step));
}
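
The InterpreterSelectQuery hunks above forward the sorting LIMIT to the input-order optimizer only when the query has neither filtration nor GROUP BY, because a filter or an aggregation can consume arbitrarily many input rows before the limit applies. A tiny sketch of that decision; the struct and function names are illustrative.

```
// Sketch: a LIMIT may be pushed down to the reading stage only if nothing
// between reading and limiting can change how many rows are needed.
#include <cstdint>
#include <iostream>

struct QueryShape
{
    bool has_where = false;
    bool has_prewhere = false;
    bool has_having = false;
    bool has_group_by = false;
    uint64_t limit = 0;   // 0 means "no limit"
};

static uint64_t limitForReading(const QueryShape & q)
{
    bool has_filtration = q.has_where || q.has_prewhere || q.has_having;
    return (has_filtration || q.has_group_by) ? 0 : q.limit;
}

int main()
{
    std::cout << limitForReading({.limit = 10}) << '\n';                     // 10: safe to push down
    std::cout << limitForReading({.has_where = true, .limit = 10}) << '\n';  // 0: the filter may drop rows
}
```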

View File

@ -551,8 +551,6 @@ std::vector<TableNeededColumns> normalizeColumnNamesExtractNeeded(
else
needed_columns[*table_pos].no_clashes.emplace(ident->shortName());
}
else if (!got_alias)
throw Exception("Unknown column name '" + ident->name() + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
}
return needed_columns;

View File

@ -32,6 +32,8 @@ public:
size_t getTotalRowCount() const override { return right_blocks.row_count; }
size_t getTotalByteCount() const override { return right_blocks.bytes; }
/// Has to be called only after setTotals()/mergeRightBlocks()
bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); }
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/queryNormalization.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentThread.h>
@ -297,7 +298,10 @@ QueryStatus::QueryStatus(
{
}
QueryStatus::~QueryStatus() = default;
QueryStatus::~QueryStatus()
{
assert(executors.empty());
}
void QueryStatus::setQueryStreams(const BlockIO & io)
{
@ -351,6 +355,11 @@ CancellationCode QueryStatus::cancelQuery(bool kill)
BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream;
SCOPE_EXIT({
std::lock_guard lock(query_streams_mutex);
for (auto * e : executors)
e->cancel();
});
if (tryGetQueryStreams(input_stream, output_stream))
{
@ -366,6 +375,20 @@ CancellationCode QueryStatus::cancelQuery(bool kill)
return CancellationCode::CancelSent;
}
void QueryStatus::addPipelineExecutor(PipelineExecutor * e)
{
std::lock_guard lock(query_streams_mutex);
assert(std::find(executors.begin(), executors.end(), e) == executors.end());
executors.push_back(e);
}
void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
{
std::lock_guard lock(query_streams_mutex);
assert(std::find(executors.begin(), executors.end(), e) != executors.end());
std::erase_if(executors, [e](PipelineExecutor * x) { return x == e; });
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{

View File

@ -22,6 +22,7 @@
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>
namespace CurrentMetrics
@ -34,6 +35,7 @@ namespace DB
struct Settings;
class IAST;
class PipelineExecutor;
struct ProcessListForUser;
class QueryStatus;
@ -109,6 +111,9 @@ protected:
BlockInputStreamPtr query_stream_in;
BlockOutputStreamPtr query_stream_out;
/// Array of PipelineExecutors to be cancelled when a cancelQuery is received
std::vector<PipelineExecutor *> executors;
enum QueryStreamsStatus
{
NotInitialized,
@ -183,6 +188,12 @@ public:
CancellationCode cancelQuery(bool kill);
bool isKilled() const { return is_killed; }
/// Adds a pipeline to the QueryStatus
void addPipelineExecutor(PipelineExecutor * e);
/// Removes a pipeline from the QueryStatus
void removePipelineExecutor(PipelineExecutor * e);
};
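
A compact sketch of the bookkeeping declared above: each PipelineExecutor registers itself with its QueryStatus on construction and unregisters in its destructor, so cancelQuery can cancel every executor still in flight under a single lock. The names mirror the diff, but the types below are simplified stand-ins rather than the real classes.

```
// Sketch: register executors with the owning query status so that a
// KILL QUERY can cancel all of them.
#include <algorithm>
#include <atomic>
#include <iostream>
#include <mutex>
#include <vector>

struct Executor
{
    std::atomic<bool> cancelled{false};
    void cancel() { cancelled = true; }
};

class QueryStatusSketch
{
public:
    void addPipelineExecutor(Executor * e)
    {
        std::lock_guard lock(mutex);
        executors.push_back(e);
    }

    void removePipelineExecutor(Executor * e)
    {
        std::lock_guard lock(mutex);
        executors.erase(std::remove(executors.begin(), executors.end(), e), executors.end());
    }

    void cancelQuery()
    {
        std::lock_guard lock(mutex);
        for (auto * e : executors)
            e->cancel();
    }

private:
    std::mutex mutex;
    std::vector<Executor *> executors;
};

int main()
{
    QueryStatusSketch status;
    Executor exec;
    status.addPipelineExecutor(&exec);
    status.cancelQuery();
    std::cout << std::boolalpha << exec.cancelled.load() << '\n';   // true
    status.removePipelineExecutor(&exec);
}
```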

View File

@ -262,7 +262,11 @@ static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr
elem.query = query_for_logging;
elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging);
// We don't calculate query_kind, databases, tables and columns when the query isn't able to start
// Try to log query_kind if the AST is valid
if (ast)
elem.query_kind = ast->getQueryKindString();
// We don't calculate databases, tables and columns when the query isn't able to start
elem.exception_code = getCurrentExceptionCode();
elem.exception = getCurrentExceptionMessage(false);
@ -672,7 +676,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
/// Common code for finish and exception callbacks
auto status_info_to_query_log = [](QueryLogElement &element, const QueryStatusInfo &info, const ASTPtr query_ast, ContextMutablePtr context_ptr) mutable
auto status_info_to_query_log = [](QueryLogElement & element, const QueryStatusInfo & info, const ASTPtr query_ast, const ContextPtr context_ptr) mutable
{
DB::UInt64 query_time = info.elapsed_seconds * 1000000;
ProfileEvents::increment(ProfileEvents::QueryTimeMicroseconds, query_time);
@ -706,6 +710,17 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.query_columns.insert(access_info.columns.begin(), access_info.columns.end());
element.query_projections.insert(access_info.projections.begin(), access_info.projections.end());
element.query_views.insert(access_info.views.begin(), access_info.views.end());
const auto & factories_info = context_ptr->getQueryFactoriesInfo();
element.used_aggregate_functions = factories_info.aggregate_functions;
element.used_aggregate_function_combinators = factories_info.aggregate_function_combinators;
element.used_database_engines = factories_info.database_engines;
element.used_data_type_families = factories_info.data_type_families;
element.used_dictionaries = factories_info.dictionaries;
element.used_formats = factories_info.formats;
element.used_functions = factories_info.functions;
element.used_storages = factories_info.storages;
element.used_table_functions = factories_info.table_functions;
};
/// Also make possible for caller to log successful query finish and exception during execution.
@ -777,20 +792,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ReadableSize(elem.read_bytes / elapsed_seconds));
}
elem.thread_ids = std::move(info.thread_ids);
elem.profile_counters = std::move(info.profile_counters);
const auto & factories_info = context->getQueryFactoriesInfo();
elem.used_aggregate_functions = factories_info.aggregate_functions;
elem.used_aggregate_function_combinators = factories_info.aggregate_function_combinators;
elem.used_database_engines = factories_info.database_engines;
elem.used_data_type_families = factories_info.data_type_families;
elem.used_dictionaries = factories_info.dictionaries;
elem.used_formats = factories_info.formats;
elem.used_functions = factories_info.functions;
elem.used_storages = factories_info.storages;
elem.used_table_functions = factories_info.table_functions;
if (log_queries && elem.type >= log_queries_min_type && Int64(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
@ -1020,22 +1021,31 @@ void executeQuery(
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
std::unique_ptr<WriteBuffer> compressed_buffer;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, ""),
/* compression level = */ 3
);
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(format_name, *out_buf, streams.in->getHeader(), context, {}, output_format_settings);
auto out = FormatFactory::instance().getOutputStreamParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
streams.in->getHeader(),
context,
{},
output_format_settings);
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context->getProgressCallback();
@ -1059,15 +1069,18 @@ void executeQuery(
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
std::unique_ptr<WriteBuffer> compressed_buffer;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, ""),
/* compression level = */ 3
);
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
@ -1081,7 +1094,14 @@ void executeQuery(
return std::make_shared<MaterializingTransform>(header);
});
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name, *out_buf, pipeline.getHeader(), context, {}, output_format_settings);
auto out = FormatFactory::instance().getOutputFormatParallelIfPossible(
format_name,
compressed_buffer ? *compressed_buffer : *out_buf,
pipeline.getHeader(),
context,
{},
output_format_settings);
out->setAutoFlush();
/// Save previous progress callback if any. TODO Do it more conveniently.
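
The executeQuery changes above wrap the INTO OUTFILE target in a compressing writer chosen from the file name, with compression level 3, instead of always writing raw bytes. Below is a hedged standalone sketch of the same selection logic; it only recognizes ".gz" and uses zlib directly (link with -lz), whereas the real code goes through chooseCompressionMethod and wrapWriteBufferWithCompressionMethod and supports more formats.

```
// Sketch: pick an output writer for INTO OUTFILE based on the file extension,
// compressing with level 3 when the name ends in ".gz".
#include <zlib.h>
#include <cstdio>
#include <stdexcept>
#include <string>

static bool endsWith(const std::string & s, const std::string & suffix)
{
    return s.size() >= suffix.size()
        && s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

static void writeResult(const std::string & out_file, const std::string & data)
{
    if (endsWith(out_file, ".gz"))
    {
        gzFile f = gzopen(out_file.c_str(), "wb3");   // the trailing "3" selects level 3
        if (!f)
            throw std::runtime_error("cannot open " + out_file);
        gzwrite(f, data.data(), static_cast<unsigned>(data.size()));
        gzclose(f);
    }
    else
    {
        std::FILE * f = std::fopen(out_file.c_str(), "wb");
        if (!f)
            throw std::runtime_error("cannot open " + out_file);
        std::fwrite(data.data(), 1, data.size(), f);
        std::fclose(f);
    }
}

int main()
{
    writeResult("result.tsv.gz", "1\tone\n2\ttwo\n");
    writeResult("result.tsv", "1\tone\n2\ttwo\n");
}
```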

View File

@ -225,6 +225,8 @@ public:
return removeOnCluster<ASTAlterQuery>(clone(), new_database);
}
const char * getQueryKindString() const override { return "Alter"; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -102,6 +102,8 @@ public:
bool isView() const { return is_ordinary_view || is_materialized_view || is_live_view; }
const char * getQueryKindString() const override { return "Create"; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};

View File

@ -45,6 +45,8 @@ public:
return removeOnCluster<ASTDropQuery>(clone(), new_database);
}
const char * getQueryKindString() const override { return "Drop"; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
};

View File

@ -34,5 +34,6 @@ public:
void replaceEmptyDatabase(const String & current_database);
void replaceCurrentUserTag(const String & current_user_name) const;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &) const override { return removeOnCluster<ASTGrantQuery>(clone()); }
const char * getQueryKindString() const override { return is_revoke ? "Revoke" : "Grant"; }
};
}

View File

@ -47,6 +47,8 @@ public:
return res;
}
const char * getQueryKindString() const override { return "Insert"; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};

View File

@ -61,6 +61,8 @@ public:
return query_ptr;
}
const char * getQueryKindString() const override { return "Rename"; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override
{

View File

@ -69,6 +69,8 @@ public:
const ASTPtr limitLength() const { return getExpression(Expression::LIMIT_LENGTH); }
const ASTPtr settings() const { return getExpression(Expression::SETTINGS); }
bool hasFiltration() const { return where() || prewhere() || having(); }
/// Set/Reset/Remove expression.
void setExpression(Expression expr, ASTPtr && ast);
@ -95,6 +97,8 @@ public:
void setFinal();
const char * getQueryKindString() const override { return "Select"; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -16,6 +16,8 @@ public:
ASTPtr clone() const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
const char * getQueryKindString() const override { return "Select"; }
enum class Mode
{
Unspecified,

View File

@ -86,6 +86,8 @@ public:
return removeOnCluster<ASTSystemQuery>(clone(), new_database);
}
const char * getQueryKindString() const override { return "System"; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -231,6 +231,9 @@ public:
void cloneChildren();
// Return query_kind string representation of this AST query.
virtual const char * getQueryKindString() const { return ""; }
public:
/// For syntax highlighting.
static const char * hilite_keyword;

View File

@ -45,6 +45,8 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
try
{
graph = std::make_unique<ExecutingGraph>(processors);
if (process_list_element)
process_list_element->addPipelineExecutor(this);
}
catch (Exception & exception)
{
@ -59,6 +61,12 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
}
}
PipelineExecutor::~PipelineExecutor()
{
if (process_list_element)
process_list_element->removePipelineExecutor(this);
}
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
{
UInt64 num_processors = processors.size();

View File

@ -31,6 +31,7 @@ public:
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.
/// In case of exception during execution throws any occurred.
@ -127,7 +128,7 @@ private:
ProcessorsMap processors_map;
/// Now it's used to check if query was killed.
QueryStatus * process_list_element = nullptr;
QueryStatus * const process_list_element = nullptr;
/// Graph related methods.
bool expandPipeline(Stack & stack, UInt64 pid);

View File

@ -16,7 +16,7 @@ public:
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -20,7 +20,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
SortDescription description,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
params,

View File

@ -15,7 +15,7 @@ public:
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -14,9 +14,11 @@ IMergingTransformBase::IMergingTransformBase(
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_)
bool have_all_inputs_,
bool has_limit_below_one_block_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_)
, has_limit_below_one_block(has_limit_below_one_block_)
{
}
@ -64,10 +66,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue;
if (input_states[i].is_initialized)
{
// input.setNotNeeded();
continue;
}
input.setNeeded();
@ -77,12 +76,17 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue;
}
auto chunk = input.pull();
/// setNotNeeded after reading the first chunk, because in the optimistic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any more chunks.
auto chunk = input.pull(has_limit_below_one_block);
if (!chunk.hasRows())
{
if (!input.isFinished())
{
input.setNeeded();
all_inputs_has_data = false;
}
continue;
}

View File

@ -16,7 +16,8 @@ public:
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_);
bool have_all_inputs_,
bool has_limit_below_one_block_);
OutputPort & getOutputPort() { return outputs.front(); }
@ -66,6 +67,7 @@ private:
std::vector<InputState> input_states;
std::atomic<bool> have_all_inputs;
bool is_initialized = false;
bool has_limit_below_one_block = false;
IProcessor::Status prepareInitializeInputs();
};
@ -81,8 +83,9 @@ public:
const Block & input_header,
const Block & output_header,
bool have_all_inputs_,
bool has_limit_below_one_block_,
Args && ... args)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, has_limit_below_one_block_)
, algorithm(std::forward<Args>(args) ...)
{
}

View File

@ -13,12 +13,13 @@ MergingSortedTransform::MergingSortedTransform(
SortDescription description_,
size_t max_block_size,
UInt64 limit_,
bool has_limit_below_one_block_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(
num_inputs, header, header, have_all_inputs_,
num_inputs, header, header, have_all_inputs_, has_limit_below_one_block_,
header,
num_inputs,
std::move(description_),

View File

@ -17,6 +17,7 @@ public:
SortDescription description,
size_t max_block_size,
UInt64 limit_ = 0,
bool has_limit_below_one_block_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,

View File

@ -18,7 +18,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
const Names & partition_key_columns,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -31,12 +31,14 @@ FinishSortingStep::FinishSortingStep(
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
UInt64 limit_,
bool has_filtration_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)
, limit(limit_)
, has_filtration(has_filtration_)
{
/// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description;
@ -58,11 +60,14 @@ void FinishSortingStep::transformPipeline(QueryPipeline & pipeline, const BuildQ
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
bool has_limit_below_one_block = !has_filtration && limit_for_merging && limit_for_merging < max_block_size;
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
prefix_description,
max_block_size, limit_for_merging);
max_block_size,
limit_for_merging,
has_limit_below_one_block);
pipeline.addTransform(std::move(transform));
}
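
The condition computed in FinishSortingStep above is short but easy to misread: the per-input "one block is enough" hint is set only when there is no filtration, a merging limit exists, and that limit fits inside a single output block. A minimal restatement with a few example evaluations; the helper name follows the diff.

```
// Sketch: when can the merge expect to satisfy the LIMIT from the first
// block of every input?
#include <cstdint>
#include <iostream>

static bool hasLimitBelowOneBlock(bool has_filtration, uint64_t limit_for_merging, uint64_t max_block_size)
{
    return !has_filtration && limit_for_merging && limit_for_merging < max_block_size;
}

int main()
{
    std::cout << std::boolalpha
              << hasLimitBelowOneBlock(false, 10, 65536) << '\n'   // true: small LIMIT, no filter
              << hasLimitBelowOneBlock(true, 10, 65536) << '\n'    // false: a filter may discard rows
              << hasLimitBelowOneBlock(false, 0, 65536) << '\n';   // false: no limit at all
}
```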

View File

@ -13,8 +13,9 @@ public:
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size,
UInt64 limit);
size_t max_block_size_,
UInt64 limit_,
bool has_filtration_);
String getName() const override { return "FinishSorting"; }
@ -31,6 +32,7 @@ private:
SortDescription result_description;
size_t max_block_size;
UInt64 limit;
bool has_filtration;
};
}

View File

@ -13,7 +13,7 @@
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
@ -179,26 +179,32 @@ template<typename TSource>
ProcessorPtr ReadFromMergeTree::createSource(
const RangesInDataPart & part,
const Names & required_columns,
bool use_uncompressed_cache)
bool use_uncompressed_cache,
bool has_limit_below_one_block)
{
return std::make_shared<TSource>(
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query);
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info,
actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block);
}
Pipe ReadFromMergeTree::readInOrder(
RangesInDataParts parts_with_range,
Names required_columns,
ReadType read_type,
bool use_uncompressed_cache)
bool use_uncompressed_cache,
UInt64 limit)
{
Pipes pipes;
/// For reading in order it makes sense to read only
/// one range per task, to reduce the number of rows read.
bool has_limit_below_one_block = read_type != ReadType::Default && limit && limit < max_block_size;
for (const auto & part : parts_with_range)
{
auto source = read_type == ReadType::InReverseOrder
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache)
: createSource<MergeTreeSelectProcessor>(part, required_columns, use_uncompressed_cache);
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block)
: createSource<MergeTreeInOrderSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block);
pipes.emplace_back(std::move(source));
}
@ -224,7 +230,7 @@ Pipe ReadFromMergeTree::read(
return readFromPool(parts_with_range, required_columns, max_streams,
min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
@ -403,7 +409,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
{
RangesInDataPart part = parts_with_ranges.back();
parts_with_ranges.pop_back();
size_t & marks_in_part = info.sum_marks_in_parts.back();
/// We will not take too few rows from a part.
@ -418,8 +423,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
MarkRanges ranges_to_get_from_part;
/// We take full part if it contains enough marks or
/// if we know limit and part contains less than 'limit' rows.
bool take_full_part = marks_in_part <= need_marks
|| (input_order_info->limit && input_order_info->limit < part.getRowsCount());
/// We take the whole part if it is small enough.
if (marks_in_part <= need_marks)
if (take_full_part)
{
ranges_to_get_from_part = part.ranges;
@ -449,6 +459,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
parts_with_ranges.emplace_back(part);
}
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
}
@ -457,8 +468,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
? ReadFromMergeTree::ReadType::InOrder
: ReadFromMergeTree::ReadType::InReverseOrder;
pipes.emplace_back(read(std::move(new_parts), column_names, read_type,
requested_num_streams, info.min_marks_for_concurrent_read, info.use_uncompressed_cache));
pipes.emplace_back(readInOrder(std::move(new_parts), column_names, read_type,
info.use_uncompressed_cache, input_order_info->limit));
}
if (need_preliminary_merge)
@ -486,7 +497,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
pipe.getHeader(),
pipe.numOutputPorts(),
sort_description,
max_block_size);
max_block_size,
0, true);
pipe.addTransform(std::move(transform));
}
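
For the `take_full_part` decision a few hunks above, a hedged stand-alone sketch that mirrors what the condition checks (simplified names and types, not the actual RangesInDataPart API; the rationale comment reflects a reading of the diff, not the authors' wording):

```
#include <cassert>
#include <cstddef>
#include <cstdint>

// Sketch of the take_full_part condition: a part is taken whole either when
// it is small enough for the current need in marks, or when a limit is known
// and is smaller than the part's row count.
static bool takeFullPart(size_t marks_in_part, size_t need_marks,
                         uint64_t limit, uint64_t rows_in_part)
{
    return marks_in_part <= need_marks || (limit != 0 && limit < rows_in_part);
}

int main()
{
    assert(takeFullPart(8, 16, 0, 1000));      // small part, no limit
    assert(takeFullPart(64, 16, 100, 1000));   // big part, but LIMIT 100 < 1000 rows
    assert(!takeFullPart(64, 16, 0, 1000));    // big part, no limit: split into ranges
    return 0;
}
```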

View File

@ -116,10 +116,10 @@ private:
Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readFromPool(RangesInDataParts parts_with_ranges, Names required_columns, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache);
Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache, UInt64 limit);
template<typename TSource>
ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache);
ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool has_limit_below_one_block);
Pipe spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,

View File

@ -200,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk)
description,
max_merged_block_size,
limit,
false,
nullptr,
quiet,
use_average_block_sizes,

View File

@ -5,6 +5,7 @@
#include <Columns/ColumnsNumber.h>
#include <Common/CurrentThread.h>
#include <Common/SettingsChanges.h>
#include <Common/setThreadName.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
@ -480,6 +481,40 @@ namespace
};
/// A boolean state protected by a mutex; lets one thread wait until another sets it to a specific value.
class BoolState
{
public:
explicit BoolState(bool initial_value) : value(initial_value) {}
bool get() const
{
std::lock_guard lock{mutex};
return value;
}
void set(bool new_value)
{
std::lock_guard lock{mutex};
if (value == new_value)
return;
value = new_value;
changed.notify_all();
}
void wait(bool wanted_value) const
{
std::unique_lock lock{mutex};
changed.wait(lock, [this, wanted_value]() { return value == wanted_value; });
}
private:
bool value;
mutable std::mutex mutex;
mutable std::condition_variable changed;
};
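
The later hunks replace the `std::atomic<bool>` + `std::condition_variable` + `dummy_mutex` trio with this class. A minimal usage sketch (the class body is reproduced from the hunk above so the example compiles on its own; the thread names and driver code are illustrative, the real code drives it from gRPC completion-queue callbacks):

```
#include <condition_variable>
#include <mutex>
#include <thread>

class BoolState
{
public:
    explicit BoolState(bool initial_value) : value(initial_value) {}

    bool get() const
    {
        std::lock_guard lock{mutex};
        return value;
    }

    void set(bool new_value)
    {
        std::lock_guard lock{mutex};
        if (value == new_value)
            return;
        value = new_value;
        changed.notify_all();
    }

    void wait(bool wanted_value) const
    {
        std::unique_lock lock{mutex};
        changed.wait(lock, [this, wanted_value]() { return value == wanted_value; });
    }

private:
    bool value;
    mutable std::mutex mutex;
    mutable std::condition_variable changed;
};

int main()
{
    BoolState sending_result{false};

    // "Queue" thread: flips the flag the way the gRPC callback does.
    std::thread queue_thread([&]
    {
        sending_result.set(true);   // write started
        sending_result.set(false);  // write finished; wakes any waiter
    });

    // "Call" thread: waits until the previous write has finished.
    sending_result.wait(false);

    queue_thread.join();
    return 0;
}
```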
/// Handles a connection after a responder is started (i.e. after getting a new call).
class Call
{
@ -564,18 +599,15 @@ namespace
UInt64 waited_for_client_writing = 0;
/// The following fields are accessed both from call_thread and queue_thread.
std::atomic<bool> reading_query_info = false;
BoolState reading_query_info{false};
std::atomic<bool> failed_to_read_query_info = false;
GRPCQueryInfo next_query_info_while_reading;
std::atomic<bool> want_to_cancel = false;
std::atomic<bool> check_query_info_contains_cancel_only = false;
std::atomic<bool> sending_result = false;
BoolState sending_result{false};
std::atomic<bool> failed_to_send_result = false;
ThreadFromGlobalPool call_thread;
std::condition_variable read_finished;
std::condition_variable write_finished;
std::mutex dummy_mutex; /// Doesn't protect anything.
};
Call::Call(CallType call_type_, std::unique_ptr<BaseResponder> responder_, IServer & iserver_, Poco::Logger * log_)
@ -610,6 +642,7 @@ namespace
{
try
{
setThreadName("GRPCServerCall");
receiveQuery();
executeQuery();
processInput();
@ -1230,8 +1263,7 @@ namespace
{
auto start_reading = [&]
{
assert(!reading_query_info);
reading_query_info = true;
reading_query_info.set(true);
responder->read(next_query_info_while_reading, [this](bool ok)
{
/// Called on queue_thread.
@ -1256,18 +1288,16 @@ namespace
/// on queue_thread.
failed_to_read_query_info = true;
}
reading_query_info = false;
read_finished.notify_one();
reading_query_info.set(false);
});
};
auto finish_reading = [&]
{
if (reading_query_info)
if (reading_query_info.get())
{
Stopwatch client_writing_watch;
std::unique_lock lock{dummy_mutex};
read_finished.wait(lock, [this] { return !reading_query_info; });
reading_query_info.wait(false);
waited_for_client_writing += client_writing_watch.elapsedNanoseconds();
}
throwIfFailedToReadQueryInfo();
@ -1430,11 +1460,10 @@ namespace
/// Wait for previous write to finish.
/// (gRPC doesn't allow starting to send another result while the previous one is still being sent.)
if (sending_result)
if (sending_result.get())
{
Stopwatch client_reading_watch;
std::unique_lock lock{dummy_mutex};
write_finished.wait(lock, [this] { return !sending_result; });
sending_result.wait(false);
waited_for_client_reading += client_reading_watch.elapsedNanoseconds();
}
throwIfFailedToSendResult();
@ -1445,14 +1474,13 @@ namespace
if (write_buffer)
write_buffer->finalize();
sending_result = true;
sending_result.set(true);
auto callback = [this](bool ok)
{
/// Called on queue_thread.
if (!ok)
failed_to_send_result = true;
sending_result = false;
write_finished.notify_one();
sending_result.set(false);
};
Stopwatch client_reading_final_watch;
@ -1472,8 +1500,7 @@ namespace
if (send_final_message)
{
/// Wait until the result is actually sent.
std::unique_lock lock{dummy_mutex};
write_finished.wait(lock, [this] { return !sending_result; });
sending_result.wait(false);
waited_for_client_reading += client_reading_final_watch.elapsedNanoseconds();
throwIfFailedToSendResult();
LOG_TRACE(log, "Final result has been sent to the client");
@ -1584,7 +1611,7 @@ private:
{
/// Called on call_thread. That's why we can't destroy the `call` right now
/// (thread can't join to itself). Thus here we only move the `call` from
/// `current_call` to `finished_calls` and run() will actually destroy the `call`.
/// `current_calls` to `finished_calls` and run() will actually destroy the `call`.
std::lock_guard lock{mutex};
auto it = current_calls.find(call);
finished_calls.push_back(std::move(it->second));
@ -1593,6 +1620,7 @@ private:
void run()
{
setThreadName("GRPCServerQueue");
while (true)
{
{

View File

@ -1276,6 +1276,11 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
validateColumnsDefaultsAndGetSampleBlock(default_expr_list, all_columns.getAll(), context);
}
bool AlterCommands::hasSettingsAlterCommand() const
{
return std::any_of(begin(), end(), [](const AlterCommand & c) { return c.isSettingsAlter(); });
}
bool AlterCommands::isSettingsAlter() const
{
return std::all_of(begin(), end(), [](const AlterCommand & c) { return c.isSettingsAlter(); });
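
The new `hasSettingsAlterCommand` deliberately uses `std::any_of` where the existing `isSettingsAlter` uses `std::all_of`. A quick stand-alone illustration of the difference, with a plain vector of flags standing in for the command list (names here are hypothetical):

```
#include <algorithm>
#include <cassert>
#include <vector>

int main()
{
    // One settings command (e.g. MODIFY SETTING) mixed with one non-settings command.
    std::vector<bool> is_settings_alter = {true, false};

    // "has a settings command": true as soon as any command is a settings alter.
    bool has_settings = std::any_of(is_settings_alter.begin(), is_settings_alter.end(),
                                    [](bool v) { return v; });

    // "is a settings-only alter": true only if every command is a settings alter.
    bool only_settings = std::all_of(is_settings_alter.begin(), is_settings_alter.end(),
                                     [](bool v) { return v; });

    assert(has_settings);
    assert(!only_settings);
    return 0;
}
```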

Some files were not shown because too many files have changed in this diff.