Merge remote-tracking branch 'blessed/master' into i58727

This commit is contained in:
Raúl Marín 2024-01-12 16:10:43 +00:00
commit 0684cc7b76
470 changed files with 5648 additions and 2880 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit b7ea89b817a18dc0eafc1f909d568869f02d2d04
Subproject commit 1278e32bb0d5dc489f947e002bdf8c71b0ddaa63

2
contrib/azure vendored

@ -1 +1 @@
Subproject commit 060c54dfb0abe869c065143303a9d3e9c54c29e3
Subproject commit e71395e44f309f97b5a486f5c2c59b82f85dd2d2

View File

@ -33,7 +33,6 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/optional.cpp"
"${LIBCXX_SOURCE_DIR}/src/random.cpp"
"${LIBCXX_SOURCE_DIR}/src/random_shuffle.cpp"
"${LIBCXX_SOURCE_DIR}/src/regex.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/d2fixed.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/d2s.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/f2s.cpp"

@ -1 +1 @@
Subproject commit 1834e42289c58402c804a87be4d489892b88f3ec
Subproject commit 2568a7cd1297c7c3044b0f3cc0c23a6f6444d856

2
contrib/rocksdb vendored

@ -1 +1 @@
Subproject commit 66e3cbec31400ed3a23deb878c5d7f56f990f0ae
Subproject commit dead55e60b873d5f70f0e9458fbbba2b2180f430

2
contrib/sqids-cpp vendored

@ -1 +1 @@
Subproject commit 3756e537d4d48cc0dd4176801fe19f99601439b0
Subproject commit a471f53672e98d49223f598528a533b07b085c61

View File

@ -193,7 +193,7 @@ stop
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
export $RANDOMIZE_OBJECT_KEY_TYPE=1
export RANDOMIZE_OBJECT_KEY_TYPE=1
export ZOOKEEPER_FAULT_INJECTION=1
configure

View File

@ -504,24 +504,25 @@ Indexes of type `set` can be utilized by all functions. The other index types ar
| Function (operator) / Index | primary key | minmax | ngrambf_v1 | tokenbf_v1 | bloom_filter | inverted |
|------------------------------------------------------------------------------------------------------------|-------------|--------|------------|------------|--------------|----------|
| [equals (=, ==)](/docs/en/sql-reference/functions/comparison-functions.md/#equals) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [notEquals(!=, <>)](/docs/en/sql-reference/functions/comparison-functions.md/#notequals) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [like](/docs/en/sql-reference/functions/string-search-functions.md/#function-like) | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ |
| [notLike](/docs/en/sql-reference/functions/string-search-functions.md/#function-notlike) | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ |
| [equals (=, ==)](/docs/en/sql-reference/functions/comparison-functions.md/#equals) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [notEquals(!=, <>)](/docs/en/sql-reference/functions/comparison-functions.md/#notequals) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [like](/docs/en/sql-reference/functions/string-search-functions.md/#like) | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ |
| [notLike](/docs/en/sql-reference/functions/string-search-functions.md/#notlike) | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ |
| [match](/docs/en/sql-reference/functions/string-search-functions.md/#match) | ✗ | ✗ | ✔ | ✔ | ✗ | ✗ |
| [startsWith](/docs/en/sql-reference/functions/string-functions.md/#startswith) | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ |
| [endsWith](/docs/en/sql-reference/functions/string-functions.md/#endswith) | ✗ | ✗ | ✔ | ✔ | ✗ | ✔ |
| [multiSearchAny](/docs/en/sql-reference/functions/string-search-functions.md/#function-multisearchany) | ✗ | ✗ | ✔ | ✗ | ✗ | ✔ |
| [in](/docs/en/sql-reference/functions/in-functions#in-functions) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [notIn](/docs/en/sql-reference/functions/in-functions#in-functions) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [less (<)](/docs/en/sql-reference/functions/comparison-functions.md/#less) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [greater (>)](/docs/en/sql-reference/functions/comparison-functions.md/#greater) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [lessOrEquals (<=)](/docs/en/sql-reference/functions/comparison-functions.md/#lessorequals) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [greaterOrEquals (>=)](/docs/en/sql-reference/functions/comparison-functions.md/#greaterorequals) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [empty](/docs/en/sql-reference/functions/array-functions#function-empty) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [notEmpty](/docs/en/sql-reference/functions/array-functions#function-notempty) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [has](/docs/en/sql-reference/functions/array-functions#function-has) | ✗ | ✗ | ✔ | ✔ | ✔ | ✔ |
| [hasAny](/docs/en/sql-reference/functions/array-functions#function-hasAny) | ✗ | ✗ | ✔ | ✔ | ✔ | ✗ |
| [hasAll](/docs/en/sql-reference/functions/array-functions#function-hasAll) | ✗ | ✗ | ✗ | ✗ | ✔ | ✗ |
| [multiSearchAny](/docs/en/sql-reference/functions/string-search-functions.md/#multisearchany) | ✗ | ✗ | ✔ | ✗ | ✗ | ✔ |
| [in](/docs/en/sql-reference/functions/in-functions) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [notIn](/docs/en/sql-reference/functions/in-functions) | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| [less (<)](/docs/en/sql-reference/functions/comparison-functions.md/#less) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [greater (>)](/docs/en/sql-reference/functions/comparison-functions.md/#greater) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [lessOrEquals (<=)](/docs/en/sql-reference/functions/comparison-functions.md/#lessorequals) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [greaterOrEquals (>=)](/docs/en/sql-reference/functions/comparison-functions.md/#greaterorequals) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [empty](/docs/en/sql-reference/functions/array-functions/#empty) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [notEmpty](/docs/en/sql-reference/functions/array-functions/#notempty) | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ |
| [has](/docs/en/sql-reference/functions/array-functions/#has) | ✗ | ✗ | ✔ | ✔ | ✔ | ✔ |
| [hasAny](/docs/en/sql-reference/functions/array-functions/#hasany) | ✗ | ✗ | ✔ | ✔ | ✔ | ✗ |
| [hasAll](/docs/en/sql-reference/functions/array-functions/#hasall) | ✗ | ✗ | ✗ | ✗ | ✔ | ✗ |
| hasToken | ✗ | ✗ | ✗ | ✔ | ✗ | ✔ |
| hasTokenOrNull | ✗ | ✗ | ✗ | ✔ | ✗ | ✔ |
| hasTokenCaseInsensitive (*) | ✗ | ✗ | ✗ | ✔ | ✗ | ✗ |

View File

@ -29,10 +29,6 @@ Transactionally inconsistent caching is traditionally provided by client tools o
the same caching logic and configuration is often duplicated. With ClickHouse's query cache, the caching logic moves to the server side.
This reduces maintenance effort and avoids redundancy.
:::note
Security consideration: The cached query result is tied to the user executing it. Authorization checks are performed when the query is executed. This means that if there are any alterations to the user's role or permissions between the time the query is cached and when the cache is accessed, the result will not reflect these changes. We recommend using different users to distinguish between different levels of access, instead of actively toggling roles for a single user between queries, as this practice may lead to unexpected query results.
:::
## Configuration Settings and Usage
Setting [use_query_cache](settings/settings.md#use-query-cache) can be used to control whether a specific query or all queries of the

View File

@ -1,5 +1,5 @@
---
sidebar_label: Settings Overview
title: "Settings Overview"
sidebar_position: 1
slug: /en/operations/settings/
pagination_next: en/operations/settings/settings
@ -16,11 +16,34 @@ There are two main groups of ClickHouse settings:
- Global server settings
- Query-level settings
The main distinction between global server settings and query-level settings is that
global server settings must be set in configuration files while query-level settings
can be set in configuration files or with SQL queries.
The main distinction between global server settings and query-level settings is that global server settings must be set in configuration files, while query-level settings can be set in configuration files or with SQL queries.
Read about [global server settings](/docs/en/operations/server-configuration-parameters/settings.md) to learn more about configuring your ClickHouse server at the global server level.
Read about [query-level settings](/docs/en/operations/settings/settings-query-level.md) to learn more about configuring your ClickHouse server at the query-level.
Read about [query-level settings](/docs/en/operations/settings/settings-query-level.md) to learn more about configuring your ClickHouse server at the query level.
## See non-default settings
To view which settings have been changed from their default value:
```sql
SELECT name, value FROM system.settings WHERE changed
```
If you haven't changed any settings from their default value, then ClickHouse will return nothing.
To check the value of a particular setting, specify the `name` of the setting in your query:
```sql
SELECT name, value FROM system.settings WHERE name = 'max_threads'
```
This command should return something like:
```response
┌─name────────┬─value─────┐
│ max_threads │ 'auto(8)' │
└─────────────┴───────────┘
1 row in set. Elapsed: 0.002 sec.
```

View File

@ -42,7 +42,7 @@ Columns:
- `'ExceptionWhileProcessing' = 4` — Exception during the query execution.
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Query starting date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query starting time.
- `event_time_microseconds` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query starting time with microseconds precision.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Query starting time with microsecond precision.
- `query_start_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Start time of query execution.
- `query_start_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Start time of query execution with microsecond precision.
- `query_duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Duration of query execution in milliseconds.

View File

@ -10,7 +10,7 @@ Columns:
- `hostname` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) — Hostname of the server executing the query.
- `event_date` (Date) — Date of the entry.
- `event_time` (DateTime) — Time of the entry.
- `event_time_microseconds` (DateTime) — Time of the entry with microseconds precision.
- `event_time_microseconds` (DateTime64) — Time of the entry with microsecond precision.
- `microseconds` (UInt32) — Microseconds of the entry.
- `thread_name` (String) — Name of the thread from which the logging was done.
- `thread_id` (UInt64) — OS thread ID.

View File

@ -18,6 +18,12 @@ Supported range of values: \[1970-01-01 00:00:00, 2106-02-07 06:28:15\].
Resolution: 1 second.
## Speed
The `Date` datatype is faster than `DateTime` under _most_ conditions.
The `Date` type requires 2 bytes of storage, while `DateTime` requires 4. However, when the data is compressed, this difference is amplified. This amplification is due to the minutes and seconds in `DateTime` being less compressible. Filtering and aggregating `Date` instead of `DateTime` is also faster.
## Usage Remarks
The point in time is saved as a [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time), regardless of the time zone or daylight saving time. The time zone affects how values of the `DateTime` type are displayed in text format and how the values specified as strings are parsed (2020-01-01 05:00:01).

View File

@ -6,7 +6,7 @@ sidebar_label: Arrays
# Array Functions
## empty
## empty {#empty}
Checks whether the input array is empty.
@ -50,7 +50,7 @@ Result:
└────────────────┘
```
## notEmpty
## notEmpty {#notempty}
Checks whether the input array is non-empty.
@ -221,7 +221,7 @@ SELECT has([1, 2, NULL], NULL)
└─────────────────────────┘
```
## hasAll
## hasAll {#hasall}
Checks whether one array is a subset of another.
@ -261,7 +261,7 @@ Raises an exception `NO_COMMON_TYPE` if the set and subset elements do not share
`SELECT hasAll([[1, 2], [3, 4]], [[1, 2], [3, 5]])` returns 0.
## hasAny
## hasAny {#hasany}
Checks whether two arrays have at least one element in common.

View File

@ -1777,34 +1777,67 @@ Result:
└────────────────────────────────────────────────────────────────────────┘
```
## sqid
## sqidEncode
Transforms numbers into a [Sqid](https://sqids.org/) which is a YouTube-like ID string.
Encodes numbers as a [Sqid](https://sqids.org/) which is a YouTube-like ID string.
The output alphabet is `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789`.
Do not use this function for hashing - the generated IDs can be decoded back into numbers.
Do not use this function for hashing - the generated IDs can be decoded back into the original numbers.
**Syntax**
```sql
sqid(number1, ...)
sqidEncode(number1, ...)
```
Alias: `sqid`
**Arguments**
- A variable number of UInt8, UInt16, UInt32 or UInt64 numbers.
**Returned Value**
A hash id [String](/docs/en/sql-reference/data-types/string.md).
A sqid [String](/docs/en/sql-reference/data-types/string.md).
**Example**
```sql
SELECT sqid(1, 2, 3, 4, 5);
SELECT sqidEncode(1, 2, 3, 4, 5);
```
```response
┌─sqid(1, 2, 3, 4, 5)─┐
│ gXHfJ1C6dN │
└─────────────────────┘
┌─sqidEncode(1, 2, 3, 4, 5)─┐
│ gXHfJ1C6dN │
└───────────────────────────┘
```
## sqidDecode
Decodes a [Sqid](https://sqids.org/) back into its original numbers.
Returns an empty array in case the input string is not a valid sqid.
**Syntax**
```sql
sqidDecode(sqid)
```
**Arguments**
- A sqid - [String](/docs/en/sql-reference/data-types/string.md)
**Returned Value**
The sqid transformed to numbers [Array(UInt64)](/docs/en/sql-reference/data-types/array.md).
**Example**
```sql
SELECT sqidDecode('gXHfJ1C6dN');
```
```response
┌─sqidDecode('gXHfJ1C6dN')─┐
│ [1,2,3,4,5] │
└──────────────────────────┘
```

View File

@ -731,7 +731,7 @@ Alias: `FROM_BASE64`.
Like `base64Decode` but returns an empty string in case of error.
## endsWith
## endsWith {#endswith}
Returns whether string `str` ends with `suffix`.
@ -765,7 +765,7 @@ Result:
└──────────────────────────┴──────────────────────┘
```
## startsWith
## startsWith {#startswith}
Returns whether string `str` starts with `prefix`.

View File

@ -207,7 +207,7 @@ Functions `multiSearchFirstIndexCaseInsensitive`, `multiSearchFirstIndexUTF8` an
multiSearchFirstIndex(haystack, \[needle<sub>1</sub>, needle<sub>2</sub>, …, needle<sub>n</sub>\])
```
## multiSearchAny
## multiSearchAny {#multisearchany}
Returns 1, if at least one string needle<sub>i</sub> matches the string `haystack` and 0 otherwise.
@ -219,7 +219,7 @@ Functions `multiSearchAnyCaseInsensitive`, `multiSearchAnyUTF8` and `multiSearch
multiSearchAny(haystack, \[needle<sub>1</sub>, needle<sub>2</sub>, …, needle<sub>n</sub>\])
```
## match
## match {#match}
Returns whether string `haystack` matches the regular expression `pattern` in [re2 regular syntax](https://github.com/google/re2/wiki/Syntax).
@ -414,7 +414,7 @@ Result:
└────────────────────────────────────────────────────────────────────────────────────────┘
```
## like
## like {#like}
Returns whether string `haystack` matches the LIKE expression `pattern`.
@ -445,7 +445,7 @@ like(haystack, pattern)
Alias: `haystack LIKE pattern` (operator)
## notLike
## notLike {#notlike}
Like `like` but negates the result.

View File

@ -293,6 +293,8 @@ You can't combine both ways in one query.
Along with column descriptions, constraints can be defined:
### CONSTRAINT
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
@ -307,6 +309,30 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
Adding a large number of constraints can negatively affect the performance of big `INSERT` queries.
### ASSUME
The `ASSUME` clause is used to define a `CONSTRAINT` on a table that is assumed to be true. This constraint can then be used by the optimizer to enhance the performance of SQL queries.
Take this example where `ASSUME CONSTRAINT` is used in the creation of the `users_a` table:
```sql
CREATE TABLE users_a (
uid Int16,
name String,
age Int16,
name_len UInt8 MATERIALIZED length(name),
CONSTRAINT c1 ASSUME length(name) = name_len
)
ENGINE=MergeTree
ORDER BY (name_len, name);
```
Here, `ASSUME CONSTRAINT` is used to assert that the `length(name)` function always equals the value of the `name_len` column. This means that whenever `length(name)` is called in a query, ClickHouse can replace it with `name_len`, which should be faster because it avoids calling the `length()` function.
Then, when executing the query `SELECT name FROM users_a WHERE length(name) < 5;`, ClickHouse can optimize it to `SELECT name FROM users_a WHERE name_len < 5;` because of the `ASSUME CONSTRAINT`. This can make the query run faster because it avoids calculating the length of `name` for each row.
`ASSUME CONSTRAINT` **does not enforce the constraint**; it merely informs the optimizer that the constraint holds true. If the constraint is not actually true, the results of the queries may be incorrect. Therefore, you should only use `ASSUME CONSTRAINT` if you are sure that the constraint is true.
## TTL Expression
Defines storage time for values. Can be specified only for MergeTree-family tables. For the detailed description, see [TTL for columns and tables](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl).

View File

@ -11,7 +11,7 @@ Its name comes from the fact that it can be looked at as executing `JOIN` with a
Syntax:
``` sql
```sql
SELECT <expr_list>
FROM <left_subquery>
[LEFT] ARRAY JOIN <array>
@ -30,7 +30,7 @@ Supported types of `ARRAY JOIN` are listed below:
The examples below demonstrate the usage of the `ARRAY JOIN` and `LEFT ARRAY JOIN` clauses. Let's create a table with an [Array](../../../sql-reference/data-types/array.md) type column and insert values into it:
``` sql
```sql
CREATE TABLE arrays_test
(
s String,
@ -41,7 +41,7 @@ INSERT INTO arrays_test
VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);
```
``` text
```response
┌─s───────────┬─arr─────┐
│ Hello │ [1,2] │
│ World │ [3,4,5] │
@ -51,13 +51,13 @@ VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);
The example below uses the `ARRAY JOIN` clause:
``` sql
```sql
SELECT s, arr
FROM arrays_test
ARRAY JOIN arr;
```
``` text
```response
┌─s─────┬─arr─┐
│ Hello │ 1 │
│ Hello │ 2 │
@ -69,13 +69,13 @@ ARRAY JOIN arr;
The next example uses the `LEFT ARRAY JOIN` clause:
``` sql
```sql
SELECT s, arr
FROM arrays_test
LEFT ARRAY JOIN arr;
```
``` text
```response
┌─s───────────┬─arr─┐
│ Hello │ 1 │
│ Hello │ 2 │
@ -90,13 +90,13 @@ LEFT ARRAY JOIN arr;
An alias can be specified for an array in the `ARRAY JOIN` clause. In this case, an array item can be accessed by this alias, but the array itself is accessed by the original name. Example:
``` sql
```sql
SELECT s, arr, a
FROM arrays_test
ARRAY JOIN arr AS a;
```
``` text
```response
┌─s─────┬─arr─────┬─a─┐
│ Hello │ [1,2] │ 1 │
│ Hello │ [1,2] │ 2 │
@ -108,13 +108,13 @@ ARRAY JOIN arr AS a;
Using aliases, you can perform `ARRAY JOIN` with an external array. For example:
``` sql
```sql
SELECT s, arr_external
FROM arrays_test
ARRAY JOIN [1, 2, 3] AS arr_external;
```
``` text
```response
┌─s───────────┬─arr_external─┐
│ Hello │ 1 │
│ Hello │ 2 │
@ -130,13 +130,13 @@ ARRAY JOIN [1, 2, 3] AS arr_external;
Multiple arrays can be comma-separated in the `ARRAY JOIN` clause. In this case, `JOIN` is performed with them simultaneously (the direct sum, not the cartesian product). Note that all the arrays must have the same size by default. Example:
``` sql
```sql
SELECT s, arr, a, num, mapped
FROM arrays_test
ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num, arrayMap(x -> x + 1, arr) AS mapped;
```
``` text
```response
┌─s─────┬─arr─────┬─a─┬─num─┬─mapped─┐
│ Hello │ [1,2] │ 1 │ 1 │ 2 │
│ Hello │ [1,2] │ 2 │ 2 │ 3 │
@ -148,13 +148,13 @@ ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num, arrayMap(x -> x + 1, arr) AS ma
The example below uses the [arrayEnumerate](../../../sql-reference/functions/array-functions.md#array_functions-arrayenumerate) function:
``` sql
```sql
SELECT s, arr, a, num, arrayEnumerate(arr)
FROM arrays_test
ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num;
```
``` text
```response
┌─s─────┬─arr─────┬─a─┬─num─┬─arrayEnumerate(arr)─┐
│ Hello │ [1,2] │ 1 │ 1 │ [1,2] │
│ Hello │ [1,2] │ 2 │ 2 │ [1,2] │
@ -163,6 +163,7 @@ ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num;
│ World │ [3,4,5] │ 5 │ 3 │ [1,2,3] │
└───────┴─────────┴───┴─────┴─────────────────────┘
```
Multiple arrays with different sizes can be joined by using: `SETTINGS enable_unaligned_array_join = 1`. Example:
```sql
@ -171,7 +172,7 @@ FROM arrays_test ARRAY JOIN arr as a, [['a','b'],['c']] as b
SETTINGS enable_unaligned_array_join = 1;
```
```text
```response
┌─s───────┬─arr─────┬─a─┬─b─────────┐
│ Hello │ [1,2] │ 1 │ ['a','b'] │
│ Hello │ [1,2] │ 2 │ ['c'] │
@ -187,7 +188,7 @@ SETTINGS enable_unaligned_array_join = 1;
`ARRAY JOIN` also works with [nested data structures](../../../sql-reference/data-types/nested-data-structures/index.md):
``` sql
```sql
CREATE TABLE nested_test
(
s String,
@ -200,7 +201,7 @@ INSERT INTO nested_test
VALUES ('Hello', [1,2], [10,20]), ('World', [3,4,5], [30,40,50]), ('Goodbye', [], []);
```
``` text
```response
┌─s───────┬─nest.x──┬─nest.y─────┐
│ Hello │ [1,2] │ [10,20] │
│ World │ [3,4,5] │ [30,40,50] │
@ -208,13 +209,13 @@ VALUES ('Hello', [1,2], [10,20]), ('World', [3,4,5], [30,40,50]), ('Goodbye', []
└─────────┴─────────┴────────────┘
```
``` sql
```sql
SELECT s, `nest.x`, `nest.y`
FROM nested_test
ARRAY JOIN nest;
```
``` text
```response
┌─s─────┬─nest.x─┬─nest.y─┐
│ Hello │ 1 │ 10 │
│ Hello │ 2 │ 20 │
@ -226,13 +227,13 @@ ARRAY JOIN nest;
When specifying names of nested data structures in `ARRAY JOIN`, the meaning is the same as `ARRAY JOIN` with all the array elements that it consists of. Examples are listed below:
``` sql
```sql
SELECT s, `nest.x`, `nest.y`
FROM nested_test
ARRAY JOIN `nest.x`, `nest.y`;
```
``` text
```response
┌─s─────┬─nest.x─┬─nest.y─┐
│ Hello │ 1 │ 10 │
│ Hello │ 2 │ 20 │
@ -244,13 +245,13 @@ ARRAY JOIN `nest.x`, `nest.y`;
This variation also makes sense:
``` sql
```sql
SELECT s, `nest.x`, `nest.y`
FROM nested_test
ARRAY JOIN `nest.x`;
```
``` text
```response
┌─s─────┬─nest.x─┬─nest.y─────┐
│ Hello │ 1 │ [10,20] │
│ Hello │ 2 │ [10,20] │
@ -262,13 +263,13 @@ ARRAY JOIN `nest.x`;
An alias may be used for a nested data structure, in order to select either the `JOIN` result or the source array. Example:
``` sql
```sql
SELECT s, `n.x`, `n.y`, `nest.x`, `nest.y`
FROM nested_test
ARRAY JOIN nest AS n;
```
``` text
```response
┌─s─────┬─n.x─┬─n.y─┬─nest.x──┬─nest.y─────┐
│ Hello │ 1 │ 10 │ [1,2] │ [10,20] │
│ Hello │ 2 │ 20 │ [1,2] │ [10,20] │
@ -280,13 +281,13 @@ ARRAY JOIN nest AS n;
Example of using the [arrayEnumerate](../../../sql-reference/functions/array-functions.md#array_functions-arrayenumerate) function:
``` sql
```sql
SELECT s, `n.x`, `n.y`, `nest.x`, `nest.y`, num
FROM nested_test
ARRAY JOIN nest AS n, arrayEnumerate(`nest.x`) AS num;
```
``` text
```response
┌─s─────┬─n.x─┬─n.y─┬─nest.x──┬─nest.y─────┬─num─┐
│ Hello │ 1 │ 10 │ [1,2] │ [10,20] │ 1 │
│ Hello │ 2 │ 20 │ [1,2] │ [10,20] │ 2 │
@ -300,6 +301,11 @@ ARRAY JOIN nest AS n, arrayEnumerate(`nest.x`) AS num;
The query execution order is optimized when running `ARRAY JOIN`. Although `ARRAY JOIN` must always be specified before the [WHERE](../../../sql-reference/statements/select/where.md)/[PREWHERE](../../../sql-reference/statements/select/prewhere.md) clause in a query, technically they can be performed in any order, unless the result of `ARRAY JOIN` is used for filtering. The processing order is controlled by the query optimizer.
### Incompatibility with short-circuit function evaluation
[Short-circuit function evaluation](../../../operations/settings/index.md#short-circuit-function-evaluation) is a feature that optimizes the execution of complex expressions in specific functions such as `if`, `multiIf`, `and`, and `or`. It prevents potential exceptions, such as division by zero, from occurring during the execution of these functions.
`arrayJoin` is always executed and not supported for short-circuit function evaluation. That's because it's a unique function processed separately from all other functions during query analysis and execution and requires additional logic that doesn't work with short-circuit function execution. The reason is that the number of rows in the result depends on the `arrayJoin` result, and it's too complex and expensive to implement lazy execution of `arrayJoin`.
## Related content

View File

@ -12,7 +12,7 @@ Join produces a new table by combining columns from one or multiple tables by us
``` sql
SELECT <expr_list>
FROM <left_table>
[GLOBAL] [INNER|LEFT|RIGHT|FULL|CROSS] [OUTER|SEMI|ANTI|ANY|ASOF] JOIN <right_table>
[GLOBAL] [INNER|LEFT|RIGHT|FULL|CROSS] [OUTER|SEMI|ANTI|ANY|ALL|ASOF] JOIN <right_table>
(ON <expr_list>)|(USING <column_list>) ...
```
@ -296,6 +296,34 @@ PASTE JOIN
│ 1 │ 0 │
└───┴──────┘
```
Note: In this case, the result can be nondeterministic if the reading is done in parallel. Example:
```SQL
SELECT *
FROM
(
SELECT number AS a
FROM numbers_mt(5)
) AS t1
PASTE JOIN
(
SELECT number AS a
FROM numbers(10)
ORDER BY a DESC
) AS t2
SETTINGS max_block_size = 2;
┌─a─┬─t2.a─┐
│ 2 │ 9 │
│ 3 │ 8 │
└───┴──────┘
┌─a─┬─t2.a─┐
│ 0 │ 7 │
│ 1 │ 6 │
└───┴──────┘
┌─a─┬─t2.a─┐
│ 4 │ 5 │
└───┴──────┘
```
## Distributed JOIN

View File

@ -17,15 +17,7 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/Exception.h>
#include <Common/parseGlobs.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#include <Common/re2.h>
static void setupLogging(const std::string & log_level)
{

View File

@ -16,6 +16,7 @@
#include <Common/SipHash.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ShellCommand.h>
#include <Common/re2.h>
#include <base/find_symbols.h>
#include <IO/copyData.h>
@ -24,15 +25,6 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
static constexpr auto documentation = R"(
A tool to extract information from Git repository for analytics.

View File

@ -95,6 +95,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/CurrentThread.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/NamedCollections/NamedCollections.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/NamedCollections/NamedCollectionConfiguration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/Jemalloc.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/IKeeper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/TestKeeper.cpp

View File

@ -2,6 +2,7 @@
#include "CatBoostLibraryHandler.h"
#include "CatBoostLibraryHandlerFactory.h"
#include "Common/ProfileEvents.h"
#include "ExternalDictionaryLibraryHandler.h"
#include "ExternalDictionaryLibraryHandlerFactory.h"
@ -44,7 +45,7 @@ namespace
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), fmt::runtime(message));
}
@ -96,7 +97,7 @@ ExternalDictionaryLibraryBridgeRequestHandler::ExternalDictionaryLibraryBridgeRe
}
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -384,7 +385,7 @@ ExternalDictionaryLibraryBridgeExistsHandler::ExternalDictionaryLibraryBridgeExi
}
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
@ -423,7 +424,7 @@ CatBoostLibraryBridgeRequestHandler::CatBoostLibraryBridgeRequestHandler(
}
void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -463,6 +464,9 @@ void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & requ
{
if (method == "catboost_list")
{
auto & read_buf = request.getStream();
params.read(read_buf);
ExternalModelInfos model_infos = CatBoostLibraryHandlerFactory::instance().getModelInfos();
writeIntBinary(static_cast<UInt64>(model_infos.size()), out);
@ -500,6 +504,9 @@ void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & requ
}
else if (method == "catboost_removeAllModels")
{
auto & read_buf = request.getStream();
params.read(read_buf);
CatBoostLibraryHandlerFactory::instance().removeAllModels();
String res = "1";
@ -621,7 +628,7 @@ CatBoostLibraryBridgeExistsHandler::CatBoostLibraryBridgeExistsHandler(size_t ke
}
void CatBoostLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void CatBoostLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -20,7 +20,7 @@ class ExternalDictionaryLibraryBridgeRequestHandler : public HTTPRequestHandler,
public:
ExternalDictionaryLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
static constexpr inline auto FORMAT = "RowBinary";
@ -36,7 +36,7 @@ class ExternalDictionaryLibraryBridgeExistsHandler : public HTTPRequestHandler,
public:
ExternalDictionaryLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;
@ -65,7 +65,7 @@ class CatBoostLibraryBridgeRequestHandler : public HTTPRequestHandler, WithConte
public:
CatBoostLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;
@ -79,7 +79,7 @@ class CatBoostLibraryBridgeExistsHandler : public HTTPRequestHandler, WithContex
public:
CatBoostLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;

View File

@ -69,7 +69,7 @@ namespace
}
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -78,7 +78,7 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -23,7 +23,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -21,7 +21,7 @@
namespace DB
{
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -30,7 +30,7 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
response.send()->writeln(message);
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -21,7 +21,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -46,12 +46,12 @@ void ODBCHandler::processError(HTTPServerResponse & response, const std::string
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
}
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request);
LOG_TRACE(log, "Request URI: {}", request.getURI());

View File

@ -30,7 +30,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -6,7 +6,7 @@
namespace DB
{
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -10,7 +10,7 @@ class PingHandler : public HTTPRequestHandler
{
public:
explicit PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
size_t keep_alive_timeout;

View File

@ -29,7 +29,7 @@ namespace
}
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -38,7 +38,7 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -24,7 +24,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -153,6 +153,18 @@ namespace ProfileEvents
{
extern const Event MainConfigLoads;
extern const Event ServerStartupMilliseconds;
extern const Event InterfaceNativeSendBytes;
extern const Event InterfaceNativeReceiveBytes;
extern const Event InterfaceHTTPSendBytes;
extern const Event InterfaceHTTPReceiveBytes;
extern const Event InterfacePrometheusSendBytes;
extern const Event InterfacePrometheusReceiveBytes;
extern const Event InterfaceInterserverSendBytes;
extern const Event InterfaceInterserverReceiveBytes;
extern const Event InterfaceMySQLSendBytes;
extern const Event InterfaceMySQLReceiveBytes;
extern const Event InterfacePostgreSQLSendBytes;
extern const Event InterfacePostgreSQLReceiveBytes;
}
namespace fs = std::filesystem;
@ -2049,7 +2061,7 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr
{
if (type == "tcp")
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false));
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes));
if (type == "tls")
#if USE_SSL
@ -2061,20 +2073,20 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
if (type == "proxy1")
return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(*this, conf_name));
if (type == "mysql")
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this));
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes));
if (type == "postgres")
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this));
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes));
if (type == "http")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes)
);
if (type == "prometheus")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes)
);
if (type == "interserver")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"), ProfileEvents::InterfaceInterserverReceiveBytes, ProfileEvents::InterfaceInterserverSendBytes)
);
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type);
@ -2207,7 +2219,7 @@ void Server::createServers(
port_name,
"http://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes));
});
}
@ -2227,7 +2239,7 @@ void Server::createServers(
port_name,
"https://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes));
#else
UNUSED(port);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "HTTPS protocol is disabled because Poco library was built without NetSSL support.");
@ -2250,7 +2262,7 @@ void Server::createServers(
port_name,
"native protocol (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false),
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2272,7 +2284,7 @@ void Server::createServers(
port_name,
"native protocol (tcp) with PROXY: " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true),
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2295,7 +2307,7 @@ void Server::createServers(
port_name,
"secure native protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false),
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2319,7 +2331,7 @@ void Server::createServers(
listen_host,
port_name,
"MySQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(new MySQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
std::make_unique<TCPServer>(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
@ -2336,7 +2348,7 @@ void Server::createServers(
listen_host,
port_name,
"PostgreSQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
@ -2370,7 +2382,7 @@ void Server::createServers(
port_name,
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes));
});
}
}
@ -2416,7 +2428,9 @@ void Server::createInterserverServers(
createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
});
}
@ -2439,7 +2453,9 @@ void Server::createInterserverServers(
createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPSHandler-factory"),
server_pool,
socket,
http_params));
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
#else
UNUSED(port);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");

View File

@ -1,5 +1,6 @@
#include <Common/Exception.h>
#include <Common/TerminalSize.h>
#include <Common/re2.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -12,15 +13,6 @@
#include <boost/program_options.hpp>
#include <filesystem>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace fs = std::filesystem;
#define EXTRACT_PATH_PATTERN ".*\\/store/(.*)"

View File

@ -24,20 +24,12 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <base/defines.h>
#include <IO/Operators.h>
#include <Common/re2.h>
#include <Poco/AccessExpireCache.h>
#include <boost/algorithm/string/join.hpp>
#include <filesystem>
#include <mutex>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace DB
{
namespace ErrorCodes

View File

@ -200,6 +200,7 @@ enum class AccessType
M(SYSTEM_UNFREEZE, "SYSTEM UNFREEZE", GLOBAL, SYSTEM) \
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT", GLOBAL, SYSTEM) \
M(SYSTEM_LISTEN, "SYSTEM START LISTEN, SYSTEM STOP LISTEN", GLOBAL, SYSTEM) \
M(SYSTEM_JEMALLOC, "SYSTEM JEMALLOC PURGE, SYSTEM JEMALLOC ENABLE PROFILE, SYSTEM JEMALLOC DISABLE PROFILE, SYSTEM JEMALLOC FLUSH PROFILE", GLOBAL, SYSTEM) \
M(SYSTEM, "", GROUP, ALL) /* allows to execute SYSTEM {SHUTDOWN|RELOAD CONFIG|...} */ \
\
M(dictGet, "dictHas, dictGetHierarchy, dictIsIn", DICTIONARY, ALL) /* allows to execute functions dictGet(), dictHas(), dictGetHierarchy(), dictIsIn() */\

View File

@ -3,15 +3,7 @@
#include <Analyzer/Identifier.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/ListNode.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#include <Common/re2.h>
namespace DB
{

View File

@ -4,15 +4,7 @@
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/ColumnTransformers.h>
#include <Parsers/ASTAsterisk.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#include <Common/re2.h>
namespace DB
{

View File

@ -64,39 +64,43 @@ public:
auto lhs_argument_node_type = lhs_argument->getNodeType();
auto rhs_argument_node_type = rhs_argument->getNodeType();
QueryTreeNodePtr candidate;
if (lhs_argument_node_type == QueryTreeNodeType::FUNCTION && rhs_argument_node_type == QueryTreeNodeType::FUNCTION)
tryOptimizeComparisonTupleFunctions(node, lhs_argument, rhs_argument, comparison_function_name);
candidate = tryOptimizeComparisonTupleFunctions(lhs_argument, rhs_argument, comparison_function_name);
else if (lhs_argument_node_type == QueryTreeNodeType::FUNCTION && rhs_argument_node_type == QueryTreeNodeType::CONSTANT)
tryOptimizeComparisonTupleFunctionAndConstant(node, lhs_argument, rhs_argument, comparison_function_name);
candidate = tryOptimizeComparisonTupleFunctionAndConstant(lhs_argument, rhs_argument, comparison_function_name);
else if (lhs_argument_node_type == QueryTreeNodeType::CONSTANT && rhs_argument_node_type == QueryTreeNodeType::FUNCTION)
tryOptimizeComparisonTupleFunctionAndConstant(node, rhs_argument, lhs_argument, comparison_function_name);
candidate = tryOptimizeComparisonTupleFunctionAndConstant(rhs_argument, lhs_argument, comparison_function_name);
if (candidate != nullptr && node->getResultType()->equals(*candidate->getResultType()))
node = candidate;
}
private:
void tryOptimizeComparisonTupleFunctions(QueryTreeNodePtr & node,
QueryTreeNodePtr tryOptimizeComparisonTupleFunctions(
const QueryTreeNodePtr & lhs_function_node,
const QueryTreeNodePtr & rhs_function_node,
const std::string & comparison_function_name) const
{
const auto & lhs_function_node_typed = lhs_function_node->as<FunctionNode &>();
if (lhs_function_node_typed.getFunctionName() != "tuple")
return;
return {};
const auto & rhs_function_node_typed = rhs_function_node->as<FunctionNode &>();
if (rhs_function_node_typed.getFunctionName() != "tuple")
return;
return {};
const auto & lhs_tuple_function_arguments_nodes = lhs_function_node_typed.getArguments().getNodes();
size_t lhs_tuple_function_arguments_nodes_size = lhs_tuple_function_arguments_nodes.size();
const auto & rhs_tuple_function_arguments_nodes = rhs_function_node_typed.getArguments().getNodes();
if (lhs_tuple_function_arguments_nodes_size != rhs_tuple_function_arguments_nodes.size())
return;
return {};
if (lhs_tuple_function_arguments_nodes_size == 1)
{
node = makeComparisonFunction(lhs_tuple_function_arguments_nodes[0], rhs_tuple_function_arguments_nodes[0], comparison_function_name);
return;
return makeComparisonFunction(lhs_tuple_function_arguments_nodes[0], rhs_tuple_function_arguments_nodes[0], comparison_function_name);
}
QueryTreeNodes tuple_arguments_equals_functions;
@ -108,45 +112,44 @@ private:
tuple_arguments_equals_functions.push_back(std::move(equals_function));
}
node = makeEquivalentTupleComparisonFunction(std::move(tuple_arguments_equals_functions), comparison_function_name);
return makeEquivalentTupleComparisonFunction(std::move(tuple_arguments_equals_functions), comparison_function_name);
}
void tryOptimizeComparisonTupleFunctionAndConstant(QueryTreeNodePtr & node,
QueryTreeNodePtr tryOptimizeComparisonTupleFunctionAndConstant(
const QueryTreeNodePtr & function_node,
const QueryTreeNodePtr & constant_node,
const std::string & comparison_function_name) const
{
const auto & function_node_typed = function_node->as<FunctionNode &>();
if (function_node_typed.getFunctionName() != "tuple")
return;
return {};
auto & constant_node_typed = constant_node->as<ConstantNode &>();
const auto & constant_node_value = constant_node_typed.getValue();
if (constant_node_value.getType() != Field::Types::Which::Tuple)
return;
return {};
const auto & constant_tuple = constant_node_value.get<const Tuple &>();
const auto & function_arguments_nodes = function_node_typed.getArguments().getNodes();
size_t function_arguments_nodes_size = function_arguments_nodes.size();
if (function_arguments_nodes_size != constant_tuple.size())
return;
return {};
auto constant_node_result_type = constant_node_typed.getResultType();
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(constant_node_result_type.get());
if (!tuple_data_type)
return;
return {};
const auto & tuple_data_type_elements = tuple_data_type->getElements();
if (tuple_data_type_elements.size() != function_arguments_nodes_size)
return;
return {};
if (function_arguments_nodes_size == 1)
{
auto comparison_argument_constant_value = std::make_shared<ConstantValue>(constant_tuple[0], tuple_data_type_elements[0]);
auto comparison_argument_constant_node = std::make_shared<ConstantNode>(std::move(comparison_argument_constant_value));
node = makeComparisonFunction(function_arguments_nodes[0], std::move(comparison_argument_constant_node), comparison_function_name);
return;
return makeComparisonFunction(function_arguments_nodes[0], std::move(comparison_argument_constant_node), comparison_function_name);
}
QueryTreeNodes tuple_arguments_equals_functions;
@ -160,7 +163,7 @@ private:
tuple_arguments_equals_functions.push_back(std::move(equals_function));
}
node = makeEquivalentTupleComparisonFunction(std::move(tuple_arguments_equals_functions), comparison_function_name);
return makeEquivalentTupleComparisonFunction(std::move(tuple_arguments_equals_functions), comparison_function_name);
}
QueryTreeNodePtr makeEquivalentTupleComparisonFunction(QueryTreeNodes tuple_arguments_equals_functions,

View File

@ -61,6 +61,8 @@ public:
return;
auto & count_distinct_argument_column = count_distinct_arguments_nodes[0];
if (count_distinct_argument_column->getNodeType() != QueryTreeNodeType::COLUMN)
return;
auto & count_distinct_argument_column_typed = count_distinct_argument_column->as<ColumnNode &>();
/// Build subquery SELECT count_distinct_argument_column FROM table_expression GROUP BY count_distinct_argument_column

View File

@ -396,7 +396,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
String backup_name_for_logging = backup_info.toStringForLogging();
String base_backup_name;
if (backup_settings.base_backup_info)
base_backup_name = backup_settings.base_backup_info->toString();
base_backup_name = backup_settings.base_backup_info->toStringForLogging();
try
{
@ -750,7 +750,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
String backup_name_for_logging = backup_info.toStringForLogging();
String base_backup_name;
if (restore_settings.base_backup_info)
base_backup_name = restore_settings.base_backup_info->toString();
base_backup_name = restore_settings.base_backup_info->toStringForLogging();
addInfo(restore_id, backup_name_for_logging, base_backup_name, restore_settings.internal, BackupStatus::RESTORING);

View File

@ -118,18 +118,18 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
return result;
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback);
};
{ return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); };
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
std::vector<Entry> entries;
entries.reserve(results.size());
@ -153,17 +153,17 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const ConnectionTimeouts & timeouts,
const Settings & settings, PoolMode pool_mode,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback);
};
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); };
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
}
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings)
@ -175,14 +175,16 @@ ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::ma
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
const Settings & settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints)
const Settings & settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
if (nested_pools.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
throw DB::Exception(
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
if (!skip_unavailable_endpoints.has_value())
skip_unavailable_endpoints = settings.skip_unavailable_shards;
@ -203,14 +205,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
else
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode");
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value;
return Base::getMany(min_entries, max_entries, max_tries,
max_ignored_errors, fallback_to_stale_replicas,
try_get_entry, get_priority);
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func);
}
ConnectionPoolWithFailover::TryResult
@ -251,11 +252,14 @@ ConnectionPoolWithFailover::tryGetEntry(
return result;
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools(const Settings & settings)
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
{
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
return Base::getShuffledPools(max_ignored_errors, get_priority);
return Base::getShuffledPools(max_ignored_errors, priority_func);
}
}

View File

@ -54,10 +54,13 @@ public:
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
const Settings & settings, PoolMode pool_mode,
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
std::vector<Entry> getMany(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
@ -69,12 +72,13 @@ public:
/// The same as getMany(), but check that replication delay for table_to_check is acceptable.
/// Delay threshold is taken from settings.
std::vector<TryResult> getManyChecked(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
struct NestedPoolStatus
{
@ -87,7 +91,7 @@ public:
using Status = std::vector<NestedPoolStatus>;
Status getStatus() const;
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings);
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {});
size_t getMaxErrorCup() const { return Base::max_error_cap; }
@ -96,13 +100,16 @@ public:
Base::updateSharedErrorCounts(shuffled_pools);
}
size_t getPoolSize() const { return Base::getPoolSize(); }
private:
/// Get the values of relevant settings and call Base::getMany()
std::vector<TryResult> getManyImpl(
const Settings & settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
const Settings & settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
/// Try to get a connection from the pool and check that it is good.
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
@ -115,7 +122,7 @@ private:
const QualifiedTableName * table_to_check = nullptr,
AsyncCallback async_callback = {});
GetPriorityFunc makeGetPriorityFunc(const Settings & settings);
GetPriorityForLoadBalancing::Func makeGetPriorityFunc(const Settings & settings);
GetPriorityForLoadBalancing get_priority_load_balancing;
};

View File

@ -28,16 +28,18 @@ HedgedConnections::HedgedConnections(
const ThrottlerPtr & throttler_,
PoolMode pool_mode,
std::shared_ptr<QualifiedTableName> table_to_check_,
AsyncCallback async_callback)
AsyncCallback async_callback,
GetPriorityForLoadBalancing::Func priority_func)
: hedged_connections_factory(
pool_,
context_->getSettingsRef(),
timeouts_,
context_->getSettingsRef().connections_with_failover_max_tries.value,
context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value,
context_->getSettingsRef().max_parallel_replicas.value,
context_->getSettingsRef().skip_unavailable_shards.value,
table_to_check_)
pool_,
context_->getSettingsRef(),
timeouts_,
context_->getSettingsRef().connections_with_failover_max_tries.value,
context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value,
context_->getSettingsRef().max_parallel_replicas.value,
context_->getSettingsRef().skip_unavailable_shards.value,
table_to_check_,
priority_func)
, context(std::move(context_))
, settings(context->getSettingsRef())
, throttler(throttler_)

View File

@ -70,13 +70,15 @@ public:
size_t index;
};
HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const ThrottlerPtr & throttler,
PoolMode pool_mode,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
AsyncCallback async_callback = {});
HedgedConnections(
const ConnectionPoolWithFailoverPtr & pool_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const ThrottlerPtr & throttler,
PoolMode pool_mode,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
AsyncCallback async_callback = {},
GetPriorityForLoadBalancing::Func priority_func = {});
void sendScalarsData(Scalars & data) override;

View File

@ -29,7 +29,8 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
bool fallback_to_stale_replicas_,
UInt64 max_parallel_replicas_,
bool skip_unavailable_shards_,
std::shared_ptr<QualifiedTableName> table_to_check_)
std::shared_ptr<QualifiedTableName> table_to_check_,
GetPriorityForLoadBalancing::Func priority_func)
: pool(pool_)
, timeouts(timeouts_)
, table_to_check(table_to_check_)
@ -39,7 +40,7 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
, max_parallel_replicas(max_parallel_replicas_)
, skip_unavailable_shards(skip_unavailable_shards_)
{
shuffled_pools = pool->getShuffledPools(settings_);
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
for (auto shuffled_pool : shuffled_pools)
replicas.emplace_back(std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
}
@ -323,8 +324,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect
else
{
ShuffledPool & shuffled_pool = shuffled_pools[index];
LOG_WARNING(
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);

View File

@ -53,7 +53,8 @@ public:
bool fallback_to_stale_replicas_,
UInt64 max_parallel_replicas_,
bool skip_unavailable_shards_,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
GetPriorityForLoadBalancing::Func priority_func = {});
/// Create and return active connections according to pool_mode.
std::vector<Connection *> getManyConnections(PoolMode pool_mode, AsyncCallback async_callback = {});

View File

@ -48,11 +48,11 @@ void prefaultPages([[maybe_unused]] void * buf_, [[maybe_unused]] size_t len_)
return;
auto [buf, len] = adjustToPageSize(buf_, len_, page_size);
if (auto res = ::madvise(buf, len, MADV_POPULATE_WRITE); res < 0)
if (::madvise(buf, len, MADV_POPULATE_WRITE) < 0)
LOG_TRACE(
LogFrequencyLimiter(&Poco::Logger::get("Allocator"), 1),
"Attempt to populate pages failed: {} (EINVAL is expected for kernels < 5.14)",
errnoToString(res));
errnoToString(errno));
#endif
}

View File

@ -5,15 +5,15 @@
#include <Common/LRUCachePolicy.h>
#include <Common/SLRUCachePolicy.h>
#include <base/UUID.h>
#include <base/defines.h>
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <base/defines.h>
namespace DB
{
@ -227,10 +227,10 @@ public:
cache_policy->setMaxSizeInBytes(max_size_in_bytes);
}
void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries)
void setQuotaForUser(const UUID & user_id, size_t max_size_in_bytes, size_t max_entries)
{
std::lock_guard lock(mutex);
cache_policy->setQuotaForUser(user_name, max_size_in_bytes, max_entries);
cache_policy->setQuotaForUser(user_id, max_size_in_bytes, max_entries);
}
virtual ~CacheBase() = default;

View File

@ -93,7 +93,10 @@ inline bool cpuid(UInt32 op, UInt32 * res) noexcept /// NOLINT
OP(CLFLUSHOPT) \
OP(CLWB) \
OP(XSAVE) \
OP(OSXSAVE)
OP(OSXSAVE) \
OP(AMXBF16) \
OP(AMXTILE) \
OP(AMXINT8)
union CpuInfo
{
@ -313,6 +316,35 @@ bool haveRDRAND() noexcept
return CpuInfo(0x0).registers.eax >= 0x7 && ((CpuInfo(0x1).registers.ecx >> 30) & 1u);
}
/// Tells whether AMX state is usable: OSXSAVE has to be available and bits 17 and 18
/// of the value returned by our_xgetbv(0) must both be set (AMX state enabled by the OS).
/// On targets other than x86 / x86_64 the answer is always false.
inline bool haveAMX() noexcept
{
#if defined(__x86_64__) || defined(__i386__)
    // http://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
    /// Must be checked first: our_xgetbv() is only evaluated when OSXSAVE is present.
    if (!haveOSXSAVE()) // implies haveXSAVE()
        return false;

    /// Both AMX state bits have to be enabled by the OS.
    const auto amx_state_bits = (our_xgetbv(0) >> 17) & 0x3;
    return amx_state_bits == 0x3;
#else
    return false;
#endif
}
/// AMX-BF16: requires haveAMX() and bit 22 of EDX as reported by CpuInfo(0x7, 0).
bool haveAMXBF16() noexcept
{
    if (!haveAMX())
        return false;

    const auto feature_bits = CpuInfo(0x7, 0).registers.edx;
    return ((feature_bits >> 22) & 1u) != 0; // AMX-BF16 bit
}
/// AMX-TILE: requires haveAMX() and bit 24 of EDX as reported by CpuInfo(0x7, 0).
bool haveAMXTILE() noexcept
{
    if (!haveAMX())
        return false;

    const auto feature_bits = CpuInfo(0x7, 0).registers.edx;
    return ((feature_bits >> 24) & 1u) != 0; // AMX-TILE bit
}
/// AMX-INT8: requires haveAMX() and bit 25 of EDX as reported by CpuInfo(0x7, 0).
bool haveAMXINT8() noexcept
{
    if (!haveAMX())
        return false;

    const auto feature_bits = CpuInfo(0x7, 0).registers.edx;
    return ((feature_bits >> 25) & 1u) != 0; // AMX-INT8 bit
}
struct CpuFlagsCache
{
#define DEF_NAME(X) static inline bool have_##X = have##X();

View File

@ -2,6 +2,7 @@
#include <Common/DateLUT.h>
#include <Common/Exception.h>
#include <Common/re2.h>
#include <chrono>
#include <filesystem>
@ -11,15 +12,6 @@
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace fs = std::filesystem;
namespace DB

View File

@ -9,7 +9,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const
GetPriorityForLoadBalancing::Func
GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const
{
std::function<Priority(size_t index)> get_priority;
switch (load_balance)
@ -33,19 +34,26 @@ std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFu
get_priority = [offset](size_t i) { return i != offset ? Priority{1} : Priority{0}; };
break;
case LoadBalancing::ROUND_ROBIN:
if (last_used >= pool_size)
last_used = 0;
auto local_last_used = last_used % pool_size;
++last_used;
/* Consider pool_size equals to 5
* last_used = 1 -> get_priority: 0 1 2 3 4
* last_used = 2 -> get_priority: 4 0 1 2 3
* last_used = 3 -> get_priority: 4 3 0 1 2
* ...
* */
get_priority = [this, pool_size](size_t i)
// Example: pool_size = 5
// | local_last_used | i=0 | i=1 | i=2 | i=3 | i=4 |
// | 0 | 4 | 0 | 1 | 2 | 3 |
// | 1 | 3 | 4 | 0 | 1 | 2 |
// | 2 | 2 | 3 | 4 | 0 | 1 |
// | 3 | 1 | 2 | 3 | 4 | 0 |
// | 4 | 0 | 1 | 2 | 3 | 4 |
get_priority = [pool_size, local_last_used](size_t i)
{
++i; // To make `i` indexing start with 1 instead of 0 as `last_used` does
return Priority{static_cast<Int64>(i < last_used ? pool_size - i : i - last_used)};
size_t priority = pool_size - 1;
if (i < local_last_used)
priority = pool_size - 1 - (local_last_used - i);
if (i > local_last_used)
priority = i - local_last_used - 1;
return Priority{static_cast<Int64>(priority)};
};
break;
}

View File

@ -8,7 +8,12 @@ namespace DB
class GetPriorityForLoadBalancing
{
public:
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_) : load_balancing(load_balancing_) {}
using Func = std::function<Priority(size_t index)>;
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_, size_t last_used_ = 0)
: load_balancing(load_balancing_), last_used(last_used_)
{
}
GetPriorityForLoadBalancing() = default;
bool operator == (const GetPriorityForLoadBalancing & other) const
@ -23,7 +28,7 @@ public:
return !(*this == other);
}
std::function<Priority(size_t index)> getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
std::vector<size_t> hostname_prefix_distance; /// Prefix distances from name of this host to the names of hosts of pools.
std::vector<size_t> hostname_levenshtein_distance; /// Levenshtein Distances from name of this host to the names of hosts of pools.

View File

@ -1,15 +1,7 @@
#include <Common/HTTPHeaderFilter.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#include <Common/re2.h>
namespace DB
{

View File

@ -2,10 +2,11 @@
#include <Common/Exception.h>
#include <Common/ICachePolicyUserQuota.h>
#include <base/UUID.h>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
namespace DB
{
@ -43,7 +44,7 @@ public:
virtual void setMaxCount(size_t /*max_count*/) = 0;
virtual void setMaxSizeInBytes(size_t /*max_size_in_bytes*/) = 0;
virtual void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries) { user_quotas->setQuotaForUser(user_name, max_size_in_bytes, max_entries); }
virtual void setQuotaForUser(const UUID & user_id, size_t max_size_in_bytes, size_t max_entries) { user_quotas->setQuotaForUser(user_id, max_size_in_bytes, max_entries); }
/// HashFunction usually hashes the entire key and the found key will be equal the provided key. In such cases, use get(). It is also
/// possible to store other, non-hashed data in the key. In that case, the found key is potentially different from the provided key.

View File

@ -1,5 +1,6 @@
#pragma once
#include <base/UUID.h>
#include <base/types.h>
namespace DB
@ -15,14 +16,14 @@ class ICachePolicyUserQuota
{
public:
/// Register or update the user's quota for the given resource.
virtual void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries) = 0;
virtual void setQuotaForUser(const UUID & user_id, size_t max_size_in_bytes, size_t max_entries) = 0;
/// Update the actual resource usage for the given user.
virtual void increaseActual(const String & user_name, size_t entry_size_in_bytes) = 0;
virtual void decreaseActual(const String & user_name, size_t entry_size_in_bytes) = 0;
virtual void increaseActual(const UUID & user_id, size_t entry_size_in_bytes) = 0;
virtual void decreaseActual(const UUID & user_id, size_t entry_size_in_bytes) = 0;
/// Is the user allowed to write a new entry into the cache?
virtual bool approveWrite(const String & user_name, size_t entry_size_in_bytes) const = 0;
virtual bool approveWrite(const UUID & user_id, size_t entry_size_in_bytes) const = 0;
virtual ~ICachePolicyUserQuota() = default;
};
@ -33,10 +34,10 @@ using CachePolicyUserQuotaPtr = std::unique_ptr<ICachePolicyUserQuota>;
class NoCachePolicyUserQuota : public ICachePolicyUserQuota
{
public:
void setQuotaForUser(const String & /*user_name*/, size_t /*max_size_in_bytes*/, size_t /*max_entries*/) override {}
void increaseActual(const String & /*user_name*/, size_t /*entry_size_in_bytes*/) override {}
void decreaseActual(const String & /*user_name*/, size_t /*entry_size_in_bytes*/) override {}
bool approveWrite(const String & /*user_name*/, size_t /*entry_size_in_bytes*/) const override { return true; }
void setQuotaForUser(const UUID & /*user_id*/, size_t /*max_size_in_bytes*/, size_t /*max_entries*/) override {}
void increaseActual(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) override {}
void decreaseActual(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) override {}
bool approveWrite(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) const override { return true; }
};

88
src/Common/Jemalloc.cpp Normal file
View File

@ -0,0 +1,88 @@
#include <Common/Jemalloc.h>
#if USE_JEMALLOC
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <jemalloc/jemalloc.h>
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
namespace ProfileEvents
{
extern const Event MemoryAllocatorPurge;
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void purgeJemallocArenas()
{
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Purging unused memory");
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
}
void checkJemallocProfilingEnabled()
{
bool active = true;
size_t active_size = sizeof(active);
mallctl("opt.prof", &active, &active_size, nullptr, 0);
if (!active)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"ClickHouse was started without enabling profiling for jemalloc. To use jemalloc's profiler, following env variable should be "
"set: MALLOC_CONF=background_thread:true,prof:true");
}
void setJemallocProfileActive(bool value)
{
checkJemallocProfilingEnabled();
bool active = true;
size_t active_size = sizeof(active);
mallctl("prof.active", &active, &active_size, nullptr, 0);
if (active == value)
{
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Profiling is already {}", active ? "enabled" : "disabled");
return;
}
mallctl("prof.active", nullptr, nullptr, &value, sizeof(bool));
LOG_TRACE(&Poco::Logger::get("SystemJemalloc"), "Profiling is {}", value ? "enabled" : "disabled");
}
/// Dumps a jemalloc heap profile.
/// Throws (via checkJemallocProfilingEnabled) when profiling is not enabled.
/// If `opt.prof_prefix` can be read and differs from "jeprof", `prof.dump` is invoked
/// without a file name and that prefix is returned.
/// Otherwise the profile is dumped to "<file_prefix>.<pid>.<counter>.heap" and that path is returned.
std::string flushJemallocProfile(const std::string & file_prefix)
{
    checkJemallocProfilingEnabled();

    auto * log = &Poco::Logger::get("SystemJemalloc");

    char * configured_prefix;
    size_t configured_prefix_size = sizeof(configured_prefix);
    const bool prefix_read_ok = mallctl("opt.prof_prefix", &configured_prefix, &configured_prefix_size, nullptr, 0) == 0;

    /// `configured_prefix` is inspected only when the read above succeeded.
    if (prefix_read_ok && std::string_view(configured_prefix) != "jeprof")
    {
        LOG_TRACE(log, "Flushing memory profile with prefix {}", configured_prefix);
        mallctl("prof.dump", nullptr, nullptr, nullptr, 0);
        return configured_prefix;
    }

    /// Process-wide counter; it is incremented on every dump that takes this path.
    static std::atomic<size_t> profile_counter{0};
    std::string dump_path = fmt::format("{}.{}.{}.heap", file_prefix, getpid(), profile_counter.fetch_add(1));

    /// `prof.dump` is given the address of a C-string pointer, not the characters themselves.
    const char * dump_path_cstr = dump_path.c_str();
    LOG_TRACE(log, "Flushing memory profile to {}", dump_path_cstr);
    mallctl("prof.dump", nullptr, nullptr, &dump_path_cstr, sizeof(dump_path_cstr));

    return dump_path;
}
}
#endif

22
src/Common/Jemalloc.h Normal file
View File

@ -0,0 +1,22 @@
#pragma once
#include "config.h"
#if USE_JEMALLOC
#include <string>
namespace DB
{
/// Purges unused memory from all jemalloc arenas and records the purge in ProfileEvents.
void purgeJemallocArenas();
/// Throws BAD_ARGUMENTS if jemalloc profiling (`opt.prof`) is not enabled.
void checkJemallocProfilingEnabled();
/// Sets jemalloc's `prof.active` flag to `value`; throws if profiling is not enabled.
void setJemallocProfileActive(bool value);
/// Dumps a heap profile and returns either the dump path built from `file_prefix`
/// or the configured jemalloc profile prefix; throws if profiling is not enabled.
std::string flushJemallocProfile(const std::string & file_prefix);
}
#endif

View File

@ -484,7 +484,7 @@ OptimizedRegularExpression::OptimizedRegularExpression(const std::string & regex
if (!is_trivial)
{
/// Compile the re2 regular expression.
typename re2::RE2::Options regexp_options;
re2::RE2::Options regexp_options;
/// Never write error messages to stderr. It's ignorant to do it from library code.
regexp_options.set_log_errors(false);

View File

@ -5,17 +5,9 @@
#include <memory>
#include <optional>
#include <Common/StringSearcher.h>
#include <Common/re2.h>
#include "config.h"
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
/** Uses two ways to optimize a regular expression:
* 1. If the regular expression is trivial (reduces to finding a substring in a string),
* then replaces the search with strstr or strcasestr.

View File

@ -124,7 +124,9 @@ public:
size_t max_ignored_errors,
bool fallback_to_stale_replicas,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc());
const GetPriorityFunc & get_priority);
size_t getPoolSize() const { return nested_pools.size(); }
protected:
@ -147,7 +149,7 @@ protected:
return std::make_tuple(shared_pool_states, nested_pools, last_error_decrease_time);
}
NestedPools nested_pools;
const NestedPools nested_pools;
const time_t decrease_error_period;
const size_t max_error_cap;

View File

@ -602,6 +602,19 @@ The server successfully detected this situation and will download merged part fr
M(LogError, "Number of log messages with level Error") \
M(LogFatal, "Number of log messages with level Fatal") \
\
M(InterfaceHTTPSendBytes, "Number of bytes sent through HTTP interfaces") \
M(InterfaceHTTPReceiveBytes, "Number of bytes received through HTTP interfaces") \
M(InterfaceNativeSendBytes, "Number of bytes sent through native interfaces") \
M(InterfaceNativeReceiveBytes, "Number of bytes received through native interfaces") \
M(InterfacePrometheusSendBytes, "Number of bytes sent through Prometheus interfaces") \
M(InterfacePrometheusReceiveBytes, "Number of bytes received through Prometheus interfaces") \
M(InterfaceInterserverSendBytes, "Number of bytes sent through interserver interfaces") \
M(InterfaceInterserverReceiveBytes, "Number of bytes received through interserver interfaces") \
M(InterfaceMySQLSendBytes, "Number of bytes sent through MySQL interfaces") \
M(InterfaceMySQLReceiveBytes, "Number of bytes received through MySQL interfaces") \
M(InterfacePostgreSQLSendBytes, "Number of bytes sent through PostgreSQL interfaces") \
M(InterfacePostgreSQLReceiveBytes, "Number of bytes received through PostgreSQL interfaces") \
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS

View File

@ -3,17 +3,9 @@
#include <Common/RemoteHostFilter.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#include <Common/re2.h>
#include <IO/WriteHelpers.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace DB
{
namespace ErrorCodes

View File

@ -4,18 +4,10 @@
#include <string>
#include <atomic>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
#include <Common/re2.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/ICachePolicy.h>
#include <base/UUID.h>
#include <limits>
#include <unordered_map>
@ -11,37 +12,37 @@ namespace DB
class PerUserTTLCachePolicyUserQuota : public ICachePolicyUserQuota
{
public:
void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries) override
void setQuotaForUser(const UUID & user_id, size_t max_size_in_bytes, size_t max_entries) override
{
quotas[user_name] = {max_size_in_bytes, max_entries};
quotas[user_id] = {max_size_in_bytes, max_entries};
}
void increaseActual(const String & user_name, size_t entry_size_in_bytes) override
void increaseActual(const UUID & user_id, size_t entry_size_in_bytes) override
{
auto & actual_for_user = actual[user_name];
auto & actual_for_user = actual[user_id];
actual_for_user.size_in_bytes += entry_size_in_bytes;
actual_for_user.num_items += 1;
}
void decreaseActual(const String & user_name, size_t entry_size_in_bytes) override
void decreaseActual(const UUID & user_id, size_t entry_size_in_bytes) override
{
chassert(actual.contains(user_name));
chassert(actual.contains(user_id));
chassert(actual[user_name].size_in_bytes >= entry_size_in_bytes);
actual[user_name].size_in_bytes -= entry_size_in_bytes;
chassert(actual[user_id].size_in_bytes >= entry_size_in_bytes);
actual[user_id].size_in_bytes -= entry_size_in_bytes;
chassert(actual[user_name].num_items >= 1);
actual[user_name].num_items -= 1;
chassert(actual[user_id].num_items >= 1);
actual[user_id].num_items -= 1;
}
bool approveWrite(const String & user_name, size_t entry_size_in_bytes) const override
bool approveWrite(const UUID & user_id, size_t entry_size_in_bytes) const override
{
auto it_actual = actual.find(user_name);
auto it_actual = actual.find(user_id);
Resources actual_for_user{.size_in_bytes = 0, .num_items = 0}; /// assume zero actual resource consumption is user isn't found
if (it_actual != actual.end())
actual_for_user = it_actual->second;
auto it_quota = quotas.find(user_name);
auto it_quota = quotas.find(user_id);
Resources quota_for_user{.size_in_bytes = std::numeric_limits<size_t>::max(), .num_items = std::numeric_limits<size_t>::max()}; /// assume no threshold if no quota is found
if (it_quota != quotas.end())
quota_for_user = it_quota->second;
@ -69,10 +70,10 @@ public:
size_t num_items = 0;
};
/// user name --> cache size quota (in bytes) / number of items quota
std::map<String, Resources> quotas;
/// user name --> actual cache usage (in bytes) / number of items
std::map<String, Resources> actual;
/// user id --> cache size quota (in bytes) / number of items quota
std::map<UUID, Resources> quotas;
/// user id --> actual cache usage (in bytes) / number of items
std::map<UUID, Resources> actual;
};
@ -132,7 +133,8 @@ public:
if (it == cache.end())
return;
size_t sz = weight_function(*it->second);
Base::user_quotas->decreaseActual(it->first.user_name, sz);
if (it->first.user_id.has_value())
Base::user_quotas->decreaseActual(*it->first.user_id, sz);
cache.erase(it);
size_in_bytes -= sz;
}
@ -169,7 +171,9 @@ public:
/// Checks against per-user limits
auto sufficient_space_in_cache_for_user = [&]()
{
return Base::user_quotas->approveWrite(key.user_name, entry_size_in_bytes);
if (key.user_id.has_value())
return Base::user_quotas->approveWrite(*key.user_id, entry_size_in_bytes);
return true;
};
if (!sufficient_space_in_cache() || !sufficient_space_in_cache_for_user())
@ -179,7 +183,8 @@ public:
if (is_stale_function(it->first))
{
size_t sz = weight_function(*it->second);
Base::user_quotas->decreaseActual(it->first.user_name, sz);
if (it->first.user_id.has_value())
Base::user_quotas->decreaseActual(*it->first.user_id, sz);
it = cache.erase(it);
size_in_bytes -= sz;
}
@ -193,14 +198,16 @@ public:
if (auto it = cache.find(key); it != cache.end())
{
size_t sz = weight_function(*it->second);
Base::user_quotas->decreaseActual(it->first.user_name, sz);
if (it->first.user_id.has_value())
Base::user_quotas->decreaseActual(*it->first.user_id, sz);
cache.erase(it); // stupid bug: (*) doesn't replace existing entries (likely due to custom hash function), need to erase explicitly
size_in_bytes -= sz;
}
cache[key] = std::move(mapped); // (*)
size_in_bytes += entry_size_in_bytes;
Base::user_quotas->increaseActual(key.user_name, entry_size_in_bytes);
if (key.user_id.has_value())
Base::user_quotas->increaseActual(*key.user_id, entry_size_in_bytes);
}
}

View File

@ -23,6 +23,12 @@ UInt32 getSupportedArchs()
result |= static_cast<UInt32>(TargetArch::AVX512VBMI);
if (Cpu::CpuFlagsCache::have_AVX512VBMI2)
result |= static_cast<UInt32>(TargetArch::AVX512VBMI2);
if (Cpu::CpuFlagsCache::have_AMXBF16)
result |= static_cast<UInt32>(TargetArch::AMXBF16);
if (Cpu::CpuFlagsCache::have_AMXTILE)
result |= static_cast<UInt32>(TargetArch::AMXTILE);
if (Cpu::CpuFlagsCache::have_AMXINT8)
result |= static_cast<UInt32>(TargetArch::AMXINT8);
return result;
}
@ -44,6 +50,9 @@ String toString(TargetArch arch)
case TargetArch::AVX512BW: return "avx512bw";
case TargetArch::AVX512VBMI: return "avx512vbmi";
case TargetArch::AVX512VBMI2: return "avx512vbmi2";
case TargetArch::AMXBF16: return "amxbf16";
case TargetArch::AMXTILE: return "amxtile";
case TargetArch::AMXINT8: return "amxint8";
}
UNREACHABLE();

View File

@ -83,6 +83,9 @@ enum class TargetArch : UInt32
AVX512BW = (1 << 4),
AVX512VBMI = (1 << 5),
AVX512VBMI2 = (1 << 6),
AMXBF16 = (1 << 7),
AMXTILE = (1 << 8),
AMXINT8 = (1 << 9),
};
/// Runtime detection.

View File

@ -1,4 +1,5 @@
#include <Common/parseGlobs.h>
#include <Common/re2.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -6,15 +7,6 @@
#include <sstream>
#include <iomanip>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace DB
{
namespace ErrorCodes

11
src/Common/re2.h Normal file
View File

@ -0,0 +1,11 @@
#pragma once
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif

View File

@ -95,7 +95,7 @@ TEST(ObjectStorageKey, Performance)
std::cerr << "slow ratio: +" << float(elapsed_new) / elapsed_old << std::endl;
else
std::cerr << "fast ratio: " << float(elapsed_old) / elapsed_new << std::endl;
ASSERT_LT(elapsed_new, 1.2 * elapsed_old);
ASSERT_LT(elapsed_new, 1.5 * elapsed_old);
}
}

View File

@ -1,15 +1,7 @@
#include <Common/parseGlobs.h>
#include <Common/re2.h>
#include <gtest/gtest.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
using namespace DB;

View File

@ -1,10 +1,11 @@
#include <Coordination/CoordinationSettings.h>
#include <Common/logger_useful.h>
#include <filesystem>
#include <Coordination/Defines.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteIntText.h>
#include "config.h"
namespace DB
{
namespace ErrorCodes
@ -36,7 +37,11 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl,ydld";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD =
#if USE_JEMALLOC
"jmst,jmfp,jmep,jmdp,"
#endif
"conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl,ydld";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -18,6 +18,11 @@
#include <unistd.h>
#include <bit>
#if USE_JEMALLOC
#include <Common/Jemalloc.h>
#include <jemalloc/jemalloc.h>
#endif
namespace
{
@ -175,6 +180,20 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr yield_leadership_command = std::make_shared<YieldLeadershipCommand>(keeper_dispatcher);
factory.registerCommand(yield_leadership_command);
#if USE_JEMALLOC
FourLetterCommandPtr jemalloc_dump_stats = std::make_shared<JemallocDumpStats>(keeper_dispatcher);
factory.registerCommand(jemalloc_dump_stats);
FourLetterCommandPtr jemalloc_flush_profile = std::make_shared<JemallocFlushProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_flush_profile);
FourLetterCommandPtr jemalloc_enable_profile = std::make_shared<JemallocEnableProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_enable_profile);
FourLetterCommandPtr jemalloc_disable_profile = std::make_shared<JemallocDisableProfile>(keeper_dispatcher);
factory.registerCommand(jemalloc_disable_profile);
#endif
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -588,4 +607,37 @@ String YieldLeadershipCommand::run()
return "Sent yield leadership request to leader.";
}
#if USE_JEMALLOC
/// Write callback handed to malloc_stats_print.
/// `output` is the opaque argument and must point to a std::string;
/// `data` is a null-terminated chunk of text that is appended to that string.
void printToString(void * output, const char * data)
{
    /// A void * that originated from a std::string * is converted back with static_cast.
    auto * output_data = static_cast<std::string *>(output);
    /// Append in place; no temporary std::string per callback invocation.
    output_data->append(data);
}
/// Returns the text produced by malloc_stats_print.
String JemallocDumpStats::run()
{
    /// printToString appends every chunk it is called with to `stats`.
    std::string stats;
    malloc_stats_print(printToString, &stats, nullptr);
    return stats;
}
/// Flushes a jemalloc heap profile and returns whatever flushJemallocProfile reports.
String JemallocFlushProfile::run()
{
    /// File prefix handed to flushJemallocProfile.
    static constexpr auto profile_file_prefix = "/tmp/jemalloc_keeper";
    return flushJemallocProfile(profile_file_prefix);
}
/// Turns jemalloc profiling on; replies "ok" once the call returns.
String JemallocEnableProfile::run()
{
    constexpr bool profiling_active = true;
    setJemallocProfileActive(profiling_active);
    return "ok";
}
/// Turns jemalloc profiling off; replies "ok" once the call returns.
String JemallocDisableProfile::run()
{
    constexpr bool profiling_active = false;
    setJemallocProfileActive(profiling_active);
    return "ok";
}
#endif
}

View File

@ -428,4 +428,55 @@ struct YieldLeadershipCommand : public IFourLetterCommand
~YieldLeadershipCommand() override = default;
};
#if USE_JEMALLOC
/// Four-letter command "jmst"; run() is defined out of line.
struct JemallocDumpStats : public IFourLetterCommand
{
    explicit JemallocDumpStats(KeeperDispatcher & keeper_dispatcher_) : IFourLetterCommand(keeper_dispatcher_) {}
    ~JemallocDumpStats() override = default;

    String name() override { return "jmst"; }
    String run() override;
};
/// Four-letter command "jmfp"; run() is defined out of line.
struct JemallocFlushProfile : public IFourLetterCommand
{
    explicit JemallocFlushProfile(KeeperDispatcher & keeper_dispatcher_) : IFourLetterCommand(keeper_dispatcher_) {}
    ~JemallocFlushProfile() override = default;

    String name() override { return "jmfp"; }
    String run() override;
};
/// Four-letter command "jmep"; run() is defined out of line.
struct JemallocEnableProfile : public IFourLetterCommand
{
    explicit JemallocEnableProfile(KeeperDispatcher & keeper_dispatcher_) : IFourLetterCommand(keeper_dispatcher_) {}
    ~JemallocEnableProfile() override = default;

    String name() override { return "jmep"; }
    String run() override;
};
/// Four-letter command "jmdp"; run() is defined out of line.
struct JemallocDisableProfile : public IFourLetterCommand
{
    explicit JemallocDisableProfile(KeeperDispatcher & keeper_dispatcher_) : IFourLetterCommand(keeper_dispatcher_) {}
    ~JemallocDisableProfile() override = default;

    String name() override { return "jmdp"; }
    String run() override;
};
#endif
}

View File

@ -15,16 +15,10 @@
#include <atomic>
#include <future>
#include <chrono>
#include <filesystem>
#include <iterator>
#include <limits>
#if USE_JEMALLOC
# include <jemalloc/jemalloc.h>
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
#include <Common/Jemalloc.h>
#endif
namespace CurrentMetrics
@ -33,12 +27,6 @@ namespace CurrentMetrics
extern const Metric KeeperOutstandingRequets;
}
namespace ProfileEvents
{
extern const Event MemoryAllocatorPurge;
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}
using namespace std::chrono_literals;
namespace DB
@ -986,11 +974,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
void KeeperDispatcher::cleanResources()
{
#if USE_JEMALLOC
LOG_TRACE(&Poco::Logger::get("KeeperDispatcher"), "Purging unused memory");
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
purgeJemallocArenas();
#endif
}

View File

@ -10,7 +10,9 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeCustomGeo.h>
#include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
namespace DB
@ -41,6 +43,12 @@ void ExternalResultDescription::init(const Block & sample_block_)
DataTypePtr type_not_nullable = removeNullable(elem.type);
const IDataType * type = type_not_nullable.get();
if (dynamic_cast<const DataTypePointName *>(type->getCustomName()))
{
types.emplace_back(ValueType::vtPoint, is_nullable);
continue;
}
WhichDataType which(type);
if (which.isUInt8())

View File

@ -35,7 +35,8 @@ struct ExternalResultDescription
vtDecimal128,
vtDecimal256,
vtArray,
vtFixedString
vtFixedString,
vtPoint,
};
Block sample_block;

View File

@ -24,6 +24,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
}
@ -162,6 +163,8 @@ void insertPostgreSQLValue(
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
break;
}
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported value type");
}
}

View File

@ -10,13 +10,15 @@
#include "DataTypeDate32.h"
#include "DataTypeDateTime.h"
#include "DataTypeDateTime64.h"
#include "DataTypeEnum.h"
#include "DataTypesDecimal.h"
#include "DataTypeFixedString.h"
#include "DataTypeNullable.h"
#include "DataTypeString.h"
#include "DataTypesNumber.h"
#include "DataTypeCustomGeo.h"
#include "DataTypeFactory.h"
#include "IDataType.h"
#include <Common/logger_useful.h>
namespace DB
{
@ -118,6 +120,10 @@ DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support,
else if (precision <= DecimalUtils::max_precision<Decimal256>)
res = std::make_shared<DataTypeDecimal<Decimal256>>(precision, scale);
}
else if (type_name == "point")
{
res = DataTypeFactory::instance().get("Point");
}
/// Also String is fallback for all unknown types.
if (!res)

View File

@ -14,20 +14,12 @@
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/re2.h>
#include <Poco/URI.h>
#include <filesystem>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace fs = std::filesystem;
namespace DB

View File

@ -3,6 +3,30 @@
namespace DB
{
/// Returns the textual name of a data source type.
/// Every enumerator is handled inside the switch, so the code after it must never run.
String toString(DataSourceType data_source_type)
{
    switch (data_source_type)
    {
        case DataSourceType::Local:
            return "local";
        case DataSourceType::RAM:
            return "memory";
        case DataSourceType::S3:
            return "s3";
        case DataSourceType::S3_Plain:
            return "s3_plain";
        case DataSourceType::HDFS:
            return "hdfs";
        case DataSourceType::WebServer:
            return "web";
        case DataSourceType::AzureBlobStorage:
            return "azure_blob_storage";
        case DataSourceType::LocalBlobStorage:
            return "local_blob_storage";
    }
    /// `std::unreachable;` without parentheses only names the function and does nothing,
    /// so the function could fall off its end; mark the spot with the project macro instead.
    UNREACHABLE();
}
bool DataSourceDescription::operator==(const DataSourceDescription & other) const
{
return std::tie(type, description, is_encrypted) == std::tie(other.type, other.description, other.is_encrypted);

View File

@ -18,29 +18,7 @@ enum class DataSourceType
LocalBlobStorage,
};
inline String toString(DataSourceType data_source_type)
{
switch (data_source_type)
{
case DataSourceType::Local:
return "local";
case DataSourceType::RAM:
return "memory";
case DataSourceType::S3:
return "s3";
case DataSourceType::S3_Plain:
return "s3_plain";
case DataSourceType::HDFS:
return "hdfs";
case DataSourceType::WebServer:
return "web";
case DataSourceType::AzureBlobStorage:
return "azure_blob_storage";
case DataSourceType::LocalBlobStorage:
return "local_blob_storage";
}
UNREACHABLE();
}
String toString(DataSourceType data_source_type);
struct DataSourceDescription
{

View File

@ -3,19 +3,11 @@
#if USE_AZURE_BLOB_STORAGE
#include <Common/Exception.h>
#include <Common/re2.h>
#include <optional>
#include <azure/identity/managed_identity_credential.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
using namespace Azure::Storage::Blobs;

View File

@ -5,15 +5,7 @@
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#include <Common/re2.h>
namespace CurrentMetrics
{

View File

@ -100,6 +100,7 @@ TEST_F(DiskEncryptedTest, WriteAndRead)
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, {});
writeString(std::string_view{"Some text"}, *buf);
buf->finalize();
}
/// Now we have one file.
@ -130,6 +131,7 @@ TEST_F(DiskEncryptedTest, Append)
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append, {});
writeString(std::string_view{"Some text"}, *buf);
buf->finalize();
}
EXPECT_EQ(encrypted_disk->getFileSize("a.txt"), 9);
@ -140,6 +142,7 @@ TEST_F(DiskEncryptedTest, Append)
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append, {});
writeString(std::string_view{" Another text"}, *buf);
buf->finalize();
}
EXPECT_EQ(encrypted_disk->getFileSize("a.txt"), 22);
@ -156,6 +159,7 @@ TEST_F(DiskEncryptedTest, Truncate)
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append, {});
writeString(std::string_view{"Some text"}, *buf);
buf->finalize();
}
EXPECT_EQ(encrypted_disk->getFileSize("a.txt"), 9);
@ -185,6 +189,7 @@ TEST_F(DiskEncryptedTest, ZeroFileSize)
/// Write nothing to a file.
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, {});
buf->finalize();
}
EXPECT_EQ(encrypted_disk->getFileSize("a.txt"), 0);
@ -194,6 +199,7 @@ TEST_F(DiskEncryptedTest, ZeroFileSize)
/// Append the file with nothing.
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append, {});
buf->finalize();
}
EXPECT_EQ(encrypted_disk->getFileSize("a.txt"), 0);
@ -219,6 +225,7 @@ TEST_F(DiskEncryptedTest, AnotherFolder)
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, {});
writeString(std::string_view{"Some text"}, *buf);
buf->finalize();
}
/// Now we have one file.
@ -239,10 +246,13 @@ TEST_F(DiskEncryptedTest, RandomIV)
{
auto buf = encrypted_disk->writeFile("a.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, {});
writeString(std::string_view{"Some text"}, *buf);
buf->finalize();
}
{
auto buf = encrypted_disk->writeFile("b.txt", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, {});
writeString(std::string_view{"Some text"}, *buf);
buf->finalize();
}
/// Now we have two files.

View File

@ -2,17 +2,9 @@
#include <base/types.h>
#include <Columns/ColumnString.h>
#include <Common/re2.h>
#include <IO/WriteHelpers.h>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace DB
{

File diff suppressed because it is too large Load Diff

View File

@ -1,17 +1,9 @@
#pragma once
#include <Common/re2.h>
#include <string_view>
#include <vector>
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif
namespace DB
{

View File

@ -2,13 +2,16 @@
#if USE_SQIDS
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <sqids/sqids.hpp>
@ -22,17 +25,17 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
// sqid(number1, ...)
class FunctionSqid : public IFunction
/// sqidEncode(number1, ...)
class FunctionSqidEncode : public IFunction
{
public:
static constexpr auto name = "sqid";
static constexpr auto name = "sqidEncode";
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSqid>(); }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSqidEncode>(); }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
@ -80,21 +83,93 @@ private:
sqidscxx::Sqids<> sqids;
};
/// sqidDecode(sqid)
///
/// Takes a single String argument and returns Array(UInt64) with the numbers
/// obtained by decoding it with the Sqids library.
class FunctionSqidDecode : public IFunction
{
public:
    static constexpr auto name = "sqidDecode";

    String getName() const override { return name; }
    size_t getNumberOfArguments() const override { return 1; }
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
    bool useDefaultImplementationForConstants() const override { return true; }

    static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSqidDecode>(); }

    /// Validates that the only argument is a String; the result type is always Array(UInt64).
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
    {
        FunctionArgumentDescriptors args{
            {"sqid", &isString<IDataType>, nullptr, "String"}
        };
        validateFunctionArgumentTypes(*this, arguments, args);

        return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
    }

    /// Decodes every row of the input string column into one array of UInt64.
    /// Throws ILLEGAL_TYPE_OF_ARGUMENT if the argument column is not a ColumnString.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
    {
        auto col_res_nested = ColumnUInt64::create();
        auto & res_nested_data = col_res_nested->getData();

        auto col_res_offsets = ColumnArray::ColumnOffsets::create();
        auto & res_offsets_data = col_res_offsets->getData();
        res_offsets_data.reserve(input_rows_count);

        const auto & src = arguments[0];
        const auto & src_column = *src.column;

        if (const auto * col_non_const = typeid_cast<const ColumnString *>(&src_column))
        {
            for (size_t i = 0; i < input_rows_count; ++i)
            {
                std::string_view sqid = col_non_const->getDataAt(i).toView();
                std::vector<UInt64> integers = sqids.decode(sqid);
                res_nested_data.insert(integers.begin(), integers.end());
                /// ColumnArray offsets are cumulative: entry i is the position in the nested
                /// column one past the last element of row i, i.e. the nested size so far.
                /// Pushing only integers.size() would be correct for the first row alone.
                res_offsets_data.push_back(res_nested_data.size());
            }
        }
        else
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal argument for function {}", name);

        return ColumnArray::create(std::move(col_res_nested), std::move(col_res_offsets));
    }

private:
    sqidscxx::Sqids<> sqids;
};
REGISTER_FUNCTION(Sqid)
{
factory.registerFunction<FunctionSqid>(FunctionDocumentation{
factory.registerFunction<FunctionSqidEncode>(FunctionDocumentation{
.description=R"(
Transforms numbers into a [Sqid](https://sqids.org/) which is a Youtube-like ID string.)",
.syntax="sqid(number1, ...)",
.syntax="sqidEncode(number1, ...)",
.arguments={{"number1, ...", "Arbitrarily many UInt8, UInt16, UInt32 or UInt64 arguments"}},
.returned_value="A hash id [String](/docs/en/sql-reference/data-types/string.md).",
.examples={
{"simple",
"SELECT sqid(1, 2, 3, 4, 5);",
"SELECT sqidEncode(1, 2, 3, 4, 5);",
R"(
sqid(1, 2, 3, 4, 5)
gXHfJ1C6dN
sqidEncode(1, 2, 3, 4, 5)
gXHfJ1C6dN
)"
}}
});
factory.registerAlias("sqid", FunctionSqidEncode::name);
factory.registerFunction<FunctionSqidDecode>(FunctionDocumentation{
.description=R"(
Transforms a [Sqid](https://sqids.org/) back into an array of numbers.)",
.syntax="sqidDecode(number1, ...)",
.arguments={{"sqid", "A sqid"}},
.returned_value="An array of [UInt64](/docs/en/sql-reference/data-types/int-uint.md).",
.examples={
{"simple",
"SELECT sqidDecode('gXHfJ1C6dN');",
R"(
sqidDecode('gXHfJ1C6dN')
[1,2,3,4,5]
)"
}}
});

View File

@ -13,33 +13,14 @@ namespace ErrorCodes
}
class BrotliWriteBuffer::BrotliStateWrapper
BrotliWriteBuffer::BrotliStateWrapper::BrotliStateWrapper()
: state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
{
public:
BrotliStateWrapper()
: state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
{
}
}
~BrotliStateWrapper()
{
BrotliEncoderDestroyInstance(state);
}
BrotliEncoderState * state;
};
BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
BrotliWriteBuffer::BrotliStateWrapper::~BrotliStateWrapper()
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
BrotliEncoderDestroyInstance(state);
}
BrotliWriteBuffer::~BrotliWriteBuffer() = default;
@ -58,18 +39,20 @@ void BrotliWriteBuffer::nextImpl()
{
do
{
const auto * in_data_ptr = in_data;
out->nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out->position());
out_capacity = out->buffer().end() - out->position();
int result = BrotliEncoderCompressStream(
brotli->state,
in_available ? BROTLI_OPERATION_PROCESS : BROTLI_OPERATION_FINISH,
BROTLI_OPERATION_PROCESS,
&in_available,
&in_data,
&out_capacity,
&out_data,
nullptr);
total_in += in_data - in_data_ptr;
out->position() = out->buffer().end() - out_capacity;
@ -92,6 +75,10 @@ void BrotliWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && total_in == 0)
return;
while (true)
{
out->nextIfAtEnd();

View File

@ -4,18 +4,38 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
#include "config.h"
#if USE_BROTLI
# include <brotli/encode.h>
namespace DB
{
class BrotliWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
BrotliWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, compress_empty(compress_empty_)
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
}
~BrotliWriteBuffer() override;
@ -24,7 +44,15 @@ private:
void finalizeBefore() override;
class BrotliStateWrapper;
class BrotliStateWrapper
{
public:
BrotliStateWrapper();
~BrotliStateWrapper();
BrotliEncoderState * state;
};
std::unique_ptr<BrotliStateWrapper> brotli;
@ -33,6 +61,12 @@ private:
size_t out_capacity;
uint8_t * out_data;
protected:
UInt64 total_in = 0;
bool compress_empty = true;
};
}
#endif

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <algorithm>
#include <memory>
namespace DB

View File

@ -15,34 +15,22 @@ namespace ErrorCodes
}
class Bzip2WriteBuffer::Bzip2StateWrapper
Bzip2WriteBuffer::Bzip2StateWrapper::Bzip2StateWrapper(int compression_level)
{
public:
explicit Bzip2StateWrapper(int compression_level)
{
memset(&stream, 0, sizeof(stream));
memset(&stream, 0, sizeof(stream));
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
bz_stream stream;
};
Bzip2WriteBuffer::Bzip2WriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, bz(std::make_unique<Bzip2StateWrapper>(compression_level))
Bzip2WriteBuffer::Bzip2StateWrapper::~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
Bzip2WriteBuffer::~Bzip2WriteBuffer() = default;
@ -77,6 +65,8 @@ void Bzip2WriteBuffer::nextImpl()
}
while (bz->stream.avail_in > 0);
total_in += offset();
}
catch (...)
{
@ -90,6 +80,10 @@ void Bzip2WriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && total_in == 0)
return;
out->nextIfAtEnd();
bz->stream.next_out = out->position();
bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());

View File

@ -4,18 +4,29 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
#include "config.h"
#if USE_BZIP2
# include <bzlib.h>
namespace DB
{
class Bzip2WriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
Bzip2WriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), bz(std::make_unique<Bzip2StateWrapper>(compression_level))
, compress_empty(compress_empty_)
{
}
~Bzip2WriteBuffer() override;
@ -24,8 +35,20 @@ private:
void finalizeBefore() override;
class Bzip2StateWrapper;
class Bzip2StateWrapper
{
public:
explicit Bzip2StateWrapper(int compression_level);
~Bzip2StateWrapper();
bz_stream stream;
};
std::unique_ptr<Bzip2StateWrapper> bz;
bool compress_empty = true;
UInt64 total_in = 0;
};
}
#endif

View File

@ -169,37 +169,68 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment, zstd_window_log_max);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, int zstd_window_log, size_t buf_size, char * existing_memory, size_t alignment)
template<typename WriteBufferT>
std::unique_ptr<WriteBuffer> createWriteCompressedWrapper(
WriteBufferT && nested, CompressionMethod method, int level, int zstd_window_log, size_t buf_size, char * existing_memory, size_t alignment, bool compress_empty)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level, buf_size, existing_memory, alignment);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), method, level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BROTLI
if (method == DB::CompressionMethod::Brotli)
return std::make_unique<BrotliWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<BrotliWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<LZMADeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, zstd_window_log, buf_size, existing_memory, alignment);
return std::make_unique<ZstdDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, zstd_window_log, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4DeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<Lz4DeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BZIP2
if (method == CompressionMethod::Bzip2)
return std::make_unique<Bzip2WriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<Bzip2WriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif
#if USE_SNAPPY
if (method == CompressionMethod::Snappy)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
#endif
if (method == CompressionMethod::None)
return nested;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
}
/// Wraps an owned write buffer into a compressing write buffer for the given method.
/// For CompressionMethod::None the nested buffer is returned unchanged.
/// All remaining parameters are passed through to createWriteCompressedWrapper.
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
    std::unique_ptr<WriteBuffer> nested,
    CompressionMethod method,
    int level,
    int zstd_window_log,
    size_t buf_size,
    char * existing_memory,
    size_t alignment,
    bool compress_empty)
{
    if (method == CompressionMethod::None)
        return nested;

    /// Hand over ownership explicitly. Passing `nested` as an lvalue would only work
    /// because a callee moves out of a forwarding reference, which hides the transfer.
    return createWriteCompressedWrapper(std::move(nested), method, level, zstd_window_log, buf_size, existing_memory, alignment, compress_empty);
}
/// Overload taking the nested write buffer as a raw pointer.
/// NOTE(review): presumably the returned wrapper does not take ownership of `nested`;
/// confirm against the write buffer decorator constructors.
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
    WriteBuffer * nested,
    CompressionMethod method,
    int level,
    int zstd_window_log,
    size_t buf_size,
    char * existing_memory,
    size_t alignment,
    bool compress_empty)
{
    /// Unlike the unique_ptr overload, there is no pass-through for CompressionMethod::None
    /// here; callers are expected to request an actual compression method.
    assert(method != CompressionMethod::None);

    auto compressed = createWriteCompressedWrapper(
        nested, method, level, zstd_window_log, buf_size, existing_memory, alignment, compress_empty);
    return compressed;
}
}

View File

@ -61,7 +61,6 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
char * existing_memory = nullptr,
size_t alignment = 0);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
@ -69,6 +68,17 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
int zstd_window_log = 0,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty = true);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
WriteBuffer * nested,
CompressionMethod method,
int level,
int zstd_window_log,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0,
bool compress_empty = true);
}

View File

@ -7,9 +7,7 @@ namespace ErrorCodes
extern const int LZMA_STREAM_ENCODER_FAILED;
}
LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
void LZMADeflatingWriteBuffer::initialize(int compression_level)
{
lstr = LZMA_STREAM_INIT;
@ -94,6 +92,10 @@ void LZMADeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && lstr.total_out == 0)
return;
do
{
out->nextIfAtEnd();

Some files were not shown because too many files have changed in this diff Show More