Merge branch 'master' into fix_race_with_concurrent_hardlinks

Commit f0d4a5c93a by alesapin, 2022-07-14 19:03:58 +02:00
75 changed files with 2334 additions and 731 deletions


@ -335,6 +335,22 @@ if (COMPILER_GCC OR COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -falign-functions=32")
endif ()
if (ARCH_AMD64)
# align branches within a 32-byte boundary to avoid potential performance loss when the code layout changes,
# which makes benchmark results more stable.
set(BRANCHES_WITHIN_32B_BOUNDARIES "-mbranches-within-32B-boundaries")
if (COMPILER_GCC)
# for gcc this is an assembler flag, so it needs the "-Wa," prefix
set(BRANCHES_WITHIN_32B_BOUNDARIES "-Wa,${BRANCHES_WITHIN_32B_BOUNDARIES}")
endif()
include(CheckCXXCompilerFlag)
check_cxx_compiler_flag("${BRANCHES_WITHIN_32B_BOUNDARIES}" HAS_BRANCHES_WITHIN_32B_BOUNDARIES)
if (HAS_BRANCHES_WITHIN_32B_BOUNDARIES)
set(COMPILER_FLAGS "${COMPILER_FLAGS} ${BRANCHES_WITHIN_32B_BOUNDARIES}")
endif()
endif()
if (COMPILER_GCC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")
endif ()


@ -12,7 +12,14 @@
#define JSON_MAX_DEPTH 100
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wdeprecated-dynamic-exception-spec"
#endif
POCO_IMPLEMENT_EXCEPTION(JSONException, Poco::Exception, "JSONException") // NOLINT(cert-err60-cpp, modernize-use-noexcept, hicpp-use-noexcept)
#ifdef __clang__
# pragma clang diagnostic pop
#endif
/// Read an unsigned integer in plain format from a non-0-terminated string.


@ -38,8 +38,14 @@
*/
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wdeprecated-dynamic-exception-spec"
#endif
POCO_DECLARE_EXCEPTION(Foundation_API, JSONException, Poco::Exception)
#ifdef __clang__
# pragma clang diagnostic pop
#endif
class JSON
{


@ -29,7 +29,6 @@ if (COMPILER_CLANG)
no_warning(c99-extensions)
no_warning(conversion)
no_warning(ctad-maybe-unsupported) # clang 9+, linux-only
no_warning(deprecated-dynamic-exception-spec)
no_warning(disabled-macro-expansion)
no_warning(documentation-unknown-command)
no_warning(double-promotion)


@ -19,6 +19,10 @@ The `system.part_log` table contains the following columns:
- `REGULAR_MERGE` — Some regular merge.
- `TTL_DELETE_MERGE` — Cleaning up expired data.
- `TTL_RECOMPRESS_MERGE` — Recompressing the data part.
- `merge_algorithm` ([Enum8](../../sql-reference/data-types/enum.md)) — Merge algorithm for the event with type `MERGE_PARTS`. Can have one of the following values:
- `UNDECIDED`
- `HORIZONTAL`
- `VERTICAL`
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds precision.
@ -52,6 +56,7 @@ Row 1:
query_id: 983ad9c7-28d5-4ae1-844e-603116b7de31
event_type: NewPart
merge_reason: NotAMerge
merge_algorithm: Undecided
event_date: 2021-02-02
event_time: 2021-02-02 11:14:28
event_time_microseconds: 2021-02-02 11:14:28.861919
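For illustration (not part of the diff), the new columns could be inspected with a query along these lines; the `MergeParts` event type spelling is an assumption based on the event names documented above:

```sql
SELECT event_type, merge_reason, merge_algorithm
FROM system.part_log
WHERE event_type = 'MergeParts'
LIMIT 5;
```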


@ -1715,3 +1715,7 @@ Return value type is always [Float64](../../sql-reference/data-types/float.md).
│ 6 │ Float64 │
└─────┴──────────────────────────────────────────────────────────────────────────────────────────┘
```
## Distance functions
All supported functions are described in [distance functions documentation](../../sql-reference/functions/distance-functions.md).


@ -0,0 +1,502 @@
# Distance functions
## L1Norm
Calculates the sum of absolute values of a vector.
**Syntax**
```sql
L1Norm(vector)
```
Alias: `normL1`.
**Arguments**
- `vector` — [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
**Returned value**
- L1-norm or [taxicab geometry](https://en.wikipedia.org/wiki/Taxicab_geometry) distance.
Type: [UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
**Examples**
Query:
```sql
SELECT L1Norm((1, 2));
```
Result:
```text
┌─L1Norm((1, 2))─┐
│ 3 │
└────────────────┘
```
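Since the **Arguments** section above also accepts an [Array](../../sql-reference/data-types/array.md), here is a small sketch with an array argument, assuming arrays behave the same way as tuples:

```sql
SELECT L1Norm([1, -2, 3]); -- expected: 6, the sum of absolute values
```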
## L2Norm
Calculates the square root of the sum of the squares of the vector values.
**Syntax**
```sql
L2Norm(vector)
```
Alias: `normL2`.
**Arguments**
- `vector` — [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
**Returned value**
- L2-norm or [Euclidean distance](https://en.wikipedia.org/wiki/Euclidean_distance).
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L2Norm((1, 2));
```
Result:
```text
┌───L2Norm((1, 2))─┐
│ 2.23606797749979 │
└──────────────────┘
```
## LinfNorm
Calculates the maximum of absolute values of a vector.
**Syntax**
```sql
LinfNorm(vector)
```
Alias: `normLinf`.
**Arguments**
- `vector` — [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
**Returned value**
- Linf-norm or the maximum absolute value.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LinfNorm((1, -2));
```
Result:
```text
┌─LinfNorm((1, -2))─┐
│ 2 │
└───────────────────┘
```
## LpNorm
Calculates the `p`-th root of the sum of the absolute values of the vector elements, each raised to the power of `p`.
**Syntax**
```sql
LpNorm(vector, p)
```
Alias: `normLp`.
**Arguments**
- `vector` — [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
- `p` — The power. Possible values: real number in `[1; inf)`. [UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Returned value**
- [Lp-norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm)
Type: [Float](../../sql-reference/data-types/float.md).
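As a formula, this is the usual p-norm (a sketch consistent with the description above):

$$ \lVert x \rVert_p = \left( \sum_{i} |x_i|^{p} \right)^{1/p} $$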
**Example**
Query:
```sql
SELECT LpNorm((1, -2), 2);
```
Result:
```text
┌─LpNorm((1, -2), 2)─┐
│ 2.23606797749979 │
└────────────────────┘
```
## L1Distance
Calculates the distance between two points (the values of the vectors are the coordinates) in `L1` space (1-norm ([taxicab geometry](https://en.wikipedia.org/wiki/Taxicab_geometry) distance)).
**Syntax**
```sql
L1Distance(vector1, vector2)
```
Alias: `distanceL1`.
**Arguments**
- `vector1` — First vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
- `vector2` — Second vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
**Returned value**
- 1-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L1Distance((1, 2), (2, 3));
```
Result:
```text
┌─L1Distance((1, 2), (2, 3))─┐
│ 2 │
└────────────────────────────┘
```
## L2Distance
Calculates the distance between two points (the values of the vectors are the coordinates) in Euclidean space ([Euclidean distance](https://en.wikipedia.org/wiki/Euclidean_distance)).
**Syntax**
```sql
L2Distance(vector1, vector2)
```
Alias: `distanceL2`.
**Arguments**
- `vector1` — First vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
- `vector2` — Second vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
**Returned value**
- 2-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L2Distance((1, 2), (2, 3));
```
Result:
```text
┌─L2Distance((1, 2), (2, 3))─┐
│ 1.4142135623730951 │
└────────────────────────────┘
```
## LinfDistance
Calculates the distance between two points (the values of the vectors are the coordinates) in `L_{inf}` space ([maximum norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#Maximum_norm_(special_case_of:_infinity_norm,_uniform_norm,_or_supremum_norm))).
**Syntax**
```sql
LinfDistance(vector1, vector2)
```
Alias: `distanceLinf`.
**Arguments**
- `vector1` — First vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
- `vector2` — Second vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
**Returned value**
- Infinity-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LinfDistance((1, 2), (2, 3));
```
Result:
```text
┌─LinfDistance((1, 2), (2, 3))─┐
│ 1 │
└──────────────────────────────┘
```
## LpDistance
Calculates the distance between two points (the values of the vectors are the coordinates) in `Lp` space ([p-norm distance](https://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm)).
**Syntax**
```sql
LpDistance(vector1, vector2, p)
```
Alias: `distanceLp`.
**Arguments**
- `vector1` — First vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
- `vector2` — Second vector. [Tuple](../../sql-reference/data-types/tuple.md) or [Array](../../sql-reference/data-types/array.md).
- `p` — The power. Possible values: real number from `[1; inf)`. [UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Returned value**
- p-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
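As a formula (a sketch consistent with the description above):

$$ \operatorname{LpDistance}(x, y) = \lVert x - y \rVert_p = \left( \sum_{i} |x_i - y_i|^{p} \right)^{1/p} $$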
**Example**
Query:
```sql
SELECT LpDistance((1, 2), (2, 3), 3);
```
Result:
```text
┌─LpDistance((1, 2), (2, 3), 3)─┐
│ 1.2599210498948732 │
└───────────────────────────────┘
```
## L1Normalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in `L1` space ([taxicab geometry](https://en.wikipedia.org/wiki/Taxicab_geometry)).
**Syntax**
```sql
L1Normalize(tuple)
```
Alias: `normalizeL1`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
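As a formula (a sketch of the definition above):

$$ \operatorname{L1Normalize}(x) = \frac{x}{\lVert x \rVert_1} $$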
**Example**
Query:
```sql
SELECT L1Normalize((1, 2));
```
Result:
```text
┌─L1Normalize((1, 2))─────────────────────┐
│ (0.3333333333333333,0.6666666666666666) │
└─────────────────────────────────────────┘
```
## L2Normalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in Euclidean space (using [Euclidean distance](https://en.wikipedia.org/wiki/Euclidean_distance)).
**Syntax**
```sql
L2Normalize(tuple)
```
Alias: `normalizeL2`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
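As a formula (a sketch of the definition above):

$$ \operatorname{L2Normalize}(x) = \frac{x}{\lVert x \rVert_2} $$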
**Example**
Query:
```sql
SELECT L2Normalize((3, 4));
```
Result:
```text
┌─L2Normalize((3, 4))─┐
│ (0.6,0.8) │
└─────────────────────┘
```
## LinfNormalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in `L_{inf}` space (using [maximum norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#Maximum_norm_(special_case_of:_infinity_norm,_uniform_norm,_or_supremum_norm))).
**Syntax**
```sql
LinfNormalize(tuple)
```
Alias: `normalizeLinf`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LinfNormalize((3, 4));
```
Result:
```text
┌─LinfNormalize((3, 4))─┐
│ (0.75,1) │
└───────────────────────┘
```
## LpNormalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in `Lp` space (using [p-norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm)).
**Syntax**
```sql
LpNormalize(tuple, p)
```
Alias: `normalizeLp`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
- `p` — The power. Possible values: any number from `[1; inf)`. [UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LpNormalize((3, 4), 5);
```
Result:
```text
┌─LpNormalize((3, 4), 5)──────────────────┐
│ (0.7187302630182624,0.9583070173576831) │
└─────────────────────────────────────────┘
```
## cosineDistance
Calculates the cosine distance between two vectors (the values of the tuples are the coordinates). The smaller the returned value, the more similar the vectors.
**Syntax**
```sql
cosineDistance(tuple1, tuple2)
```
**Arguments**
- `tuple1` — First tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- The cosine of the angle between the two vectors, subtracted from one.
Type: [Float](../../sql-reference/data-types/float.md).
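As a formula (a sketch consistent with the description above):

$$ \operatorname{cosineDistance}(x, y) = 1 - \frac{\sum_{i} x_i y_i}{\lVert x \rVert_2 \, \lVert y \rVert_2} $$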
**Example**
Query:
```sql
SELECT cosineDistance((1, 2), (2, 3));
```
Result:
```text
┌─cosineDistance((1, 2), (2, 3))─┐
│ 0.007722123286332261 │
└────────────────────────────────┘
```


@ -93,3 +93,35 @@ Predefined characters: `\0`, `\\`, `|`, `(`, `)`, `^`, `$`, `.`, `[`, `]`, `?`,
This implementation slightly differs from re2::RE2::QuoteMeta. It escapes zero byte as `\0` instead of `\x00` and it escapes only required characters.
For more information, see the link: [RE2](https://github.com/google/re2/blob/master/re2/re2.cc#L473)
## translate(s, from, to)
The function replaces characters in the string `s` according to the one-to-one character mapping defined by the `from` and `to` strings. `from` and `to` must be ASCII strings of the same size. Non-ASCII characters in the original string are not modified.
Example:
``` sql
SELECT translate('Hello, World!', 'delor', 'DELOR') AS res
```
``` text
┌─res───────────┐
│ HELLO, WORLD! │
└───────────────┘
```
## translateUTF8(string, from, to)
Similar to the previous function, but works with UTF-8 arguments. `from` and `to` must be valid UTF-8 strings of the same size.
Example:
``` sql
SELECT translateUTF8('Hélló, Wórld¡', 'óé¡', 'oe!') AS res
```
``` text
┌─res───────────┐
│ Hello, World! │
└───────────────┘
```


@ -374,7 +374,7 @@ The same as `multiMatchAny`, but returns any index that matches the haystack.
## multiMatchAllIndices(haystack, \[pattern<sub>1</sub>, pattern<sub>2</sub>, …, pattern<sub>n</sub>\])
The same as `multiMatchAny`, but returns the array of all indicies that match the haystack in any order.
The same as `multiMatchAny`, but returns the array of all indices that match the haystack in any order.
## multiFuzzyMatchAny(haystack, distance, \[pattern<sub>1</sub>, pattern<sub>2</sub>, …, pattern<sub>n</sub>\])


@ -559,502 +559,7 @@ Result:
└────────────────────────────┘
```
## L1Norm
Calculates the sum of absolute values of a tuple.
## Distance functions
**Syntax**
```sql
L1Norm(tuple)
```
Alias: `normL1`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- L1-norm or [taxicab geometry](https://en.wikipedia.org/wiki/Taxicab_geometry) distance.
Type: [UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
**Example**
Query:
```sql
SELECT L1Norm((1, 2));
```
Result:
```text
┌─L1Norm((1, 2))─┐
│ 3 │
└────────────────┘
```
## L2Norm
Calculates the square root of the sum of the squares of the tuple values.
**Syntax**
```sql
L2Norm(tuple)
```
Alias: `normL2`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- L2-norm or [Euclidean distance](https://en.wikipedia.org/wiki/Euclidean_distance).
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L2Norm((1, 2));
```
Result:
```text
┌───L2Norm((1, 2))─┐
│ 2.23606797749979 │
└──────────────────┘
```
## LinfNorm
Calculates the maximum of absolute values of a tuple.
**Syntax**
```sql
LinfNorm(tuple)
```
Alias: `normLinf`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Linf-norm or the maximum absolute value.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LinfNorm((1, -2));
```
Result:
```text
┌─LinfNorm((1, -2))─┐
│ 2 │
└───────────────────┘
```
## LpNorm
Calculates the `p`-th root of the sum of the absolute values of the tuple elements, each raised to the power of `p`.
**Syntax**
```sql
LpNorm(tuple, p)
```
Alias: `normLp`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
- `p` — The power. Possible values: real number in `[1; inf)`. [UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Returned value**
- [Lp-norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm)
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LpNorm((1, -2), 2);
```
Result:
```text
┌─LpNorm((1, -2), 2)─┐
│ 2.23606797749979 │
└────────────────────┘
```
## L1Distance
Calculates the distance between two points (the values of the tuples are the coordinates) in `L1` space (1-norm ([taxicab geometry](https://en.wikipedia.org/wiki/Taxicab_geometry) distance)).
**Syntax**
```sql
L1Distance(tuple1, tuple2)
```
Alias: `distanceL1`.
**Arguments**
- `tuple1` — First tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- 1-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L1Distance((1, 2), (2, 3));
```
Result:
```text
┌─L1Distance((1, 2), (2, 3))─┐
│ 2 │
└────────────────────────────┘
```
## L2Distance
Calculates the distance between two points (the values of the tuples are the coordinates) in Euclidean space ([Euclidean distance](https://en.wikipedia.org/wiki/Euclidean_distance)).
**Syntax**
```sql
L2Distance(tuple1, tuple2)
```
Alias: `distanceL2`.
**Arguments**
- `tuple1` — First tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- 2-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L2Distance((1, 2), (2, 3));
```
Result:
```text
┌─L2Distance((1, 2), (2, 3))─┐
│ 1.4142135623730951 │
└────────────────────────────┘
```
## LinfDistance
Calculates the distance between two points (the values of the tuples are the coordinates) in `L_{inf}` space ([maximum norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#Maximum_norm_(special_case_of:_infinity_norm,_uniform_norm,_or_supremum_norm))).
**Syntax**
```sql
LinfDistance(tuple1, tuple2)
```
Alias: `distanceLinf`.
**Arguments**
- `tuple1` — First tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Infinity-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LinfDistance((1, 2), (2, 3));
```
Result:
```text
┌─LinfDistance((1, 2), (2, 3))─┐
│ 1 │
└──────────────────────────────┘
```
## LpDistance
Calculates the distance between two points (the values of the tuples are the coordinates) in `Lp` space ([p-norm distance](https://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm)).
**Syntax**
```sql
LpDistance(tuple1, tuple2, p)
```
Alias: `distanceLp`.
**Arguments**
- `tuple1` — First tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `p` — The power. Possible values: real number from `[1; inf)`. [UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Returned value**
- p-norm distance.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LpDistance((1, 2), (2, 3), 3);
```
Result:
```text
┌─LpDistance((1, 2), (2, 3), 3)─┐
│ 1.2599210498948732 │
└───────────────────────────────┘
```
## L1Normalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in `L1` space ([taxicab geometry](https://en.wikipedia.org/wiki/Taxicab_geometry)).
**Syntax**
```sql
L1Normalize(tuple)
```
Alias: `normalizeL1`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L1Normalize((1, 2));
```
Result:
```text
┌─L1Normalize((1, 2))─────────────────────┐
│ (0.3333333333333333,0.6666666666666666) │
└─────────────────────────────────────────┘
```
## L2Normalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in Euclidean space (using [Euclidean distance](https://en.wikipedia.org/wiki/Euclidean_distance)).
**Syntax**
```sql
L2Normalize(tuple)
```
Alias: `normalizeL2`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT L2Normalize((3, 4));
```
Result:
```text
┌─L2Normalize((3, 4))─┐
│ (0.6,0.8) │
└─────────────────────┘
```
## LinfNormalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in `L_{inf}` space (using [maximum norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#Maximum_norm_(special_case_of:_infinity_norm,_uniform_norm,_or_supremum_norm))).
**Syntax**
```sql
LinfNormalize(tuple)
```
Alias: `normalizeLinf`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LinfNormalize((3, 4));
```
Result:
```text
┌─LinfNormalize((3, 4))─┐
│ (0.75,1) │
└───────────────────────┘
```
## LpNormalize
Calculates the unit vector of a given vector (the values of the tuple are the coordinates) in `Lp` space (using [p-norm](https://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm)).
**Syntax**
```sql
LpNormalize(tuple, p)
```
Alias: `normalizeLp`.
**Arguments**
- `tuple` — [Tuple](../../sql-reference/data-types/tuple.md).
- `p` — The power. Possible values: any number from [1;inf). [UInt](../../sql-reference/data-types/int-uint.md) or [Float](../../sql-reference/data-types/float.md).
**Returned value**
- Unit vector.
Type: [Tuple](../../sql-reference/data-types/tuple.md) of [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT LpNormalize((3, 4),5);
```
Result:
```text
┌─LpNormalize((3, 4), 5)──────────────────┐
│ (0.7187302630182624,0.9583070173576831) │
└─────────────────────────────────────────┘
```
## cosineDistance
Calculates the cosine distance between two vectors (the values of the tuples are the coordinates). The smaller the returned value, the more similar the vectors.
**Syntax**
```sql
cosineDistance(tuple1, tuple2)
```
**Arguments**
- `tuple1` — First tuple. [Tuple](../../sql-reference/data-types/tuple.md).
- `tuple2` — Second tuple. [Tuple](../../sql-reference/data-types/tuple.md).
**Returned value**
- The cosine of the angle between the two vectors, subtracted from one.
Type: [Float](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT cosineDistance((1, 2), (2, 3));
```
Result:
```text
┌─cosineDistance((1, 2), (2, 3))─┐
│ 0.007722123286332261 │
└────────────────────────────────┘
```
All supported functions are described in [distance functions documentation](../../sql-reference/functions/distance-functions.md).


@ -5,7 +5,7 @@ sidebar_label: null function
# null
Creates a temporary table of the specified structure with the [Null](../../engines/table-engines/special/null.md) table engine. According to the `Null`-engine properties, the table data is ignored and the table itself is immediately droped right after the query execution. The function is used for the convenience of test writing and demonstrations.
Creates a temporary table of the specified structure with the [Null](../../engines/table-engines/special/null.md) table engine. According to the `Null`-engine properties, the table data is ignored and the table itself is immediately dropped right after the query execution. The function is used for the convenience of test writing and demonstrations.
**Syntax**
@ -40,4 +40,3 @@ See also:
- [Null table engine](../../engines/table-engines/special/null.md)
[Original article](https://clickhouse.com/docs/en/sql-reference/table-functions/null/) <!--hide-->


@ -5,11 +5,11 @@ sidebar_label: remote
# remote, remoteSecure
Allows to access remote servers without creating a [Distributed](../../engines/table-engines/special/distributed.md) table. `remoteSecure` - same as `remote` but with a secured connection.
Allows accessing remote servers, including migration of data, without creating a [Distributed](../../engines/table-engines/special/distributed.md) table. `remoteSecure` - same as `remote` but with a secured connection.
Both functions can be used in `SELECT` and `INSERT` queries.
**Syntax**
## Syntax
``` sql
remote('addresses_expr', db, table[, 'user'[, 'password'], sharding_key])
@ -18,7 +18,7 @@ remoteSecure('addresses_expr', db, table[, 'user'[, 'password'], sharding_key])
remoteSecure('addresses_expr', db.table[, 'user'[, 'password'], sharding_key])
```
**Parameters**
## Parameters
- `addresses_expr` — An expression that generates addresses of remote servers. This may be just one server address. The server address is `host:port`, or just `host`.
@ -36,28 +36,31 @@ remoteSecure('addresses_expr', db.table[, 'user'[, 'password'], sharding_key])
- `password` — User password. If the password is not specified, an empty password is used. Type: [String](../../sql-reference/data-types/string.md).
- `sharding_key` — Sharding key to support distributing data across nodes. For example: `insert into remote('127.0.0.1:9000,127.0.0.2', db, table, 'default', rand())`. Type: [UInt32](../../sql-reference/data-types/int-uint.md).
**Returned value**
## Returned value
The dataset from remote servers.
**Usage**
## Usage
Using the `remote` table function is less optimal than creating a `Distributed` table because in this case the server connection is re-established for every request. Also, if hostnames are set, the names are resolved, and errors are not counted when working with various replicas. When processing a large number of queries, always create the `Distributed` table ahead of time, and do not use the `remote` table function.
Unless you are migrating data from one system to another, using the `remote` table function is less optimal than creating a `Distributed` table because in this case the server connection is re-established for every request. Also, if hostnames are set, the names are resolved, and errors are not counted when working with various replicas. When processing a large number of queries, always create the `Distributed` table ahead of time, and do not use the `remote` table function.
The `remote` table function can be useful in the following cases:
- Accessing a specific server for data comparison, debugging, and testing.
- Queries between various ClickHouse clusters for research purposes.
- Infrequent distributed requests that are made manually.
- Distributed requests where the set of servers is re-defined each time.
- Migrating data from one system to another.
- Accessing a specific server for data comparison, debugging, and testing.
- Queries between various ClickHouse clusters for research purposes.
- Infrequent distributed requests that are made manually.
- Distributed requests where the set of servers is re-defined each time.
**Addresses**
### Addresses
``` text
example01-01-1
example01-01-1:9440
example01-01-1:9000
localhost
127.0.0.1
[::]:9440
[::]:9000
[2a02:6b8:0:1111::11]:9000
```
@ -68,15 +71,15 @@ Multiple addresses can be comma-separated. In this case, ClickHouse will use dis
example01-01-1,example01-02-1
```
**Examples**
## Examples
Selecting data from a remote server:
### Selecting data from a remote server:
``` sql
SELECT * FROM remote('127.0.0.1', db.remote_engine_table) LIMIT 3;
```
Inserting data from a remote server into a table:
### Inserting data from a remote server into a table:
``` sql
CREATE TABLE remote_table (name String, value UInt32) ENGINE=Memory;
@ -84,7 +87,65 @@ INSERT INTO FUNCTION remote('127.0.0.1', currentDatabase(), 'remote_table') VALU
SELECT * FROM remote_table;
```
## Globs in Addresses {globs-in-addresses}
### Migration of tables from one system to another:
This example uses one table from a sample dataset. The database is `imdb`, and the table is `actors`.
#### On the source ClickHouse system (the system that currently hosts the data)
- Verify the source database and table name (`imdb.actors`)
```sql
show databases
```
```sql
show tables in imdb
```
- Get the CREATE TABLE statement from the source:
```sql
select create_table_query
from system.tables
where database = 'imdb' and table = 'actors'
```
Response
```sql
CREATE TABLE imdb.actors (`id` UInt32,
`first_name` String,
`last_name` String,
`gender` FixedString(1))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
ORDER BY (id, first_name, last_name, gender)
SETTINGS index_granularity = 8192
```
#### On the destination ClickHouse system:
- Create the destination database:
```sql
CREATE DATABASE imdb
```
- Using the CREATE TABLE statement from the source, create the destination:
```sql
CREATE TABLE imdb.actors (`id` UInt32,
`first_name` String,
`last_name` String,
`gender` FixedString(1))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
ORDER BY (id, first_name, last_name, gender)
SETTINGS index_granularity = 8192
```
#### Back on the source deployment:
Insert into the new database and table created on the remote system. You will need the host, port, username, password, destination database, and destination table.
```sql
INSERT INTO FUNCTION
remoteSecure('remote.clickhouse.cloud:9440', 'imdb.actors', 'USER', 'PASSWORD', rand())
SELECT * from imdb.actors
```
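As a rough sanity check after the copy (a sketch that reuses the same placeholder host and credentials), the row counts on both sides can be compared from the source deployment:

```sql
SELECT count() FROM imdb.actors;  -- source table
SELECT count()
FROM remoteSecure('remote.clickhouse.cloud:9440', 'imdb.actors', 'USER', 'PASSWORD');  -- destination table
```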
## Globs in Addresses {#globs-in-addresses}
Patterns in curly brackets `{ }` are used to generate a set of shards and to specify replicas. If there are multiple pairs of curly brackets, then the direct product of the corresponding sets is generated.
The following pattern types are supported.


@ -335,7 +335,7 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`.
The filter can be used by the following functions: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions), [notIn](../../../sql-reference/functions/in-functions), [has](../../../sql-reference/functions/array-functions#hasarr-elem).
The filter can be used by the following functions: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions), [notIn](../../../sql-reference/functions/in-functions), [has](../../../sql-reference/functions/array-functions#hasarr-elem), [hasAny](../../../sql-reference/functions/array-functions#hasany), [hasAll](../../../sql-reference/functions/array-functions#hasall).
**Examples**


@ -14,6 +14,15 @@
- `REMOVE_PART` — Removing or detaching a data part from the table using [DETACH PARTITION](../../sql-reference/statements/alter/partition.md#alter_detach-partition).
- `MUTATE_PART` — Mutating a data part.
- `MOVE_PART` — Moving a data part between disks.
- `merge_reason` ([Enum8](../../sql-reference/data-types/enum.md)) — The reason for an event with type `MERGE_PARTS`. Can have one of the following values:
- `NOT_A_MERGE` — the event has a type other than `MERGE_PARTS`.
- `REGULAR_MERGE` — a regular merge.
- `TTL_DELETE_MERGE` — cleaning up expired data.
- `TTL_RECOMPRESS_MERGE` — recompressing the data part.
- `merge_algorithm` ([Enum8](../../sql-reference/data-types/enum.md)) — Merge algorithm for an event with type `MERGE_PARTS`. Can have one of the following values:
- `UNDECIDED`
- `HORIZONTAL`
- `VERTICAL`
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds precision.
@ -46,6 +55,8 @@ Row 1:
──────
query_id: 983ad9c7-28d5-4ae1-844e-603116b7de31
event_type: NewPart
merge_reason: NotAMerge
merge_algorithm: Undecided
event_date: 2021-02-02
event_time: 2021-02-02 11:14:28
event_time_microseconds: 2021-02-02 11:14:28.861919


@ -83,3 +83,34 @@ SELECT replaceRegexpAll('Hello, World!', '^', 'here: ') AS res
└─────────────────────┘
```
## translate(s, from, to)
The function replaces characters in the string `s` according to the one-to-one character mapping defined by the `from` and `to` strings. `from` and `to` must be valid ASCII strings of the same size. Non-ASCII characters in the original string are not modified.
Example:
``` sql
SELECT translate('Hello, World!', 'delor', 'DELOR') AS res
```
``` text
┌─res───────────┐
│ HELLO, WORLD! │
└───────────────┘
```
## translateUTF8(string, from, to)
Similar to the previous function, but works with strings consisting of UTF-8 characters. `from` and `to` must be valid UTF-8 strings of the same size.
Example:
``` sql
SELECT translateUTF8('Hélló, Wórld¡', 'óé¡', 'oe!') AS res
```
``` text
┌─res───────────┐
│ Hello, World! │
└───────────────┘
```


@ -401,11 +401,11 @@ dbms_target_link_libraries (
clickhouse_parsers
ch_contrib::lz4
Poco::JSON
Poco::MongoDB
string_utils
PUBLIC
boost::system
clickhouse_common_io
Poco::MongoDB
)
if (TARGET ch::mysqlxx)


@ -162,20 +162,19 @@ public:
template <typename Function, typename... Args>
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
: state(std::make_shared<Poco::Event>())
, thread_id(std::make_shared<std::thread::id>())
: state(std::make_shared<State>())
{
/// NOTE: If this will throw an exception, the destructor won't be called.
/// NOTE:
/// - If this throws an exception, the destructor won't be called
/// - the `this` pointer cannot be captured by the lambda, since after detach() it will no longer be valid
GlobalThreadPool::instance().scheduleOrThrow([
thread_id = thread_id,
state = state,
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{
auto event = std::move(state);
SCOPE_EXIT(event->set());
SCOPE_EXIT(state->event.set());
thread_id = std::make_shared<std::thread::id>(std::this_thread::get_id());
state->thread_id = std::this_thread::get_id();
/// This moves are needed to destroy function and arguments before exit.
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
@ -196,31 +195,30 @@ public:
ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs) noexcept
{
if (joinable())
if (initialized())
abort();
state = std::move(rhs.state);
thread_id = std::move(rhs.thread_id);
return *this;
}
~ThreadFromGlobalPool()
{
if (joinable())
if (initialized())
abort();
}
void join()
{
if (!joinable())
if (!initialized())
abort();
state->wait();
state->event.wait();
state.reset();
}
void detach()
{
if (!joinable())
if (!initialized())
abort();
state.reset();
}
@ -230,15 +228,30 @@ public:
if (!state)
return false;
/// Thread cannot join itself.
if (*thread_id == std::this_thread::get_id())
if (state->thread_id == std::this_thread::get_id())
return false;
return true;
}
private:
/// The state used in this object and inside the thread job.
std::shared_ptr<Poco::Event> state;
std::shared_ptr<std::thread::id> thread_id;
struct State
{
/// Should be atomic because of possible concurrent access between
/// the assignment and the joinable() check.
std::atomic<std::thread::id> thread_id;
/// The state used in this object and inside the thread job.
Poco::Event event;
};
std::shared_ptr<State> state;
/// Internally initialized() should be used over joinable(),
/// since it is enough to know that the thread is initialized,
/// and ignore the fact that a thread cannot join itself.
bool initialized() const
{
return static_cast<bool>(state);
}
};


@ -12,8 +12,8 @@ struct Exception : public Poco::Exception
{
explicit Exception(const std::string & msg, int code = 0) : Poco::Exception(msg, code) {}
int errnum() const { return code(); }
const char * name() const throw() override { return "mysqlxx::Exception"; }
const char * className() const throw() override { return "mysqlxx::Exception"; }
const char * name() const noexcept override { return "mysqlxx::Exception"; }
const char * className() const noexcept override { return "mysqlxx::Exception"; }
};
@ -21,8 +21,8 @@ struct Exception : public Poco::Exception
struct ConnectionFailed : public Exception
{
explicit ConnectionFailed(const std::string & msg, int code = 0) : Exception(msg, code) {}
const char * name() const throw() override { return "mysqlxx::ConnectionFailed"; }
const char * className() const throw() override { return "mysqlxx::ConnectionFailed"; }
const char * name() const noexcept override { return "mysqlxx::ConnectionFailed"; }
const char * className() const noexcept override { return "mysqlxx::ConnectionFailed"; }
};
@ -30,8 +30,8 @@ struct ConnectionFailed : public Exception
struct ConnectionLost : public Exception
{
explicit ConnectionLost(const std::string & msg, int code = 0) : Exception(msg, code) {}
const char * name() const throw() override { return "mysqlxx::ConnectionLost"; }
const char * className() const throw() override { return "mysqlxx::ConnectionLost"; }
const char * name() const noexcept override { return "mysqlxx::ConnectionLost"; }
const char * className() const noexcept override { return "mysqlxx::ConnectionLost"; }
};
@ -39,8 +39,8 @@ struct ConnectionLost : public Exception
struct BadQuery : public Exception
{
explicit BadQuery(const std::string & msg, int code = 0) : Exception(msg, code) {}
const char * name() const throw() override { return "mysqlxx::BadQuery"; }
const char * className() const throw() override { return "mysqlxx::BadQuery"; }
const char * name() const noexcept override { return "mysqlxx::BadQuery"; }
const char * className() const noexcept override { return "mysqlxx::BadQuery"; }
};
@ -48,8 +48,8 @@ struct BadQuery : public Exception
struct CannotParseValue : public Exception
{
explicit CannotParseValue(const std::string & msg, int code = 0) : Exception(msg, code) {}
const char * name() const throw() override { return "mysqlxx::CannotParseValue"; }
const char * className() const throw() override { return "mysqlxx::CannotParseValue"; }
const char * name() const noexcept override { return "mysqlxx::CannotParseValue"; }
const char * className() const noexcept override { return "mysqlxx::CannotParseValue"; }
};


@ -32,7 +32,7 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
}
std::string escaped_with_globs = buf_for_escaping.str();
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without {}*,
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and ","
re2::StringPiece input(escaped_with_globs);
re2::StringPiece matched;
std::ostringstream oss_for_replacing; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@ -50,16 +50,32 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t num_len = std::to_string(range_end).size();
size_t output_width = 0;
if (range_begin > range_end) // Descending sequence: {20..15}, {9..01}
{
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; /// Special case: {0..10}, {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(num_len);
oss_for_replacing << std::setfill('0') << std::setw(output_width);
oss_for_replacing << range_begin;
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(num_len);
oss_for_replacing << std::setfill('0') << std::setw(output_width);
oss_for_replacing << i;
}
}


@ -8,14 +8,36 @@ using namespace DB;
TEST(Common, makeRegexpPatternFromGlobs)
{
EXPECT_EQ(makeRegexpPatternFromGlobs("?"), "[^/]");
EXPECT_EQ(makeRegexpPatternFromGlobs("*"), "[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("/?"), "/[^/]");
EXPECT_EQ(makeRegexpPatternFromGlobs("/*"), "/[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("*_{{a,b,c,d}}/?.csv"), "[^/]*_\\{(a|b|c|d)\\}/[^/]\\.csv");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{01..09}"), "f(1|2|3|4|5|6|7|8|9)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{01..9}"), "f(1|2|3|4|5|6|7|8|9)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0001..0000009}"), "f(1|2|3|4|5|6|7|8|9)");
/* Regex Parsing for {..} can have three possible cases
1) The left range width == the right range width
2) The left range width > the right range width
3) The left range width < the right range width
*/
// Ascending Sequences
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..9}"), "f(1|2|3|4|5|6|7|8|9)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0..10}"), "f(0|1|2|3|4|5|6|7|8|9|10)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{10..20}"), "f(10|11|12|13|14|15|16|17|18|19|20)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{00..10}"), "f(00|01|02|03|04|05|06|07|08|09|10)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0001..0009}"), "f(0001|0002|0003|0004|0005|0006|0007|0008|0009)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{01..9}"), "f(01|02|03|04|05|06|07|08|09)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{000..9}"), "f(000|001|002|003|004|005|006|007|008|009)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{95..103}"), "f(95|96|97|98|99|100|101|102|103)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{99..109}"), "f(99|100|101|102|103|104|105|106|107|108|109)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{001..0009}"), "f(0001|0002|0003|0004|0005|0006|0007|0008|0009)");
// Descending Sequences
EXPECT_EQ(makeRegexpPatternFromGlobs("f{20..15}"), "f(15|16|17|18|19|20)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{200..199}"), "f(199|200)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0009..0001}"), "f(0001|0002|0003|0004|0005|0006|0007|0008|0009)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{100..90}"), "f(90|91|92|93|94|95|96|97|98|99|100)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{103..95}"), "f(95|96|97|98|99|100|101|102|103)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{9..01}"), "f(01|02|03|04|05|06|07|08|09)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{9..000}"), "f(000|001|002|003|004|005|006|007|008|009)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..2}{1..2}"), "f(1|2)(1|2)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..1}{1..1}"), "f(1)(1)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0..0}{0..0}"), "f(0)(0)");


@ -286,6 +286,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
M(Bool, opentelemetry_trace_processors, false, "Collect OpenTelemetry spans for processors.", 0) \
M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \
M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access the external sources joining external tables.", 0) \
\
@ -391,6 +392,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, log_query_views, true, "Log query dependent views into system.query_views_log table. This setting has effect only when 'log_queries' is true.", 0) \
M(String, log_comment, "", "Log comment into system.query_log table and server log. It can be set to arbitrary string no longer than max_query_size.", 0) \
M(LogsLevel, send_logs_level, LogsLevel::fatal, "Send server text logs with specified minimum level to client. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(String, send_logs_source_regexp, "", "Send server text logs with specified regexp to match log source name. Empty means all sources.", 0) \
M(Bool, enable_optimize_predicate_expression, true, "If it is set to true, optimize predicates to subqueries.", 0) \
M(Bool, enable_optimize_predicate_expression_to_final_subquery, true, "Allow push predicate to final subquery.", 0) \
M(Bool, allow_push_predicate_when_subquery_contains_with, true, "Allows push predicate when subquery contains WITH clause", 0) \
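For illustration only (not part of the diff), the two settings added above are ordinary query-level settings, so they could presumably be enabled per session like any other setting; the regexp value here is just a hypothetical example:

```sql
SET opentelemetry_trace_processors = 1;
SET send_logs_level = 'trace', send_logs_source_regexp = 'MergeTree.*';
```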


@ -567,28 +567,31 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
if (file_segment->reserve(current_predownload_size))
bool continue_predownload = file_segment->reserve(current_predownload_size);
if (continue_predownload)
{
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
Stopwatch watch(CLOCK_MONOTONIC);
bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, *file_segment);
if (success)
{
current_offset += current_predownload_size;
file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
bytes_to_predownload -= current_predownload_size;
implementation_buffer->position() += current_predownload_size;
}
else
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, current_predownload_size);
current_offset += current_predownload_size;
bytes_to_predownload -= current_predownload_size;
implementation_buffer->position() += current_predownload_size;
continue_predownload = false;
}
}
else
if (!continue_predownload)
{
/// We were predownloading:
/// segment{1}
@ -691,6 +694,34 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
return true;
}
bool CachedReadBufferFromRemoteFS::writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment)
{
Stopwatch watch(CLOCK_MONOTONIC);
try
{
file_segment.write(data, size, offset);
}
catch (ErrnoException & e)
{
int code = e.getErrno();
if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
{
LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText());
return false;
}
throw;
}
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
return true;
}
bool CachedReadBufferFromRemoteFS::nextImpl()
{
try
@ -840,33 +871,34 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right);
if (file_segment->reserve(size))
bool success = file_segment->reserve(size);
if (success)
{
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
Stopwatch watch(CLOCK_MONOTONIC);
file_segment->write(
needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
size,
file_offset_of_buffer_end);
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(
std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
|| file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, *file_segment);
if (success)
{
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(
std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
|| file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
}
else
{
assert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
}
else
{
download_current_segment = false;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
if (!success)
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
download_current_segment = false;
}
}


@ -73,10 +73,13 @@ private:
SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_);
size_t getTotalSizeToRead();
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);
Poco::Logger * log;
IFileCache::Key cache_key;
String remote_fs_object_path;


@ -562,15 +562,23 @@ public:
{
const ColumnMap * col_map_left = typeid_cast<const ColumnMap *>(arguments[0].column.get());
const auto * col_const_map_left = checkAndGetColumnConst<ColumnMap>(arguments[0].column.get());
bool col_const_map_left_flag = false;
if (col_const_map_left)
{
col_const_map_left_flag = true;
col_map_left = typeid_cast<const ColumnMap *>(&col_const_map_left->getDataColumn());
}
if (!col_map_left)
return nullptr;
const ColumnMap * col_map_right = typeid_cast<const ColumnMap *>(arguments[1].column.get());
const auto * col_const_map_right = checkAndGetColumnConst<ColumnMap>(arguments[1].column.get());
bool col_const_map_right_flag = false;
if (col_const_map_right)
{
col_const_map_right_flag = true;
col_map_right = typeid_cast<const ColumnMap *>(&col_const_map_right->getDataColumn());
}
if (!col_map_right)
return nullptr;
@ -592,13 +600,18 @@ public:
MutableColumnPtr offsets = DataTypeNumber<IColumn::Offset>().createColumn();
IColumn::Offset current_offset = 0;
for (size_t idx = 0; idx < input_rows_count; ++idx)
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
{
for (size_t i = offsets_left[idx - 1]; i < offsets_left[idx]; ++i)
size_t left_it_begin = col_const_map_left_flag ? 0 : offsets_left[row_idx - 1];
size_t left_it_end = col_const_map_left_flag ? offsets_left.size() : offsets_left[row_idx];
size_t right_it_begin = col_const_map_right_flag ? 0 : offsets_right[row_idx - 1];
size_t right_it_end = col_const_map_right_flag ? offsets_right.size() : offsets_right[row_idx];
for (size_t i = left_it_begin; i < left_it_end; ++i)
{
bool matched = false;
auto key = keys_data_left.getDataAt(i);
for (size_t j = offsets_right[idx - 1]; j < offsets_right[idx]; ++j)
for (size_t j = right_it_begin; j < right_it_end; ++j)
{
if (keys_data_right.getDataAt(j).toString() == key.toString())
{
@ -613,12 +626,14 @@ public:
++current_offset;
}
}
for (size_t j = offsets_right[idx - 1]; j < offsets_right[idx]; ++j)
for (size_t j = right_it_begin; j < right_it_end; ++j)
{
keys_data->insertFrom(keys_data_right, j);
values_data->insertFrom(values_data_right, j);
++current_offset;
}
offsets->insert(current_offset);
}


@ -9,6 +9,7 @@ void registerFunctionNotLike(FunctionFactory &);
void registerFunctionNotILike(FunctionFactory &);
void registerFunctionMatch(FunctionFactory &);
void registerFunctionExtract(FunctionFactory &);
void registerFunctionTranslate(FunctionFactory &);
void registerFunctionReplaceOne(FunctionFactory &);
void registerFunctionReplaceAll(FunctionFactory &);
void registerFunctionReplaceRegexpOne(FunctionFactory &);
@ -31,6 +32,7 @@ void registerFunctionsStringRegexp(FunctionFactory & factory)
registerFunctionNotILike(factory);
registerFunctionMatch(factory);
registerFunctionExtract(factory);
registerFunctionTranslate(factory);
registerFunctionReplaceOne(factory);
registerFunctionReplaceAll(factory);
registerFunctionReplaceRegexpOne(factory);

src/Functions/translate.cpp (new file, 364 lines)

@ -0,0 +1,364 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UTF8Helpers.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
struct TranslateImpl
{
using Map = std::array<UInt8, 128>;
static void fillMapWithValues(
Map & map,
const std::string & map_from,
const std::string & map_to)
{
if (map_from.size() != map_to.size())
throw Exception("Second and trird arguments must be the same length", ErrorCodes::BAD_ARGUMENTS);
std::iota(map.begin(), map.end(), 0);
for (size_t i = 0; i < map_from.size(); ++i)
{
if (!isASCII(map_from[i]) || !isASCII(map_to[i]))
throw Exception("Second and trird arguments must be ASCII strings", ErrorCodes::BAD_ARGUMENTS);
map[map_from[i]] = map_to[i];
}
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
const std::string & map_from,
const std::string & map_to,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
Map map;
fillMapWithValues(map, map_from, map_to);
res_data.resize(data.size());
res_offsets.assign(offsets);
UInt8 * dst = res_data.data();
for (UInt64 i = 0; i < offsets.size(); ++i)
{
const UInt8 * src = data.data() + offsets[i - 1];
const UInt8 * src_end = data.data() + offsets[i] - 1;
while (src < src_end)
{
if (*src <= ascii_upper_bound)
*dst = map[*src];
else
*dst = *src;
++src;
++dst;
}
/// Technically '\0' can be mapped into another character,
/// so we need to process '\0' delimiter separately
*dst++ = 0;
}
}
static void vectorFixed(
const ColumnString::Chars & data,
size_t /*n*/,
const std::string & map_from,
const std::string & map_to,
ColumnString::Chars & res_data)
{
std::array<UInt8, 128> map;
fillMapWithValues(map, map_from, map_to);
res_data.resize(data.size());
const UInt8 * src = data.data();
const UInt8 * src_end = data.data() + data.size();
UInt8 * dst = res_data.data();
while (src < src_end)
{
if (*src <= ascii_upper_bound)
*dst = map[*src];
else
*dst = *src;
++src;
++dst;
}
}
private:
static constexpr auto ascii_upper_bound = '\x7f';
};
struct TranslateUTF8Impl
{
using MapASCII = std::array<UInt32, 128>;
using MapUTF8 = HashMap<UInt32, UInt32, HashCRC32<UInt32>>;
static void fillMapWithValues(
MapASCII & map_ascii,
MapUTF8 & map,
const std::string & map_from,
const std::string & map_to)
{
auto map_from_size = UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(map_from.data()), map_from.size());
auto map_to_size = UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(map_to.data()), map_to.size());
if (map_from_size != map_to_size)
throw Exception("Second and trird arguments must be the same length", ErrorCodes::BAD_ARGUMENTS);
std::iota(map_ascii.begin(), map_ascii.end(), 0);
const UInt8 * map_from_ptr = reinterpret_cast<const UInt8 *>(map_from.data());
const UInt8 * map_from_end = map_from_ptr + map_from.size();
const UInt8 * map_to_ptr = reinterpret_cast<const UInt8 *>(map_to.data());
const UInt8 * map_to_end = map_to_ptr + map_to.size();
while (map_from_ptr < map_from_end && map_to_ptr < map_to_end)
{
size_t len_from = UTF8::seqLength(*map_from_ptr);
size_t len_to = UTF8::seqLength(*map_to_ptr);
std::optional<UInt32> res_from, res_to;
if (map_from_ptr + len_from <= map_from_end)
res_from = UTF8::convertUTF8ToCodePoint(map_from_ptr, len_from);
if (map_to_ptr + len_to <= map_to_end)
res_to = UTF8::convertUTF8ToCodePoint(map_to_ptr, len_to);
if (!res_from)
throw Exception("Second argument must be a valid UTF-8 string", ErrorCodes::BAD_ARGUMENTS);
if (!res_to)
throw Exception("Third argument must be a valid UTF-8 string", ErrorCodes::BAD_ARGUMENTS);
if (*map_from_ptr <= ascii_upper_bound)
map_ascii[*map_from_ptr] = *res_to;
else
map[*res_from] = *res_to;
map_from_ptr += len_from;
map_to_ptr += len_to;
}
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
const std::string & map_from,
const std::string & map_to,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
MapASCII map_ascii;
MapUTF8 map;
fillMapWithValues(map_ascii, map, map_from, map_to);
res_data.resize(data.size());
res_offsets.resize(offsets.size());
UInt8 * dst = res_data.data();
UInt64 data_size = 0;
for (UInt64 i = 0; i < offsets.size(); ++i)
{
const UInt8 * src = data.data() + offsets[i - 1];
const UInt8 * src_end = data.data() + offsets[i] - 1;
while (src < src_end)
{
/// Maximum length of UTF-8 sequence is 4 bytes + 1 zero byte
if (data_size + 5 > res_data.size())
{
res_data.resize(data_size * 2 + 5);
dst = res_data.data() + data_size;
}
if (*src <= ascii_upper_bound)
{
size_t dst_len = UTF8::convertCodePointToUTF8(map_ascii[*src], dst, 4);
assert(0 < dst_len && dst_len <= 4);
src += 1;
dst += dst_len;
data_size += dst_len;
continue;
}
size_t src_len = UTF8::seqLength(*src);
assert(0 < src_len && src_len <= 4);
if (src + src_len <= src_end)
{
auto src_code_point = UTF8::convertUTF8ToCodePoint(src, src_len);
if (src_code_point)
{
auto * it = map.find(*src_code_point);
if (it != map.end())
{
size_t dst_len = UTF8::convertCodePointToUTF8(it->getMapped(), dst, 4);
assert(0 < dst_len && dst_len <= 4);
src += src_len;
dst += dst_len;
data_size += dst_len;
continue;
}
}
}
else
{
src_len = src_end - src;
}
memcpy(dst, src, src_len);
dst += src_len;
src += src_len;
data_size += src_len;
}
/// Technically '\0' can be mapped to another character,
/// so we need to process the '\0' delimiter separately
*dst++ = 0;
++data_size;
res_offsets[i] = data_size;
}
res_data.resize(data_size);
}
[[noreturn]] static void vectorFixed(
const ColumnString::Chars & /*data*/,
size_t /*n*/,
const std::string & /*map_from*/,
const std::string & /*map_to*/,
ColumnString::Chars & /*res_data*/)
{
throw Exception("Function translateUTF8 does not support FixedString argument", ErrorCodes::BAD_ARGUMENTS);
}
private:
static constexpr auto ascii_upper_bound = '\x7f';
};
template <typename Impl, typename Name>
class FunctionTranslate : public IFunction
{
public:
static constexpr auto name = Name::name;
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionTranslate>(); }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 3; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isStringOrFixedString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isStringOrFixedString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isStringOrFixedString(arguments[2]))
throw Exception(
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const ColumnPtr column_src = arguments[0].column;
const ColumnPtr column_map_from = arguments[1].column;
const ColumnPtr column_map_to = arguments[2].column;
if (!isColumnConst(*column_map_from) || !isColumnConst(*column_map_to))
throw Exception("2nd and 3rd arguments of function " + getName() + " must be constants.", ErrorCodes::ILLEGAL_COLUMN);
const IColumn * c1 = arguments[1].column.get();
const IColumn * c2 = arguments[2].column.get();
const ColumnConst * c1_const = typeid_cast<const ColumnConst *>(c1);
const ColumnConst * c2_const = typeid_cast<const ColumnConst *>(c2);
String map_from = c1_const->getValue<String>();
String map_to = c2_const->getValue<String>();
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column_src.get()))
{
auto col_res = ColumnString::create();
Impl::vector(col->getChars(), col->getOffsets(), map_from, map_to, col_res->getChars(), col_res->getOffsets());
return col_res;
}
else if (const ColumnFixedString * col_fixed = checkAndGetColumn<ColumnFixedString>(column_src.get()))
{
auto col_res = ColumnFixedString::create(col_fixed->getN());
Impl::vectorFixed(col_fixed->getChars(), col_fixed->getN(), map_from, map_to, col_res->getChars());
return col_res;
}
else
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};
namespace
{
struct NameTranslate
{
static constexpr auto name = "translate";
};
struct NameTranslateUTF8
{
static constexpr auto name = "translateUTF8";
};
using FunctionTranslateASCII = FunctionTranslate<TranslateImpl, NameTranslate>;
using FunctionTranslateUTF8 = FunctionTranslate<TranslateUTF8Impl, NameTranslateUTF8>;
}
void registerFunctionTranslate(FunctionFactory & factory)
{
factory.registerFunction<FunctionTranslateASCII>();
factory.registerFunction<FunctionTranslateUTF8>();
}
}
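A rough usage sketch for the new functions (the results shown are derived from the mapping logic above, not taken from this diff): translate rewrites each ASCII character listed in the second argument into the character at the same position in the third, and translateUTF8 does the same per code point for UTF-8 strings.
SELECT translate('Hello', 'el', 'ip');      -- expected to return 'Hippo'
SELECT translateUTF8('Hëllo', 'ë', 'e');    -- expected to return 'Hello'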

View File

@ -95,7 +95,7 @@ public:
}
/// Left and right streams have the same priority and are processed simultaneously
virtual JoinPipelineType pipelineType() const override { return JoinPipelineType::YShaped; }
JoinPipelineType pipelineType() const override { return JoinPipelineType::YShaped; }
private:
std::shared_ptr<TableJoin> table_join;

View File

@ -66,4 +66,19 @@ const char * InternalTextLogsQueue::getPriorityName(int priority)
return (priority >= 1 && priority <= 8) ? PRIORITIES[priority] : PRIORITIES[0];
}
bool InternalTextLogsQueue::isNeeded(int priority, const String & source) const
{
bool is_needed = priority <= max_priority;
if (is_needed && source_regexp)
is_needed = re2::RE2::PartialMatch(source, *source_regexp);
return is_needed;
}
void InternalTextLogsQueue::setSourceRegexp(const String & regexp)
{
source_regexp = std::make_unique<re2::RE2>(regexp);
}
}
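A minimal sketch of how the new source filter could be used from a client session, assuming send_logs_source_regexp (wired into the TCP and gRPC handlers further below) is settable like any other query-level setting:
SET send_logs_level = 'trace';
SET send_logs_source_regexp = 'MergeTree.*';   -- forward only server log messages whose source matches this regexp
SELECT 1;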

View File

@ -2,7 +2,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/OvercommitTracker.h>
#include <Core/Block.h>
#include <re2/re2.h>
namespace DB
{
@ -15,6 +15,8 @@ public:
InternalTextLogsQueue();
bool isNeeded(int priority, const String & source) const;
static Block getSampleBlock();
static MutableColumns getSampleColumns();
@ -23,6 +25,11 @@ public:
/// Converts priority from Poco::Message::Priority to a string
static const char * getPriorityName(int priority);
void setSourceRegexp(const String & regexp);
private:
/// If not null, you should only push logs which are matched with this regexp
std::unique_ptr<re2::RE2> source_regexp;
};
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;

View File

@ -1350,7 +1350,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names)
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, bool is_right)
{
SortDescription order_descr;
order_descr.reserve(key_names.size());
@ -1368,15 +1368,15 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
settings.max_bytes_before_external_sort,
this->context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
sorting_step->setStepDescription("Sort before JOIN");
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", is_right ? "right" : "left"));
plan.addStep(std::move(sorting_step));
};
if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
{
const auto & join_clause = expressions.join->getTableJoin().getOnlyClause();
add_sorting(query_plan, join_clause.key_names_left);
add_sorting(*joined_plan, join_clause.key_names_right);
add_sorting(query_plan, join_clause.key_names_left, false);
add_sorting(*joined_plan, join_clause.key_names_right, true);
}
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(

View File

@ -25,17 +25,32 @@ PartLogElement::MergeReasonType PartLogElement::getMergeReasonType(MergeType mer
{
switch (merge_type)
{
case MergeType::Regular:
return REGULAR_MERGE;
case MergeType::TTLDelete:
return TTL_DELETE_MERGE;
case MergeType::TTLRecompress:
return TTL_RECOMPRESS_MERGE;
case MergeType::Regular:
return REGULAR_MERGE;
case MergeType::TTLDelete:
return TTL_DELETE_MERGE;
case MergeType::TTLRecompress:
return TTL_RECOMPRESS_MERGE;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast<UInt64>(merge_type));
}
PartLogElement::PartMergeAlgorithm PartLogElement::getMergeAlgorithm(MergeAlgorithm merge_algorithm_)
{
switch (merge_algorithm_)
{
case MergeAlgorithm::Undecided:
return UNDECIDED;
case MergeAlgorithm::Horizontal:
return HORIZONTAL;
case MergeAlgorithm::Vertical:
return VERTICAL;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeAlgorithm {}", static_cast<UInt64>(merge_algorithm_));
}
NamesAndTypesList PartLogElement::getNamesAndTypes()
{
auto event_type_datatype = std::make_shared<DataTypeEnum8>(
@ -60,12 +75,22 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
}
);
auto merge_algorithm_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Undecided", static_cast<Int8>(UNDECIDED)},
{"Horizontal", static_cast<Int8>(HORIZONTAL)},
{"Vertical", static_cast<Int8>(VERTICAL)},
}
);
ColumnsWithTypeAndName columns_with_type_and_name;
return {
{"query_id", std::make_shared<DataTypeString>()},
{"event_type", std::move(event_type_datatype)},
{"merge_reason", std::move(merge_reason_datatype)},
{"merge_algorithm", std::move(merge_algorithm_datatype)},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
@ -104,6 +129,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(query_id);
columns[i++]->insert(event_type);
columns[i++]->insert(merge_reason);
columns[i++]->insert(merge_algorithm);
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
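An illustrative query against the extended system.part_log (column names are taken from the code above; the query shape itself is an example, not part of this diff):
SELECT event_type, merge_reason, merge_algorithm
FROM system.part_log
ORDER BY event_time DESC
LIMIT 5;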

View File

@ -5,6 +5,7 @@
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
namespace DB
@ -22,6 +23,14 @@ struct PartLogElement
MOVE_PART = 6,
};
/// Copy of MergeAlgorithm since values are written to disk.
enum PartMergeAlgorithm
{
UNDECIDED = 0,
VERTICAL = 1,
HORIZONTAL = 2,
};
enum MergeReasonType
{
/// merge_reason is relevant only for event_type = 'MERGE_PARTS'; in other cases it is NOT_A_MERGE
@ -38,6 +47,7 @@ struct PartLogElement
Type event_type = NEW_PART;
MergeReasonType merge_reason = NOT_A_MERGE;
PartMergeAlgorithm merge_algorithm = UNDECIDED;
time_t event_time = 0;
Decimal64 event_time_microseconds = 0;
@ -72,6 +82,8 @@ struct PartLogElement
static std::string name() { return "PartLog"; }
static MergeReasonType getMergeReasonType(MergeType merge_type);
static PartMergeAlgorithm getMergeAlgorithm(MergeAlgorithm merge_algorithm_);
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;

View File

@ -24,7 +24,7 @@ void OwnSplitChannel::log(const Poco::Message & msg)
#ifdef WITH_TEXT_LOG
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
if (channels.empty() && (logs_queue == nullptr || msg.getPriority() > logs_queue->max_priority))
if (channels.empty() && (logs_queue == nullptr || !logs_queue->isNeeded(msg.getPriority(), msg.getSource())))
return;
#endif
@ -93,7 +93,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
/// Log to "TCP queue" if message is not too noisy
if (logs_queue && msg.getPriority() <= logs_queue->max_priority)
if (logs_queue && logs_queue->isNeeded(msg.getPriority(), msg.getSource()))
{
MutableColumns columns = InternalTextLogsQueue::getSampleColumns();

View File

@ -71,7 +71,13 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
bool ExecutionThreadContext::executeTask()
{
OpenTelemetrySpanHolder span("ExecutionThreadContext::executeTask() " + node->processor->getName());
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (trace_processors)
{
span = std::make_unique<OpenTelemetrySpanHolder>("ExecutionThreadContext::executeTask() " + node->processor->getName());
span->addAttribute("thread_number", thread_number);
}
std::optional<Stopwatch> execution_time_watch;
#ifndef NDEBUG
@ -93,17 +99,16 @@ bool ExecutionThreadContext::executeTask()
if (profile_processors)
{
UInt64 elapsed_microseconds = execution_time_watch->elapsedMicroseconds();
node->processor->elapsed_us += elapsed_microseconds;
span.addAttribute("execution_time_ms", elapsed_microseconds);
}
UInt64 elapsed_microseconds = execution_time_watch->elapsedMicroseconds();
node->processor->elapsed_us += elapsed_microseconds;
if (trace_processors)
span->addAttribute("execution_time_ms", elapsed_microseconds);
}
#ifndef NDEBUG
execution_time_ns += execution_time_watch->elapsed();
span.addAttribute("execution_time_ns", execution_time_watch->elapsed());
if (trace_processors)
span->addAttribute("execution_time_ns", execution_time_watch->elapsed());
#endif
span.addAttribute("thread_number", thread_number);
return node->exception == nullptr;
}

View File

@ -41,6 +41,7 @@ public:
const size_t thread_number;
const bool profile_processors;
const bool trace_processors;
void wait(std::atomic_bool & finished);
void wakeUp();
@ -61,10 +62,11 @@ public:
void setException(std::exception_ptr exception_) { exception = exception_; }
void rethrowExceptionIfHas();
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, ReadProgressCallback * callback)
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback)
: read_progress_callback(callback)
, thread_number(thread_number_)
, profile_processors(profile_processors_)
, trace_processors(trace_processors_)
{}
};

View File

@ -128,7 +128,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
}
}
void ExecutorTasks::init(size_t num_threads_, bool profile_processors, ReadProgressCallback * callback)
void ExecutorTasks::init(size_t num_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
{
num_threads = num_threads_;
threads_queue.init(num_threads);
@ -139,7 +139,7 @@ void ExecutorTasks::init(size_t num_threads_, bool profile_processors, ReadProgr
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, callback));
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, trace_processors, callback));
}
}

View File

@ -54,7 +54,7 @@ public:
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
void init(size_t num_threads_, bool profile_processors, ReadProgressCallback * callback);
void init(size_t num_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
void fill(Queue & queue);
void processAsyncTasks();

View File

@ -29,8 +29,10 @@ PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem)
: process_list_element(elem)
{
if (process_list_element)
{
profile_processors = process_list_element->getContext()->getSettingsRef().log_processors_profiles;
trace_processors = process_list_element->getContext()->getSettingsRef().opentelemetry_trace_processors;
}
try
{
graph = std::make_unique<ExecutingGraph>(processors, profile_processors);
@ -268,7 +270,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
Queue queue;
graph->initializeExecution(queue);
tasks.init(num_threads, profile_processors, read_progress_callback.get());
tasks.init(num_threads, profile_processors, trace_processors, read_progress_callback.get());
tasks.fill(queue);
}
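A hedged usage sketch for the new opentelemetry_trace_processors setting; the span log table name comes from the comment in the header below, while the operation_name column and the opentelemetry_start_trace_probability setting are assumptions about the existing span-log machinery:
SET opentelemetry_start_trace_probability = 1;
SET opentelemetry_trace_processors = 1;
SELECT count() FROM numbers(1000000) FORMAT Null;
SYSTEM FLUSH LOGS;
SELECT operation_name FROM system.opentelemetry_span_log WHERE operation_name LIKE 'ExecutionThreadContext%' LIMIT 5;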

View File

@ -65,6 +65,8 @@ private:
bool is_execution_initialized = false;
/// system.processors_profile_log
bool profile_processors = false;
/// system.opentelemetry_span_log
bool trace_processors = false;
std::atomic_bool cancelled = false;

View File

@ -48,18 +48,30 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
&processors);
}
bool JoinStep::allowPushDownToRight() const
{
return join->pipelineType() == JoinPipelineType::YShaped;
}
void JoinStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
void JoinStep::updateLeftStream(const DataStream & left_stream_)
void JoinStep::updateInputStream(const DataStream & new_input_stream_, size_t idx)
{
input_streams = {left_stream_, input_streams.at(1)};
output_stream = DataStream
if (idx == 0)
{
.header = JoiningTransform::transformHeader(left_stream_.header, join),
};
input_streams = {new_input_stream_, input_streams.at(1)};
output_stream = DataStream
{
.header = JoiningTransform::transformHeader(new_input_stream_.header, join),
};
}
else
{
input_streams = {input_streams.at(0), new_input_stream_};
}
}
static ITransformingStep::Traits getStorageJoinTraits()

View File

@ -28,8 +28,9 @@ public:
void describePipeline(FormatSettings & settings) const override;
const JoinPtr & getJoin() const { return join; }
bool allowPushDownToRight() const;
void updateLeftStream(const DataStream & left_stream_);
void updateInputStream(const DataStream & new_input_stream_, size_t idx);
private:
JoinPtr join;

View File

@ -1,3 +1,10 @@
#include <Columns/IColumn.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
@ -11,13 +18,10 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
namespace DB::ErrorCodes
{
@ -39,7 +43,8 @@ static bool filterColumnIsNotAmongAggregatesArguments(const AggregateDescription
}
static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs, bool can_remove_filter = true)
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
bool can_remove_filter = true, size_t child_idx = 0)
{
QueryPlan::Node * child_node = parent_node->children.front();
@ -53,7 +58,11 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
// std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;
const auto & all_inputs = child->getInputStreams().front().header.getColumnsWithTypeAndName();
if (child_idx >= child->getInputStreams().size() || child_idx >= child_node->children.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Child index {} is out of range (streams: {}, children: {})",
child_idx, child->getInputStreams().size(), child_node->children.size());
const auto & all_inputs = child->getInputStreams()[child_idx].header.getColumnsWithTypeAndName();
auto split_filter = expression->cloneActionsForFilterPushDown(filter_column_name, removes_filter, allowed_inputs, all_inputs);
if (!split_filter)
@ -75,7 +84,8 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
/// Expression/Filter -> Aggregating -> Something
auto & node = nodes.emplace_back();
node.children.emplace_back(&node);
std::swap(node.children[0], child_node->children[0]);
std::swap(node.children[0], child_node->children[child_idx]);
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.
@ -90,7 +100,9 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
else
{
if (auto * join = typeid_cast<JoinStep *>(child.get()))
join->updateLeftStream(node.step->getOutputStream());
{
join->updateInputStream(node.step->getOutputStream(), child_idx);
}
else
throw Exception(
ErrorCodes::LOGICAL_ERROR, "We are trying to push down a filter through a step for which we cannot update input stream");
@ -208,25 +220,29 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
const auto & table_join = join->getJoin()->getTableJoin();
/// Push down is for left table only. We need to update JoinStep for push down into right.
/// Only inner and left join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left)
auto join_push_down = [&](ASTTableJoin::Kind kind) -> size_t
{
const auto & left_header = join->getInputStreams().front().header;
const auto & table_join = join->getJoin()->getTableJoin();
/// Only inner and left(/right) join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.kind() != ASTTableJoin::Kind::Inner && table_join.kind() != kind)
return 0;
bool is_left = kind == ASTTableJoin::Kind::Left;
const auto & input_header = is_left ? join->getInputStreams().front().header : join->getInputStreams().back().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
const auto & source_columns = left_header.getNames();
const auto & source_columns = input_header.getNames();
for (const auto & name : source_columns)
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
if (!left_header.has(name) || !res_header.has(name))
if (!input_header.has(name) || !res_header.has(name))
continue;
/// Skip if type is changed. Push down expression expect equal types.
if (!left_header.getByName(name).type->equals(*res_header.getByName(name).type))
if (!input_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
allowed_keys.push_back(name);
@ -234,7 +250,21 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
const bool can_remove_filter
= std::find(source_columns.begin(), source_columns.end(), filter->getFilterColumnName()) == source_columns.end();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter))
size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter, is_left ? 0 : 1);
if (updated_steps > 0)
{
LOG_DEBUG(&Poco::Logger::get("tryPushDownFilter"), "Pushed down filter to {} side of join", kind);
}
return updated_steps;
};
if (size_t updated_steps = join_push_down(ASTTableJoin::Kind::Left))
return updated_steps;
/// For full sorting merge join we push down both to the left and right tables, because left and right streams are not independent.
if (join->allowPushDownToRight())
{
if (size_t updated_steps = join_push_down(ASTTableJoin::Kind::Right))
return updated_steps;
}
}
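An illustrative query shape this optimization targets; the join_algorithm value is an assumption about how the full sorting merge join is enabled and is not part of this diff:
SET join_algorithm = 'full_sorting_merge';
SELECT *
FROM t1
JOIN t2 ON t1.key = t2.key
WHERE t2.key != 0;   -- with this change the filter can also be pushed down to the right input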

View File

@ -855,7 +855,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish()
{
algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true);
algorithm.logElapsed(total_stopwatch.elapsedSeconds());
}
}

View File

@ -233,19 +233,14 @@ public:
virtual void consume(Input & input, size_t source_num) override;
virtual Status merge() override;
void logElapsed(double seconds, bool force)
void logElapsed(double seconds)
{
/// Do not log more frequently than once per ten seconds
if (seconds - stat.last_log_seconds < 10 && !force)
return;
LOG_TRACE(log,
"Finished pocessing in {} seconds"
", left: {} blocks, {} rows; right: {} blocks, {} rows"
", max blocks loaded to memory: {}",
seconds, stat.num_blocks[0], stat.num_rows[0], stat.num_blocks[1], stat.num_rows[1],
stat.max_blocks_loaded);
stat.last_log_seconds = seconds;
}
private:
@ -277,8 +272,6 @@ private:
size_t num_rows[2] = {0, 0};
size_t max_blocks_loaded = 0;
double last_log_seconds = 0;
};
Statistic stat;
@ -303,12 +296,6 @@ public:
protected:
void onFinish() override;
void work() override
{
algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true);
Base::work();
}
Poco::Logger * log;
};

View File

@ -848,6 +848,7 @@ namespace
{
logs_queue = std::make_shared<InternalTextLogsQueue>();
logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
logs_queue->setSourceRegexp(settings.send_logs_source_regexp);
CurrentThread::attachInternalTextLogsQueue(logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]{ onFatalError(); });
}

View File

@ -241,6 +241,7 @@ void TCPHandler::runImpl()
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
state.logs_queue->setSourceRegexp(query_context->getSettingsRef().send_logs_source_regexp);
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]
{

View File

@ -6229,8 +6229,13 @@ try
part_log_elem.event_type = type;
if (part_log_elem.event_type == PartLogElement::MERGE_PARTS)
{
if (merge_entry)
{
part_log_elem.merge_reason = PartLogElement::getMergeReasonType((*merge_entry)->merge_type);
part_log_elem.merge_algorithm = PartLogElement::getMergeAlgorithm((*merge_entry)->merge_algorithm);
}
}
part_log_elem.error = static_cast<UInt16>(execution_status.code);
part_log_elem.exception = execution_status.message;

View File

@ -15,6 +15,7 @@
#include <Parsers/ASTLiteral.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Transforms/MongoDBSource.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
@ -86,6 +87,62 @@ void StorageMongoDB::connectIfNotConnected()
}
class StorageMongoDBSink : public SinkToStorage
{
public:
explicit StorageMongoDBSink(
const std::string & collection_name_,
const std::string & db_name_,
const StorageMetadataPtr & metadata_snapshot_,
std::shared_ptr<Poco::MongoDB::Connection> connection_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, collection_name(collection_name_)
, db_name(db_name_)
, metadata_snapshot{metadata_snapshot_}
, connection(connection_)
{
}
String getName() const override { return "StorageMongoDBSink"; }
void consume(Chunk chunk) override
{
Poco::MongoDB::Database db(db_name);
Poco::MongoDB::Document::Ptr index = new Poco::MongoDB::Document();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
size_t num_rows = block.rows();
size_t num_cols = block.columns();
const auto columns = block.getColumns();
const auto data_types = block.getDataTypes();
const auto data_names = block.getNames();
std::vector<std::string> row(num_cols);
for (const auto i : collections::range(0, num_rows))
{
for (const auto j : collections::range(0, num_cols))
{
WriteBufferFromOwnString ostr;
data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{});
row[j] = ostr.str();
index->add(data_names[j], row[j]);
}
}
Poco::SharedPtr<Poco::MongoDB::InsertRequest> insert_request = db.createInsertRequest(collection_name);
insert_request->documents().push_back(index);
connection->sendRequest(*insert_request);
}
private:
String collection_name;
String db_name;
StorageMetadataPtr metadata_snapshot;
std::shared_ptr<Poco::MongoDB::Connection> connection;
};
Pipe StorageMongoDB::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -109,6 +166,11 @@ Pipe StorageMongoDB::read(
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size));
}
SinkToStoragePtr StorageMongoDB::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */)
{
connectIfNotConnected();
return std::make_shared<StorageMongoDBSink>(collection_name, database_name, metadata_snapshot, connection);
}
StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, ContextPtr context)
{

View File

@ -39,6 +39,11 @@ public:
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;
static StorageMongoDBConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
private:

View File

@ -5,10 +5,14 @@
#include <mutex>
#include <filesystem>
#include <unordered_map>
#include <base/scope_guard.h>
#include <Storages/System/StorageSystemStackTrace.h>
#include <Storages/VirtualColumnUtils.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
@ -16,8 +20,11 @@
#include <IO/ReadBufferFromFile.h>
#include <Common/PipeFDs.h>
#include <Common/CurrentThread.h>
#include <base/getThreadId.h>
#include <Common/HashTable/Hash.h>
#include <Common/logger_useful.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <base/getThreadId.h>
namespace DB
@ -147,13 +154,84 @@ namespace
throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
}
}
ColumnPtr getFilteredThreadIds(ASTPtr query, ContextPtr context)
{
MutableColumnPtr all_thread_ids = ColumnUInt64::create();
std::filesystem::directory_iterator end;
/// There is no better way to enumerate threads in a process other than looking into procfs.
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
{
pid_t tid = parse<pid_t>(it->path().filename());
all_thread_ids->insert(tid);
}
Block block { ColumnWithTypeAndName(std::move(all_thread_ids), std::make_shared<DataTypeUInt64>(), "thread_id") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
return block.getByPosition(0).column;
}
using ThreadIdToName = std::unordered_map<UInt64, String, DefaultHash<UInt64>>;
ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const PaddedPODArray<UInt64> & thread_ids)
{
ThreadIdToName tid_to_name;
MutableColumnPtr all_thread_names = ColumnString::create();
for (UInt64 tid : thread_ids)
{
std::filesystem::path thread_name_path = fmt::format("/proc/self/task/{}/comm", tid);
String thread_name;
if (std::filesystem::exists(thread_name_path))
{
constexpr size_t comm_buf_size = 32; /// More than enough for thread name
ReadBufferFromFile comm(thread_name_path.string(), comm_buf_size);
readEscapedStringUntilEOL(thread_name, comm);
comm.close();
}
tid_to_name[tid] = thread_name;
all_thread_names->insert(thread_name);
}
Block block { ColumnWithTypeAndName(std::move(all_thread_names), std::make_shared<DataTypeString>(), "thread_name") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
ColumnPtr thread_names = std::move(block.getByPosition(0).column);
std::unordered_set<String> filtered_thread_names;
for (size_t i = 0; i != thread_names->size(); ++i)
{
const auto & thread_name = thread_names->getDataAt(i);
filtered_thread_names.emplace(thread_name);
}
for (auto it = tid_to_name.begin(); it != tid_to_name.end();)
{
if (!filtered_thread_names.contains(it->second))
it = tid_to_name.erase(it);
else
++it;
}
return tid_to_name;
}
}
StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
: IStorageSystemOneBlock<StorageSystemStackTrace>(table_id_)
: IStorage(table_id_)
, log(&Poco::Logger::get("StorageSystemStackTrace"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(ColumnsDescription({
{ "thread_name", std::make_shared<DataTypeString>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "query_id", std::make_shared<DataTypeString>() },
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) },
}, { /* aliases */ }));
setInMemoryMetadata(storage_metadata);
notification_pipe.open();
/// Setup signal handler.
@ -173,23 +251,40 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
}
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
Pipe StorageSystemStackTrace::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
return
{
{ "thread_name", std::make_shared<DataTypeString>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "query_id", std::make_shared<DataTypeString>() },
{ "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
};
}
storage_snapshot->check(column_names);
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
}
}
bool send_signal = names_set.contains("trace") || names_set.contains("query_id");
bool read_thread_names = names_set.contains("thread_name");
MutableColumns res_columns = sample_block.cloneEmptyColumns();
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on the number of queued signals in the OS, and otherwise signals may get lost.
@ -197,71 +292,85 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, ContextPtr,
/// Obviously, results for different threads may be out of sync.
/// There is no better way to enumerate threads in a process other than looking into procfs.
ColumnPtr thread_ids = getFilteredThreadIds(query_info.query, context);
const auto & thread_ids_data = assert_cast<const ColumnUInt64 &>(*thread_ids).getData();
std::filesystem::directory_iterator end;
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
ThreadIdToName thread_names;
if (read_thread_names)
thread_names = getFilteredThreadNames(query_info.query, context, thread_ids_data);
for (UInt64 tid : thread_ids_data)
{
pid_t tid = parse<pid_t>(it->path().filename());
sigval sig_value{};
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may have already finished.
if (ESRCH == errno)
continue;
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
std::filesystem::path thread_name_path = it->path();
thread_name_path.append("comm");
size_t res_index = 0;
String thread_name;
if (std::filesystem::exists(thread_name_path))
if (read_thread_names)
{
constexpr size_t comm_buf_size = 32; /// More than enough for thread name
ReadBufferFromFile comm(thread_name_path.string(), comm_buf_size);
readEscapedStringUntilEOL(thread_name, comm);
comm.close();
if (auto it = thread_names.find(tid); it != thread_names.end())
thread_name = it->second;
else
continue; /// was filtered out by "thread_name" condition
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
if (!send_signal)
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[0]->insert(thread_name);
res_columns[1]->insert(tid);
res_columns[2]->insertData(query_id_data, query_id_size);
res_columns[3]->insert(arr);
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertDefault();
res_columns[res_index++]->insertDefault();
}
else
{
LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
sigval sig_value{};
/// Cannot obtain a stack trace. But create a record in result nevertheless.
sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
if (0 != ::sigqueue(tid, sig, sig_value))
{
/// The thread may have already finished.
if (ESRCH == errno)
continue;
res_columns[0]->insert(thread_name);
res_columns[1]->insert(tid);
res_columns[2]->insertDefault();
res_columns[3]->insertDefault();
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (send_signal && wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertData(query_id_data, query_id_size);
res_columns[res_index++]->insert(arr);
}
else
{
LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
res_columns[res_index++]->insert(thread_name);
res_columns[res_index++]->insert(tid);
res_columns[res_index++]->insertDefault();
res_columns[res_index++]->insertDefault();
}
/// Signed integer overflow is undefined behavior in both C and C++. However, according to
/// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
/// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
/// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
++sequence_num;
}
/// Signed integer overflow is undefined behavior in both C and C++. However, according to
/// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
/// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
/// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
++sequence_num;
}
UInt64 num_rows = res_columns.at(0)->size();
Chunk chunk(std::move(res_columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
}
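A sketch of the query patterns affected by this rewrite; the behavioural notes summarize the code above rather than measured output:
-- only thread_id / thread_name are read: no signal is sent to the threads
SELECT count() FROM system.stack_trace WHERE thread_name = 'TCPHandler';
-- trace / query_id are read: each matching thread is signalled as before
SELECT thread_id, query_id, length(trace) FROM system.stack_trace WHERE query_id != '' LIMIT 1;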

View File

@ -3,7 +3,7 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include <mutex>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Storages/IStorage.h>
namespace Poco
{
@ -19,20 +19,26 @@ class Context;
/// Allows to introspect stack trace of all server threads.
/// It acts like an embedded debugger.
/// More than one instance of this table cannot be used.
class StorageSystemStackTrace final : public IStorageSystemOneBlock<StorageSystemStackTrace>
class StorageSystemStackTrace final : public IStorage
{
public:
explicit StorageSystemStackTrace(const StorageID & table_id_);
String getName() const override { return "SystemStackTrace"; }
static NamesAndTypesList getNamesAndTypes();
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
bool isSystemStorage() const override { return true; }
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
mutable std::mutex mutex;
Poco::Logger * log;
};

View File

@ -0,0 +1,104 @@
#include <Common/Exception.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <TableFunctions/TableFunctionMongoDB.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto storage = std::make_shared<StorageMongoDB>(
StorageID(configuration->database, table_name),
configuration->host,
configuration->port,
configuration->database,
configuration->table,
configuration->username,
configuration->password,
configuration->options,
columns,
ConstraintsDescription(),
String{});
storage->startup();
return storage;
}
ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context) const
{
return parseColumnsListFromString(structure, context);
}
void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
const auto & func_args = ast_function->as<ASTFunction &>();
if (!func_args.arguments)
throw Exception("Table function 'mongodb' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
ASTs & args = func_args.arguments->children;
if (args.size() < 6 || args.size() > 7)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Table function 'mongodb' requires from 6 to 7 parameters: mongodb('host:port', database, collection, 'user', 'password', structure, [, 'options'])");
}
ASTs main_arguments(args.begin(), args.begin() + 5);
for (size_t i = 5; i < args.size(); ++i)
{
if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
if (arg_name == "structure")
structure = checkAndGetLiteralArgument<String>(function_args[1], "structure");
else if (arg_name == "options")
main_arguments.push_back(function_args[1]);
}
else if (i == 5)
{
structure = checkAndGetLiteralArgument<String>(args[i], "structure");
}
else if (i == 6)
{
main_arguments.push_back(args[i]);
}
}
configuration = StorageMongoDB::getConfiguration(main_arguments, context);
}
void registerTableFunctionMongoDB(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionMongoDB>();
}
}
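A usage sketch for the new table function, following the argument order from the error message above (host, database, collection, credentials and options are illustrative):
SELECT count()
FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse',
             structure='key UInt64, data String', options='ssl=true');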

View File

@ -0,0 +1,31 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageMongoDB.h>
namespace DB
{
class TableFunctionMongoDB : public ITableFunction
{
public:
static constexpr auto name = "mongodb";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context,
const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "MongoDB"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<StorageMongoDBConfiguration> configuration;
String structure;
};
}

View File

@ -19,6 +19,7 @@ void registerTableFunctions()
registerTableFunctionValues(factory);
registerTableFunctionInput(factory);
registerTableFunctionGenerate(factory);
registerTableFunctionMongoDB(factory);
registerTableFunctionMeiliSearch(factory);

View File

@ -17,6 +17,7 @@ void registerTableFunctionURL(TableFunctionFactory & factory);
void registerTableFunctionValues(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
void registerTableFunctionGenerate(TableFunctionFactory & factory);
void registerTableFunctionMongoDB(TableFunctionFactory & factory);
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory);

View File

@ -34,6 +34,16 @@
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<data_cache_enabled>1</data_cache_enabled>
</s3_with_cache>
<s3_with_cache_and_jbod>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<data_cache_enabled>1</data_cache_enabled>
<data_cache_path>/jbod1/</data_cache_path>
<data_cache_max_size>1000000000</data_cache_max_size>
</s3_with_cache_and_jbod>
</disks>
<policies>
<s3>
@ -67,6 +77,13 @@
</main>
</volumes>
</s3_cache>
<s3_with_cache_and_jbod>
<volumes>
<main>
<disk>s3_with_cache_and_jbod</disk>
</main>
</volumes>
</s3_with_cache_and_jbod>
</policies>
</storage_configuration>

View File

@ -26,6 +26,18 @@ def cluster():
],
with_minio=True,
)
cluster.add_instance(
"node_with_limited_disk",
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
],
with_minio=True,
tmpfs=[
"/jbod1:size=2M",
],
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -678,3 +690,22 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
def test_cache_with_full_disk_space(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
)
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
)
node.query(
"SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
)
assert node.contains_in_log(
"Insert into cache is skipped due to insufficient disk space"
)
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")

View File

@ -253,3 +253,30 @@ def test_missing_columns(started_cluster):
result = node.query("SELECT count() FROM simple_mongo_table WHERE isNull(data)")
assert result == "10\n"
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_insert_select(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
node = started_cluster.instances["node"]
node.query("DROP TABLE IF EXISTS simple_mongo_table")
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')"
)
node.query("INSERT INTO simple_mongo_table SELECT 1, 'kek'")
assert (
node.query("SELECT data from simple_mongo_table where key = 1").strip() == "kek"
)
node.query("INSERT INTO simple_mongo_table(key) SELECT 12")
assert int(node.query("SELECT count() from simple_mongo_table")) == 2
assert (
node.query("SELECT data from simple_mongo_table where key = 12").strip() == ""
)
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table.drop()
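For reference, the write path exercised by the test above expressed directly in SQL (table name and values are illustrative):
CREATE TABLE simple_mongo_table (key UInt64, data String)
ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse');
INSERT INTO simple_mongo_table VALUES (1, 'kek');
SELECT data FROM simple_mongo_table WHERE key = 1;   -- 'kek'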

View File

@ -0,0 +1,8 @@
<clickhouse>
<openSSL>
<client>
<!-- For self-signed certificate -->
<verificationMode>none</verificationMode>
</client>
</openSSL>
</clickhouse>

View File

@ -0,0 +1,276 @@
import pymongo
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def started_cluster(request):
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
with_mongo=True,
main_configs=[
"configs_secure/config.d/ssl_conf.xml",
],
with_mongo_secure=request.param,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
connection_str = ""
if with_credentials:
connection_str = "mongodb://root:clickhouse@localhost:{}".format(
started_cluster.mongo_port
)
else:
connection_str = "mongodb://localhost:{}".format(
started_cluster.mongo_no_cred_port
)
if secure:
connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
return pymongo.MongoClient(connection_str)
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_select(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
node = started_cluster.instances["node"]
for i in range(0, 100):
node.query(
"INSERT INTO FUNCTION mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') (key, data) VALUES ({}, '{}')".format(
i, hex(i * i)
)
)
assert (
node.query(
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
== "100\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') where key = 42"
)
== hex(42 * 42) + "\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_complex_data_type(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
incomplete_mongo_table = db["complex_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i), "dict": {"a": i, "b": str(i)}})
incomplete_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
)
== "100\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT data from mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)') where key = 42"
)
== hex(42 * 42) + "\n"
)
incomplete_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_incorrect_data_type(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
strange_mongo_table = db["strange_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i), "aaaa": "Hello"})
strange_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
with pytest.raises(QueryRuntimeException):
node.query(
"SELECT aaaa FROM mongodb('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
strange_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [True], indirect=["started_cluster"])
def test_secure_connection(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, secure=True)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
)
== "100\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String', 'ssl=true')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true') where key = 42"
)
== hex(42 * 42) + "\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_predefined_connection_configuration(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
== "100\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_no_credentials(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
db = mongo_connection["test"]
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', '', '', structure='key UInt64, data String')"
)
== "100\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_auth_source(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
admin_db = mongo_connection["admin"]
admin_db.add_user(
"root",
"clickhouse",
roles=[{"role": "userAdminAnyDatabase", "db": "admin"}, "readWriteAnyDatabase"],
)
simple_mongo_table = admin_db["simple_table"]
data = []
for i in range(0, 50):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
db = mongo_connection["test"]
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
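    # 'root' was created only in the 'admin' database, so authenticating against 'test'
    # without options='authSource=admin' is expected to fail.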
node.query_and_get_error(
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
assert (
node.query(
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='authSource=admin')"
)
== "100\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_missing_columns(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 10):
data.append({"key": i, "data": hex(i * i)})
for i in range(0, 10):
data.append({"key": i})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
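    # Documents inserted without the 'data' field should come back as NULL, hence the
    # Nullable(String) column type and the isNull(data) filter below.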
result = node.query(
"SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data Nullable(String)') WHERE isNull(data)"
)
assert result == "10\n"
simple_mongo_table.drop()

View File

@ -1 +1,18 @@
-- { echo }
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
1
-- optimization for not reading /proc/self/task/{}/comm and avoiding sending a signal
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
1
-- optimization for trace
SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
1
-- optimization for query_id
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
1
-- optimization for thread_name
SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
1
-- enough rows (optimizations work "correctly")
SELECT count() > 100 FROM system.stack_trace;
1

View File

@ -1,4 +1,14 @@
-- Tags: race
-- at least this query should be present
-- { echo }
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
-- optimization for not reading /proc/self/task/{}/comm and avoiding sending a signal
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
-- optimization for trace
SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
-- optimization for query_id
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
-- optimization for thread_name
SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
-- enough rows (optimizations work "correctly")
SELECT count() > 100 FROM system.stack_trace;

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
# Tags: no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Data preparation.
# We can obtain the user_files_path by using the file table function as a trick; alternatively it can be retrieved with a query such as:
# "insert into function file('exist.txt', 'CSV', 'val1 char') values ('aaaa'); select _path from file('exist.txt', 'CSV', 'val1 char')"
CLICKHOUSE_USER_FILES_PATH=$(clickhouse-client --query "select _path, _file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p ${CLICKHOUSE_USER_FILES_PATH}/
rm -rf ${CLICKHOUSE_USER_FILES_PATH}/file_{0..10}.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_0.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_1.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_2.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_3.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_4.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_5.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_6.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_7.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_8.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_9.csv
echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_10.csv
# echo '' > ${CLICKHOUSE_USER_FILES_PATH}/file_10.csv
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_regex;"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_regex (id UInt64) ENGINE = MergeTree() order by id;"
${CLICKHOUSE_CLIENT} -q "INSERT INTO t_regex SELECT * FROM file('file_{0..10}.csv','CSV');"
${CLICKHOUSE_CLIENT} -q "SELECT count() from t_regex;"
rm -rf ${CLICKHOUSE_USER_FILES_PATH}/file_{0..10}.csv;
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_regex;"

View File

@ -0,0 +1,16 @@
Hello, world!
cagaacgttc
jihgfe
jihgff
jihgfg
jihgfh
jihgfi
HotelGenev
ードとは
¿йðՅন𐐏
¿йðՅনন
¿йðՅনՅ
¿йðՅনð
¿йðՅনй
abc
abc

View File

@ -0,0 +1,13 @@
SELECT translate('Hello? world.', '.?', '!,');
SELECT translate('gtcttgcaag', 'ACGTacgt', 'TGCAtgca');
SELECT translate(toString(number), '0123456789', 'abcdefghij') FROM numbers(987654, 5);
SELECT translateUTF8('HôtelGenèv', 'Ááéíóúôè', 'aaeiouoe');
SELECT translateUTF8('中文内码', '久标准中文内码', 'ユニコードとは');
SELECT translateUTF8(toString(number), '1234567890', 'ዩय𐑿𐐏নՅðй¿ค') FROM numbers(987654, 5);
SELECT translate('abc', '', '');
SELECT translateUTF8('abc', '', '');
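-- Mismatched 'from'/'to' lengths (in bytes for the non-UTF8 variant) should be rejected with error 36 (BAD_ARGUMENTS).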
SELECT translate('abc', 'Ááéíóúôè', 'aaeiouoe'); -- { serverError 36 }
SELECT translateUTF8('abc', 'efg', ''); -- { serverError 36 }

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: race
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "create table tab (x UInt64, y String) engine = MergeTree order by x"
for _ in $(seq 1 100); do timeout -s 2 0.05 $CLICKHOUSE_CLIENT --interactive_delay 1000 -q "insert into tab select number, toString(number) from system.numbers" || true; done

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
[ ! -z "$CLICKHOUSE_CLIENT_REDEFINED" ] && CLICKHOUSE_CLIENT=$CLICKHOUSE_CLIENT_REDEFINED
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=trace/g')
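# Only server log lines whose source matches the regexp should be forwarded to the client;
# stderr is filtered with grep -v, so any remaining output would point at logs from other sources.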
regexp="executeQuery|InterpreterSelectQuery"
$CLICKHOUSE_CLIENT --send_logs_source_regexp "$regexp" -q "SELECT 1;" 2> >(grep -v -E "$regexp" 1>&2)

View File

@ -0,0 +1,5 @@
data_horizontal all_1_1_0 NewPart Undecided
data_horizontal all_1_1_1 MergeParts Horizontal
data_vertical all_1_1_0 NewPart Undecided
data_vertical all_2_2_0 NewPart Undecided
data_vertical all_1_2_1 MergeParts Vertical

View File

@ -0,0 +1,26 @@
CREATE TABLE data_horizontal (
key Int
)
Engine=MergeTree()
ORDER BY key;
INSERT INTO data_horizontal VALUES (1);
OPTIMIZE TABLE data_horizontal FINAL;
SYSTEM FLUSH LOGS;
SELECT table, part_name, event_type, merge_algorithm FROM system.part_log WHERE event_date >= yesterday() AND database = currentDatabase() AND table = 'data_horizontal' ORDER BY event_time_microseconds;
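-- The settings below force wide parts and drop the vertical-merge activation thresholds to 1,
-- so merging the two single-row parts should already use the Vertical algorithm.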
CREATE TABLE data_vertical
(
key UInt64,
value String
)
ENGINE = MergeTree()
ORDER BY key
SETTINGS index_granularity_bytes = 0, enable_mixed_granularity_parts = 0, min_bytes_for_wide_part = 0,
vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO data_vertical VALUES (1, '1');
INSERT INTO data_vertical VALUES (2, '2');
OPTIMIZE TABLE data_vertical FINAL;
SYSTEM FLUSH LOGS;
SELECT table, part_name, event_type, merge_algorithm FROM system.part_log WHERE event_date >= yesterday() AND database = currentDatabase() AND table = 'data_vertical' ORDER BY event_time_microseconds;

View File

@ -0,0 +1,10 @@
{'fruit':'apple','season':'autumn'}
{'fruit':'apple','season':'autumn'}
{'fruit':'apple','season':'autumn'}
{'fruit':'apple','season':'autumn'}
{'fruit':'apple','season':'autumn'}
{'season':'autumn','fruit':'apple'}
{'season':'autumn','fruit':'apple'}
{'season':'autumn','fruit':'apple'}
{'season':'autumn','fruit':'apple'}
{'season':'autumn','fruit':'apple'}

View File

@ -0,0 +1,11 @@
-- Tags: no-backward-compatibility-check
DROP TABLE IF EXISTS map_test;
CREATE TABLE map_test(`tags` Map(String, String)) ENGINE = MergeTree PRIMARY KEY tags ORDER BY tags SETTINGS index_granularity = 8192;
INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
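-- mapFilter keeps only the 'fruit' key; mapUpdate then merges in 'season'. The two queries
-- swap the argument order, which is reflected in the key order of the resulting maps.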
SELECT mapUpdate(mapFilter((k, v) -> (k in ('fruit')), tags), map('season', 'autumn')) FROM map_test;
SELECT mapUpdate(map('season','autumn'), mapFilter((k, v) -> (k in ('fruit')), tags)) FROM map_test;
DROP TABLE map_test;