Merge branch 'master' into trying_actions

mergify[bot] 2021-09-23 15:48:57 +00:00 committed by GitHub
commit 88b7680eeb
321 changed files with 6925 additions and 2785 deletions

.gitignore

@ -33,6 +33,9 @@
/docs/ja/single.md
/docs/fa/single.md
/docs/en/development/cmake-in-clickhouse.md
/docs/ja/development/cmake-in-clickhouse.md
/docs/zh/development/cmake-in-clickhouse.md
/docs/ru/development/cmake-in-clickhouse.md
# callgrind files
callgrind.out.*


@ -1,2 +1,2 @@
To see the list of authors who created the source code of ClickHouse, published and distributed by YANDEX LLC as the owner,
To see the list of authors who created the source code of ClickHouse, published and distributed by ClickHouse, Inc. as the owner,
run "SELECT * FROM system.contributors;" query on any ClickHouse server.


@ -1,4 +1,4 @@
Copyright 2016-2021 Yandex LLC
Copyright 2016-2021 ClickHouse, Inc.
Apache License
Version 2.0, January 2004


@ -28,15 +28,16 @@ The following versions of ClickHouse server are currently being supported with s
| 21.3 | ✅ |
| 21.4 | :x: |
| 21.5 | :x: |
| 21.6 | |
| 21.6 | :x: |
| 21.7 | ✅ |
| 21.8 | ✅ |
| 21.9 | ✅ |
## Reporting a Vulnerability
We're extremely grateful to security researchers and users who report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [clickhouse-feedback@yandex-team.com](mailto:clickhouse-feedback@yandex-team.com).
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com).
### When Should I Report a Vulnerability?

debian/control

@ -1,16 +1,13 @@
Source: clickhouse
Section: database
Priority: optional
Maintainer: Alexey Milovidov <milovidov@yandex-team.ru>
Maintainer: Alexey Milovidov <milovidov@clickhouse.com>
Build-Depends: debhelper (>= 9),
cmake | cmake3,
ninja-build,
clang-11,
llvm-11,
clang-13,
llvm-13,
libc6-dev,
libicu-dev,
libreadline-dev,
gperf,
tzdata
Standards-Version: 3.9.8


@ -70,6 +70,9 @@ def compress_stress_logs(output_path, files_prefix):
def prepare_for_hung_check(drop_databases):
# FIXME this function should not exist, but...
# ThreadFuzzer significantly slows down server and causes false-positive hung check failures
call("clickhouse client -q 'SYSTEM STOP THREAD FUZZER'", shell=True, stderr=STDOUT)
# We attach gdb to clickhouse-server before running tests
# to print stacktraces of all crashes even if clickhouse cannot print it for some reason.
# However, it obstructs checking for hung queries.


@ -6,4 +6,4 @@ toc_title: Cloud
# ClickHouse Cloud Service {#clickhouse-cloud-service}
!!! info "Info"
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](/company/#contact) to learn more.
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](https://clickhouse.com/company/#contact) to learn more.


@ -6,4 +6,4 @@ toc_title: Support
# ClickHouse Commercial Support Service {#clickhouse-commercial-support-service}
!!! info "Info"
Detailed public description for ClickHouse support services is not ready yet, please [contact us](/company/#contact) to learn more.
Detailed public description for ClickHouse support services is not ready yet, please [contact us](https://clickhouse.com/company/#contact) to learn more.


@ -114,15 +114,25 @@ To do so, create the `/Library/LaunchDaemons/limit.maxfiles.plist` file with the
</plist>
```
Execute the following command:
Give the file correct permissions:
``` bash
sudo chown root:wheel /Library/LaunchDaemons/limit.maxfiles.plist
```
Reboot.
Validate that the file is correct:
To check if it's working, you can use the `ulimit -n` command.
``` bash
plutil /Library/LaunchDaemons/limit.maxfiles.plist
```
Load the file (or reboot):
``` bash
sudo launchctl load -w /Library/LaunchDaemons/limit.maxfiles.plist
```
To check if it's working, use the `ulimit -n` or `launchctl limit maxfiles` commands.
## Run ClickHouse server:


@ -100,8 +100,8 @@ For a description of parameters, see the [CREATE query description](../../../sql
- `min_merge_bytes_to_use_direct_io` — The minimum data volume for merge operation that is required for using direct I/O access to the storage disk. When merging data parts, ClickHouse calculates the total storage volume of all the data to be merged. If the volume exceeds `min_merge_bytes_to_use_direct_io` bytes, ClickHouse reads and writes the data to the storage disk using the direct I/O interface (`O_DIRECT` option). If `min_merge_bytes_to_use_direct_io = 0`, then direct I/O is disabled. Default value: `10 * 1024 * 1024 * 1024` bytes.
<a name="mergetree_setting-merge_with_ttl_timeout"></a>
- `merge_with_ttl_timeout` — Minimum delay in seconds before repeating a merge with delete TTL. Default value: `14400` seconds (4 hours).
- `merge_with_recompression_ttl_timeout` — Minimum delay in seconds before repeating a merge with recompression TTL. Default value: `14400` seconds (4 hours).
- `try_fetch_recompressed_part_timeout` — Timeout (in seconds) before starting merge with recompression. During this time ClickHouse tries to fetch recompressed part from replica which assigned this merge with recompression. Default value: `7200` seconds (2 hours).
- `merge_with_recompression_ttl_timeout` — Minimum delay in seconds before repeating a merge with recompression TTL. Default value: `14400` seconds (4 hours).
- `try_fetch_recompressed_part_timeout` — Timeout (in seconds) before starting merge with recompression. During this time ClickHouse tries to fetch recompressed part from replica which assigned this merge with recompression. Default value: `7200` seconds (2 hours).
- `write_final_mark` — Enables or disables writing the final index mark at the end of the data part (after the last byte). Default value: 1. Don't turn it off.
- `merge_max_block_size` — Maximum number of rows in block for merge operations. Default value: 8192.
- `storage_policy` — Storage policy. See [Using Multiple Block Devices for Data Storage](#table_engine-mergetree-multiple-volumes).
@ -335,7 +335,16 @@ SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
The optional `false_positive` parameter is the probability of receiving a false positive response from the filter. Possible values: (0, 1). Default value: 0.025.
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`, `Array`, `LowCardinality`, `Nullable`, `UUID`.
Supported data types: `Int*`, `UInt*`, `Float*`, `Enum`, `Date`, `DateTime`, `String`, `FixedString`, `Array`, `LowCardinality`, `Nullable`, `UUID`, `Map`.
For the `Map` data type, the client can specify whether the index should be created for keys or values, using the [mapKeys](../../../sql-reference/functions/tuple-map-functions.md#mapkeys) or [mapValues](../../../sql-reference/functions/tuple-map-functions.md#mapvalues) function.
Example of index creation for the `Map` data type:
```
INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1
INDEX map_value_index mapValues(map_column) TYPE bloom_filter GRANULARITY 1
```
The following functions can use it: [equals](../../../sql-reference/functions/comparison-functions.md), [notEquals](../../../sql-reference/functions/comparison-functions.md), [in](../../../sql-reference/functions/in-functions.md), [notIn](../../../sql-reference/functions/in-functions.md), [has](../../../sql-reference/functions/array-functions.md).
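For illustration, a sketch of a table that defines such an index and a query that may benefit from it (the table and column names are assumed, and whether the index is actually applied depends on how the predicate is analyzed):

```sql
SET allow_experimental_map_type = 1;  -- may be required on older versions

CREATE TABLE map_index_demo
(
    id UInt64,
    map_column Map(String, UInt64),
    INDEX map_key_index mapKeys(map_column) TYPE bloom_filter GRANULARITY 1
)
ENGINE = MergeTree
ORDER BY id;

-- The bloom filter is built over the map keys, so predicates on keys can skip granules.
SELECT count() FROM map_index_demo WHERE has(mapKeys(map_column), 'browser');
```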
@ -398,7 +407,7 @@ Projections are an experimental feature. To enable them you must set the [allow_
Projections are not supported in the `SELECT` statements with the [FINAL](../../../sql-reference/statements/select/from.md#select-from-final) modifier.
### Projection Query {#projection-query}
A projection query is what defines a projection. It implicitly selects data from the parent table.
A projection query is what defines a projection. It implicitly selects data from the parent table.
**Syntax**
```sql
@ -548,7 +557,7 @@ ORDER BY d
TTL d + INTERVAL 1 MONTH DELETE WHERE toDayOfWeek(d) = 1;
```
Creating a table, where expired rows are recompressed:
Creating a table, where expired rows are recompressed:
```sql
CREATE TABLE table_for_recompression


@ -7,5 +7,5 @@ toc_title: GitHub Events
The dataset contains all events on GitHub from 2011 to Dec 6 2020; the size is 3.1 billion records. The download size is 75 GB, and it requires up to 200 GB of space on disk if stored in a table with lz4 compression.
Full dataset description, insights, download instruction and interactive queries are posted [here](https://github-sql.github.io/explorer/).
Full dataset description, insights, download instruction and interactive queries are posted [here](https://ghe.clickhouse.tech/).


@ -1553,18 +1553,20 @@ ClickHouse supports reading and writing [MessagePack](https://msgpack.org/) data
### Data Types Matching {#data-types-matching-msgpack}
| MsgPack data type | ClickHouse data type |
|---------------------------------|----------------------------------------------------------------------------------|
| `uint N`, `positive fixint` | [UIntN](../sql-reference/data-types/int-uint.md) |
| `int N` | [IntN](../sql-reference/data-types/int-uint.md) |
| `fixstr`, `str 8`, `str 16`, `str 32` | [String](../sql-reference/data-types/string.md), [FixedString](../sql-reference/data-types/fixedstring.md) |
| `float 32` | [Float32](../sql-reference/data-types/float.md) |
| `float 64` | [Float64](../sql-reference/data-types/float.md) |
| `uint 16` | [Date](../sql-reference/data-types/date.md) |
| `uint 32` | [DateTime](../sql-reference/data-types/datetime.md) |
| `uint 64` | [DateTime64](../sql-reference/data-types/datetime.md) |
| `fixarray`, `array 16`, `array 32`| [Array](../sql-reference/data-types/array.md) |
| `nil` | [Nothing](../sql-reference/data-types/special-data-types/nothing.md) |
| MessagePack data type (`INSERT`) | ClickHouse data type | MessagePack data type (`SELECT`) |
|--------------------------------------------------------------------|-----------------------------------------------------------|------------------------------------|
| `uint N`, `positive fixint` | [UIntN](../sql-reference/data-types/int-uint.md) | `uint N` |
| `int N` | [IntN](../sql-reference/data-types/int-uint.md) | `int N` |
| `bool` | [UInt8](../sql-reference/data-types/int-uint.md) | `uint 8` |
| `fixstr`, `str 8`, `str 16`, `str 32`, `bin 8`, `bin 16`, `bin 32` | [String](../sql-reference/data-types/string.md) | `bin 8`, `bin 16`, `bin 32` |
| `fixstr`, `str 8`, `str 16`, `str 32`, `bin 8`, `bin 16`, `bin 32` | [FixedString](../sql-reference/data-types/fixedstring.md) | `bin 8`, `bin 16`, `bin 32` |
| `float 32` | [Float32](../sql-reference/data-types/float.md) | `float 32` |
| `float 64` | [Float64](../sql-reference/data-types/float.md) | `float 64` |
| `uint 16` | [Date](../sql-reference/data-types/date.md) | `uint 16` |
| `uint 32` | [DateTime](../sql-reference/data-types/datetime.md) | `uint 32` |
| `uint 64` | [DateTime64](../sql-reference/data-types/datetime.md) | `uint 64` |
| `fixarray`, `array 16`, `array 32` | [Array](../sql-reference/data-types/array.md) | `fixarray`, `array 16`, `array 32` |
| `fixmap`, `map 16`, `map 32` | [Map](../sql-reference/data-types/map.md) | `fixmap`, `map 16`, `map 32` |
Example:
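A minimal sketch of exchanging data through this format, with an assumed table name:

```sql
CREATE TABLE msgpack_demo (id UInt64, name String) ENGINE = Memory;

INSERT INTO msgpack_demo VALUES (1, 'one'), (2, 'two');

-- Write the rows out in MessagePack; the same format name works for INSERT ... FORMAT MsgPack.
SELECT * FROM msgpack_demo FORMAT MsgPack;
```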


@ -810,7 +810,7 @@ If ClickHouse should read more than `merge_tree_max_bytes_to_use_cache` bytes in
The cache of uncompressed blocks stores data extracted for queries. ClickHouse uses this cache to speed up responses to repeated small queries. This setting protects the cache from thrashing by queries that read a large amount of data. The [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) server setting defines the size of the cache of uncompressed blocks.
Possible value:
Possible values:
- Any positive integer.
@ -818,23 +818,23 @@ Default value: 2013265920.
## merge_tree_clear_old_temporary_directories_interval_seconds {#setting-merge-tree-clear-old-temporary-directories-interval-seconds}
The interval in seconds for ClickHouse to execute the cleanup old temporary directories.
Sets the interval in seconds for ClickHouse to execute the cleanup of old temporary directories.
Possible value:
Possible values:
- Any positive integer.
Default value: 60.
Default value: `60` seconds.
## merge_tree_clear_old_parts_interval_seconds {#setting-merge-tree-clear-old-parts-interval-seconds}
The interval in seconds for ClickHouse to execute the cleanup old parts, WALs, and mutations.
Sets the interval in seconds for ClickHouse to execute the cleanup of old parts, WALs, and mutations.
Possible value:
Possible values:
- Any positive integer.
Default value: 1.
Default value: `1` second.
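For example, assuming these are session-level settings like the others on this page, the cleanup cadence could be adjusted like this (a sketch):

```sql
-- Check old temporary directories every 5 minutes, old parts/WALs/mutations every 10 seconds.
SET merge_tree_clear_old_temporary_directories_interval_seconds = 300;
SET merge_tree_clear_old_parts_interval_seconds = 10;
```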
## min_bytes_to_use_direct_io {#settings-min-bytes-to-use-direct-io}
@ -2833,6 +2833,43 @@ Possible values:
Default value: `1`.
## output_format_csv_null_representation {#output_format_csv_null_representation}
Defines the representation of `NULL` for the [CSV](../../interfaces/formats.md#csv) output format. The user can set any string as a value, for example, `My NULL`.
Default value: `\N`.
**Examples**
Query
```sql
SELECT * from csv_custom_null FORMAT CSV;
```
Result
```text
788
\N
\N
```
Query
```sql
SET output_format_csv_null_representation = 'My NULL';
SELECT * FROM csv_custom_null FORMAT CSV;
```
Result
```text
788
My NULL
My NULL
```
## output_format_tsv_null_representation {#output_format_tsv_null_representation}
Defines the representation of `NULL` for the [TSV](../../interfaces/formats.md#tabseparated) output format. The user can set any string as a value, for example, `My NULL`.
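Usage mirrors the CSV setting above; a sketch with an assumed `tsv_custom_null` table containing `NULL` values:

```sql
SET output_format_tsv_null_representation = 'My NULL';
SELECT * FROM tsv_custom_null FORMAT TSV;
```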
@ -3306,7 +3343,7 @@ Result:
└─────┘
```
## optimize_fuse_sum_count_avg {#optimize_fuse_sum_count_avg}
## optimize_syntax_fuse_functions {#optimize_syntax_fuse_functions}
Enables fusing aggregate functions with an identical argument. It rewrites a query that contains at least two aggregate functions from [sum](../../sql-reference/aggregate-functions/reference/sum.md#agg_function-sum), [count](../../sql-reference/aggregate-functions/reference/count.md#agg_function-count) or [avg](../../sql-reference/aggregate-functions/reference/avg.md#agg_function-avg) with an identical argument into [sumCount](../../sql-reference/aggregate-functions/reference/sumcount.md#agg_function-sumCount).
@ -3323,7 +3360,7 @@ Query:
``` sql
CREATE TABLE fuse_tbl(a Int8, b Int8) Engine = Log;
SET optimize_fuse_sum_count_avg = 1;
SET optimize_syntax_fuse_functions = 1;
EXPLAIN SYNTAX SELECT sum(a), sum(b), count(b), avg(b) from fuse_tbl FORMAT TSV;
```
@ -3567,6 +3604,18 @@ Possible values:
Default value: `1000`.
## short_circuit_function_evaluation {#short-circuit-function-evaluation}
Allows calculating the [if](../../sql-reference/functions/conditional-functions.md#if), [multiIf](../../sql-reference/functions/conditional-functions.md#multiif), [and](../../sql-reference/functions/logical-functions.md#logical-and-function), and [or](../../sql-reference/functions/logical-functions.md#logical-or-function) functions according to a [short scheme](https://en.wikipedia.org/wiki/Short-circuit_evaluation). This helps optimize the execution of complex expressions in these functions and prevent possible exceptions (such as division by zero when it is not expected).
Possible values:
- `enable` — Enables short-circuit function evaluation for functions that are suitable for it (they can throw an exception or are computationally heavy).
- `force_enable` — Enables short-circuit function evaluation for all functions.
- `disable` — Disables short-circuit function evaluation.
Default value: `enable`.
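For example, with the default `enable` value the following query does not throw a division-by-zero exception, because `intDiv(42, number)` is evaluated only on rows where `number = 0` does not hold (a minimal sketch):

```sql
SET short_circuit_function_evaluation = 'enable';
SELECT if(number = 0, 0, intDiv(42, number)) FROM numbers(5);
```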
## max_hyperscan_regexp_length {#max-hyperscan-regexp-length}
Defines the maximum length for each regular expression in the [hyperscan multi-match functions](../../sql-reference/functions/string-search-functions.md#multimatchanyhaystack-pattern1-pattern2-patternn).
@ -3592,7 +3641,6 @@ Result:
┌─multiMatchAny('abcd', ['ab', 'bcd', 'c', 'd'])─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Query:
@ -3611,7 +3659,6 @@ Exception: Regexp length too large.
- [max_hyperscan_regexp_total_length](#max-hyperscan-regexp-total-length)
## max_hyperscan_regexp_total_length {#max-hyperscan-regexp-total-length}
Sets the maximum total length of all regular expressions in each [hyperscan multi-match function](../../sql-reference/functions/string-search-functions.md#multimatchanyhaystack-pattern1-pattern2-patternn).


@ -66,6 +66,8 @@ ClickHouse-specific aggregate functions:
- [quantileDeterministic](../../../sql-reference/aggregate-functions/reference/quantiledeterministic.md)
- [quantileTDigest](../../../sql-reference/aggregate-functions/reference/quantiletdigest.md)
- [quantileTDigestWeighted](../../../sql-reference/aggregate-functions/reference/quantiletdigestweighted.md)
- [quantileBFloat16](../../../sql-reference/aggregate-functions/reference/quantilebfloat16.md#quantilebfloat16)
- [quantileBFloat16Weighted](../../../sql-reference/aggregate-functions/reference/quantilebfloat16.md#quantilebfloat16weighted)
- [simpleLinearRegression](../../../sql-reference/aggregate-functions/reference/simplelinearregression.md)
- [stochasticLinearRegression](../../../sql-reference/aggregate-functions/reference/stochasticlinearregression.md)
- [stochasticLogisticRegression](../../../sql-reference/aggregate-functions/reference/stochasticlogisticregression.md)


@ -58,6 +58,10 @@ Result:
```
Note that all floating point values in the example are truncated to 1.0 when converting to `bfloat16`.
# quantileBFloat16Weighted {#quantilebfloat16weighted}
Like `quantileBFloat16` but takes into account the weight of each sequence member.
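A usage sketch on generated data, taking the 0.75-level quantile of a sequence with synthetic weights:

```sql
SELECT quantileBFloat16Weighted(0.75)(value, weight)
FROM
(
    SELECT number AS value, number % 3 + 1 AS weight
    FROM numbers(100)
);
```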
**See Also**
- [median](../../../sql-reference/aggregate-functions/reference/median.md#median)


@ -43,4 +43,4 @@ Result:
**See also**
- [optimize_fuse_sum_count_avg](../../../operations/settings/settings.md#optimize_fuse_sum_count_avg) setting.
- [optimize_syntax_fuse_functions](../../../operations/settings/settings.md#optimize_syntax_fuse_functions) setting.


@ -107,7 +107,7 @@ bitmapSubsetLimit(bitmap, range_start, cardinality_limit)
The subset.
Type: `Bitmap object`.
Type: [Bitmap object](#bitmap_functions-bitmapbuild).
**Example**
@ -125,9 +125,9 @@ Result:
└───────────────────────────┘
```
## subBitmap {#subBitmap}
## subBitmap {#subbitmap}
Creates a subset of bitmap limit the results to `cardinality_limit` with offset of `offset`.
Returns the bitmap elements, starting from the `offset` position. The number of returned elements is limited by the `cardinality_limit` parameter. An analog of the [substring](string-functions.md#substring) string function, but for bitmaps.
**Syntax**
@ -137,15 +137,15 @@ subBitmap(bitmap, offset, cardinality_limit)
**Arguments**
- `bitmap` [Bitmap object](#bitmap_functions-bitmapbuild).
- `offset` the number of offsets. Type: [UInt32](../../sql-reference/data-types/int-uint.md).
- `cardinality_limit` The subset cardinality upper limit. Type: [UInt32](../../sql-reference/data-types/int-uint.md).
- `bitmap` The bitmap. Type: [Bitmap object](#bitmap_functions-bitmapbuild).
- `offset` The position of the first element of the subset. Type: [UInt32](../../sql-reference/data-types/int-uint.md).
- `cardinality_limit` The maximum number of elements in the subset. Type: [UInt32](../../sql-reference/data-types/int-uint.md).
**Returned value**
The subset.
Type: `Bitmap object`.
Type: [Bitmap object](#bitmap_functions-bitmapbuild).
**Example**


@ -12,11 +12,13 @@ Controls conditional branching. Unlike most systems, ClickHouse always evaluate
**Syntax**
``` sql
SELECT if(cond, then, else)
if(cond, then, else)
```
If the condition `cond` evaluates to a non-zero value, returns the result of the expression `then`, and the result of the expression `else`, if present, is skipped. If the `cond` is zero or `NULL`, then the result of the `then` expression is skipped and the result of the `else` expression, if present, is returned.
You can use the [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation) setting to calculate the `if` function according to a short scheme. If this setting is enabled, the `then` expression is evaluated only on rows where `cond` is true, and the `else` expression only on rows where `cond` is false. For example, an exception about division by zero is not thrown when executing the query `SELECT if(number = 0, 0, intDiv(42, number)) FROM numbers(10)`, because `intDiv(42, number)` is evaluated only for numbers that do not satisfy the condition `number = 0`.
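A minimal sketch of the basic behaviour:

```sql
SELECT number, if(number % 2 = 0, 'even', 'odd') AS parity FROM numbers(4);
```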
**Arguments**
- `cond` The condition for evaluation that can be zero or not. The type is UInt8, Nullable(UInt8) or NULL.
@ -115,9 +117,15 @@ Returns `then` if the `cond` evaluates to be true (greater than zero), otherwise
Allows you to write the [CASE](../../sql-reference/operators/index.md#operator_case) operator more compactly in the query.
Syntax: `multiIf(cond_1, then_1, cond_2, then_2, ..., else)`
**Syntax**
**Arguments:**
``` sql
multiIf(cond_1, then_1, cond_2, then_2, ..., else)
```
You can use the [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation) setting to calculate the `multiIf` function according to a short scheme. If this setting is enabled, the `then_i` expression is evaluated only on rows where `((NOT cond_1) AND (NOT cond_2) AND ... AND (NOT cond_{i-1}) AND cond_i)` is true, and `cond_i` is evaluated only on rows where `((NOT cond_1) AND (NOT cond_2) AND ... AND (NOT cond_{i-1}))` is true. For example, an exception about division by zero is not thrown when executing the query `SELECT multiIf(number = 2, intDiv(1, number), number = 5) FROM numbers(10)`.
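A minimal sketch:

```sql
SELECT number,
       multiIf(number = 0, 'zero', number % 2 = 1, 'odd', 'even') AS kind
FROM numbers(4);
```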
**Arguments**
- `cond_N` — The condition for the function to return `then_N`.
- `then_N` — The result of the function when executed.
@ -201,4 +209,3 @@ FROM LEFT_RIGHT
│ 4 │ ᴺᵁᴸᴸ │ Both equal │
└──────┴───────┴──────────────────┘
```


@ -19,6 +19,8 @@ Calculates the result of the logical conjunction between two or more values. Cor
and(val1, val2...)
```
You can use the [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation) setting to calculate the `and` function according to a short scheme. If this setting is enabled, `vali` is evaluated only on rows where `(val1 AND val2 AND ... AND val{i-1})` is true. For example, an exception about division by zero is not thrown when executing the query `SELECT and(number = 2, intDiv(1, number)) FROM numbers(10)`.
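The query from the paragraph above, trimmed to a few rows (a sketch):

```sql
-- intDiv(1, number) is evaluated only on rows where number = 2 holds.
SELECT and(number = 2, intDiv(1, number)) FROM numbers(3);
```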
**Arguments**
- `val1, val2, ...` — List of at least two values. [Int](../../sql-reference/data-types/int-uint.md), [UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Nullable](../../sql-reference/data-types/nullable.md).
@ -68,9 +70,11 @@ Calculates the result of the logical disjunction between two or more values. Cor
**Syntax**
``` sql
and(val1, val2...)
or(val1, val2...)
```
You can use the [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation) setting to calculate the `or` function according to a short scheme. If this setting is enabled, `vali` is evaluated only on rows where `((NOT val1) AND (NOT val2) AND ... AND (NOT val{i-1}))` is true. For example, an exception about division by zero is not thrown when executing the query `SELECT or(number = 0, intDiv(1, number) != 0) FROM numbers(10)`.
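The query from the paragraph above, trimmed to a few rows (a sketch):

```sql
-- intDiv(1, number) is evaluated only on rows where number = 0 does not hold.
SELECT or(number = 0, intDiv(1, number) != 0) FROM numbers(5);
```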
**Arguments**
- `val1, val2, ...` — List of at least two values. [Int](../../sql-reference/data-types/int-uint.md), [UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Nullable](../../sql-reference/data-types/nullable.md).


@ -176,7 +176,7 @@ roundBankers(4.5) = 4
roundBankers(3.55, 1) = 3.6
roundBankers(3.65, 1) = 3.6
roundBankers(10.35, 1) = 10.4
roundBankers(10.755, 2) = 11,76
roundBankers(10.755, 2) = 10.76
```
**See Also**


@ -11,7 +11,7 @@ ALTER TABLE [db].name [ON CLUSTER cluster] MODIFY ORDER BY new_expression
The command changes the [sorting key](../../../engines/table-engines/mergetree-family/mergetree.md) of the table to `new_expression` (an expression or a tuple of expressions). Primary key remains the same.
The command is lightweight in a sense that it only changes metadata. To keep the property that data part rows are ordered by the sorting key expression you cannot add expressions containing existing columns to the sorting key (only columns added by the `ADD COLUMN` command in the same `ALTER` query).
The command is lightweight in a sense that it only changes metadata. To keep the property that data part rows are ordered by the sorting key expression you cannot add expressions containing existing columns to the sorting key (only columns added by the `ADD COLUMN` command in the same `ALTER` query, without default column value).
!!! note "Note"
It only works for tables in the [`MergeTree`](../../../engines/table-engines/mergetree-family/mergetree.md) family (including [replicated](../../../engines/table-engines/mergetree-family/replication.md) tables).
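A sketch of the typical pattern, adding a column and appending it to the sorting key in one statement (the table, column, and key names are illustrative, and the existing key is assumed to be `(CounterID, EventDate)`):

```sql
ALTER TABLE hits
    ADD COLUMN browser String,
    MODIFY ORDER BY (CounterID, EventDate, browser);
```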


@ -104,6 +104,28 @@ There are many nuances to processing `NULL`. For example, if at least one of the
In queries, you can check `NULL` using the [IS NULL](../sql-reference/operators/index.md#operator-is-null) and [IS NOT NULL](../sql-reference/operators/index.md) operators and the related functions `isNull` and `isNotNull`.
### Heredoc {#heredeoc}
A [heredoc](https://en.wikipedia.org/wiki/Here_document) is a way to define a string (often multiline), while maintaining the original formatting. A heredoc is defined as a custom string literal, placed between two `$` symbols, for example `$heredoc$`. A value between two heredocs is processed "as-is".
You can use a heredoc to embed snippets of SQL, HTML, or XML code, etc.
**Example**
Query:
```sql
SELECT $smth$SHOW CREATE VIEW my_view$smth$;
```
Result:
```text
┌─'SHOW CREATE VIEW my_view'─┐
│ SHOW CREATE VIEW my_view │
└────────────────────────────┘
```
## Functions {#functions}
Function calls are written like an identifier with a list of arguments (possibly empty) in round brackets. In contrast to standard SQL, the brackets are required, even for an empty argument list. Example: `now()`.


@ -8,4 +8,4 @@ toc_title: "\u30AF\u30E9\u30A6\u30C9"
# ClickHouse Cloud Service {#clickhouse-cloud-service}
!!! info "Info"
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](/company/#contact) to learn more.
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](https://clickhouse.com/company/#contact) to learn more.


@ -1 +0,0 @@
../../en/development/cmake-in-clickhouse.md


@ -6,4 +6,4 @@ toc_title: "Поставщики облачных услуг ClickHouse"
# Поставщики облачных услуг ClickHouse {#clickhouse-cloud-service-providers}
!!! info "Info"
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](/company/#contact) to learn more.
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](https://clickhouse.com/company/#contact) to learn more.


@ -1 +0,0 @@
../../en/development/cmake-in-clickhouse.md


@ -94,7 +94,7 @@ ClickHouse Keeper может использоваться как равноце
## Как запустить
ClickHouse Keeper входит в пакет` clickhouse-server`, просто добавьте кофигурацию `<keeper_server>` и запустите сервер ClickHouse как обычно. Если вы хотите запустить ClickHouse Keeper автономно, сделайте это аналогичным способом:
ClickHouse Keeper входит в пакет `clickhouse-server`, просто добавьте кофигурацию `<keeper_server>` и запустите сервер ClickHouse как обычно. Если вы хотите запустить ClickHouse Keeper автономно, сделайте это аналогичным способом:
```bash
clickhouse-keeper --config /etc/your_path_to_config/config.xml --daemon
@ -116,4 +116,4 @@ clickhouse-keeper-converter --zookeeper-logs-dir /var/lib/zookeeper/version-2 --
4. Скопируйте снэпшот на узлы сервера ClickHouse с настроенным `keeper` или запустите ClickHouse Keeper вместо ZooKeeper. Снэпшот должен сохраняться на всех узлах: в противном случае пустые узлы могут захватить лидерство и сконвертированные данные могут быть отброшены на старте.
[Original article](https://clickhouse.com/docs/en/operations/clickhouse-keeper/) <!--hide-->
[Original article](https://clickhouse.com/docs/en/operations/clickhouse-keeper/) <!--hide-->


@ -801,12 +801,32 @@ ClickHouse может парсить только базовый формат `Y
Кэш несжатых блоков хранит данные, извлечённые при выполнении запросов. ClickHouse использует кэш для ускорения ответов на повторяющиеся небольшие запросы. Настройка защищает кэш от переполнения. Настройка сервера [uncompressed_cache_size](../server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) определяет размер кэша несжатых блоков.
Возможное значение:
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 2013265920.
## merge_tree_clear_old_temporary_directories_interval_seconds {#setting-merge-tree-clear-old-temporary-directories-interval-seconds}
Задает интервал в секундах для удаления старых временных каталогов на сервере ClickHouse.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: `60` секунд.
## merge_tree_clear_old_parts_interval_seconds {#setting-merge-tree-clear-old-parts-interval-seconds}
Задает интервал в секундах для удаления старых кусков данных, журналов предзаписи (WAL) и мутаций на сервере ClickHouse .
Возможные значения:
- Положительное целое число.
Значение по умолчанию: `1` секунда.
## min_bytes_to_use_direct_io {#settings-min-bytes-to-use-direct-io}
Минимальный объём данных, необходимый для прямого (небуферизованного) чтения/записи (direct I/O) на диск.
@ -3122,7 +3142,7 @@ SELECT * FROM test LIMIT 10 OFFSET 100;
Значение по умолчанию: `1800`.
## optimize_fuse_sum_count_avg {#optimize_fuse_sum_count_avg}
## optimize_syntax_fuse_functions {#optimize_syntax_fuse_functions}
Позволяет объединить агрегатные функции с одинаковым аргументом. Запрос, содержащий по крайней мере две агрегатные функции: [sum](../../sql-reference/aggregate-functions/reference/sum.md#agg_function-sum), [count](../../sql-reference/aggregate-functions/reference/count.md#agg_function-count) или [avg](../../sql-reference/aggregate-functions/reference/avg.md#agg_function-avg) с одинаковым аргументом, перезаписывается как [sumCount](../../sql-reference/aggregate-functions/reference/sumcount.md#agg_function-sumCount).
@ -3139,7 +3159,7 @@ SELECT * FROM test LIMIT 10 OFFSET 100;
``` sql
CREATE TABLE fuse_tbl(a Int8, b Int8) Engine = Log;
SET optimize_fuse_sum_count_avg = 1;
SET optimize_syntax_fuse_functions = 1;
EXPLAIN SYNTAX SELECT sum(a), sum(b), count(b), avg(b) from fuse_tbl FORMAT TSV;
```
@ -3333,7 +3353,7 @@ SETTINGS index_granularity = 8192 │
## force_optimize_projection {#force-optimize-projection}
Включает или отключает обязательное использование [проекций](../../engines/table-engines/mergetree-family/mergetree.md#projections) в запросах `SELECT`, если поддержка проекций включена (см. настройку [allow_experimental_projection_optimization](#allow-experimental-projection-optimization)).
Включает или отключает обязательное использование [проекций](../../engines/table-engines/mergetree-family/mergetree.md#projections) в запросах `SELECT`, если поддержка проекций включена (см. настройку [allow_experimental_projection_optimization](#allow-experimental-projection-optimization)).
Возможные значения:
@ -3376,6 +3396,18 @@ SETTINGS index_granularity = 8192 │
Значение по умолчанию: `1000`.
## short_circuit_function_evaluation {#short-circuit-function-evaluation}
Позволяет вычислять функции [if](../../sql-reference/functions/conditional-functions.md#if), [multiIf](../../sql-reference/functions/conditional-functions.md#multiif), [and](../../sql-reference/functions/logical-functions.md#logical-and-function) и [or](../../sql-reference/functions/logical-functions.md#logical-or-function) по [короткой схеме](https://ru-wikipedia-org.turbopages.org/ru.wikipedia.org/s/wiki/Вычисления_по_короткой_схеме). Это помогает оптимизировать выполнение сложных выражений в этих функциях и предотвратить возможные исключения (например, деление на ноль, когда оно не ожидается).
Возможные значения:
- `enable` — по короткой схеме вычисляются функции, которые подходят для этого (могут сгенерировать исключение или требуют сложных вычислений).
- `force_enable` — все функции вычисляются по короткой схеме.
- `disable` — вычисление функций по короткой схеме отключено.
Значение по умолчанию: `enable`.
## max_hyperscan_regexp_length {#max-hyperscan-regexp-length}
Задает максимальную длину каждого регулярного выражения в [hyperscan-функциях](../../sql-reference/functions/string-search-functions.md#multimatchanyhaystack-pattern1-pattern2-patternn) поиска множественных совпадений в строке.


@ -61,6 +61,8 @@ toc_hidden: true
- [quantileDeterministic](../../../sql-reference/aggregate-functions/reference/quantiledeterministic.md)
- [quantileTDigest](../../../sql-reference/aggregate-functions/reference/quantiletdigest.md)
- [quantileTDigestWeighted](../../../sql-reference/aggregate-functions/reference/quantiletdigestweighted.md)
- [quantileBFloat16](../../../sql-reference/aggregate-functions/reference/quantilebfloat16.md#quantilebfloat16)
- [quantileBFloat16Weighted](../../../sql-reference/aggregate-functions/reference/quantilebfloat16.md#quantilebfloat16weighted)
- [simpleLinearRegression](../../../sql-reference/aggregate-functions/reference/simplelinearregression.md)
- [stochasticLinearRegression](../../../sql-reference/aggregate-functions/reference/stochasticlinearregression.md)
- [stochasticLogisticRegression](../../../sql-reference/aggregate-functions/reference/stochasticlogisticregression.md)


@ -58,6 +58,10 @@ SELECT quantileBFloat16(0.75)(a), quantileBFloat16(0.75)(b) FROM example_table;
```
Обратите внимание, что все числа с плавающей точкой в примере были округлены до 1.0 при преобразовании к `bfloat16`.
# quantileBFloat16Weighted {#quantilebfloat16weighted}
Версия функции `quantileBFloat16`, которая учитывает вес каждого элемента последовательности.
**См. также**
- [median](../../../sql-reference/aggregate-functions/reference/median.md#median)


@ -43,4 +43,4 @@ SELECT sumCount(x) from s_table;
**Смотрите также**
- Настройка [optimize_fuse_sum_count_avg](../../../operations/settings/settings.md#optimize_fuse_sum_count_avg)
- Настройка [optimize_syntax_fuse_functions](../../../operations/settings/settings.md#optimize_syntax_fuse_functions)


@ -66,15 +66,14 @@ bitmapSubsetLimit(bitmap, range_start, cardinality_limit)
**Аргументы**
- `bitmap` битмап. [Bitmap object](#bitmap_functions-bitmapbuild).
- `range_start` начальная точка подмножества. [UInt32](../../sql-reference/functions/bitmap-functions.md#bitmap-functions).
- `cardinality_limit` Верхний предел подмножества. [UInt32](../../sql-reference/functions/bitmap-functions.md#bitmap-functions).
- `cardinality_limit` верхний предел подмножества. [UInt32](../../sql-reference/functions/bitmap-functions.md#bitmap-functions).
**Возвращаемое значение**
Подмножество битмапа.
Тип: `Bitmap object`.
Тип: [Bitmap object](#bitmap_functions-bitmapbuild).
**Пример**
@ -92,6 +91,44 @@ SELECT bitmapToArray(bitmapSubsetLimit(bitmapBuild([0,1,2,3,4,5,6,7,8,9,10,11,12
└───────────────────────────┘
```
## subBitmap {#subbitmap}
Возвращает элементы битмапа, начиная с позиции `offset`. Число возвращаемых элементов ограничивается параметром `cardinality_limit`. Аналог строковой функции [substring](string-functions.md#substring), но для битмапа.
**Синтаксис**
``` sql
subBitmap(bitmap, offset, cardinality_limit)
```
**Аргументы**
- `bitmap` битмап. Тип: [Bitmap object](#bitmap_functions-bitmapbuild).
- `offset` позиция первого элемента возвращаемого подмножества. Тип: [UInt32](../../sql-reference/data-types/int-uint.md).
- `cardinality_limit` максимальное число элементов возвращаемого подмножества. Тип: [UInt32](../../sql-reference/data-types/int-uint.md).
**Возвращаемое значение**
Подмножество битмапа.
Тип: [Bitmap object](#bitmap_functions-bitmapbuild).
**Пример**
Запрос:
``` sql
SELECT bitmapToArray(subBitmap(bitmapBuild([0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,100,200,500]), toUInt32(10), toUInt32(10))) AS res;
```
Результат:
``` text
┌─res─────────────────────────────┐
│ [10,11,12,13,14,15,16,17,18,19] │
└─────────────────────────────────┘
```
## bitmapContains {#bitmap_functions-bitmapcontains}
Проверяет вхождение элемента в битовый массив.


@ -12,11 +12,13 @@ toc_title: "Условные функции"
**Синтаксис**
``` sql
SELECT if(cond, then, else)
if(cond, then, else)
```
Если условие `cond` не равно нулю, то возвращается результат выражения `then`. Если условие `cond` равно нулю или является NULL, то результат выражения `then` пропускается и возвращается результат выражения `else`.
Чтобы вычислять функцию `if` по короткой схеме, используйте настройку [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation). Если настройка включена, то выражение `then` вычисляется только для строк, где условие `cond` верно, а выражение `else` для строк, где условие `cond` неверно. Например, при выполнении запроса `SELECT if(number = 0, 0, intDiv(42, number)) FROM numbers(10)` не будет сгенерировано исключение из-за деления на ноль, так как `intDiv(42, number)` будет вычислено только для чисел, которые не удовлетворяют условию `number = 0`.
**Аргументы**
- `cond` проверяемое условие. Может быть [UInt8](../../sql-reference/functions/conditional-functions.md) или `NULL`.
@ -77,7 +79,13 @@ SELECT if(0, plus(2, 2), plus(2, 6));
Позволяет более компактно записать оператор [CASE](../operators/index.md#operator_case) в запросе.
multiIf(cond_1, then_1, cond_2, then_2...else)
**Синтаксис**
``` sql
multiIf(cond_1, then_1, cond_2, then_2, ..., else)
```
Чтобы вычислять функцию `multiIf` по короткой схеме, используйте настройку [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation). Если настройка включена, то выражение `then_i` вычисляется только для строк, где условие `((NOT cond_1) AND (NOT cond_2) AND ... AND (NOT cond_{i-1}) AND cond_i)` верно, `cond_i` вычисляется только для строк, где условие `((NOT cond_1) AND (NOT cond_2) AND ... AND (NOT cond_{i-1}))` верно. Например, при выполнении запроса `SELECT multiIf(number = 2, intDiv(1, number), number = 5) FROM numbers(10)` не будет сгенерировано исключение из-за деления на ноль.
**Аргументы**
@ -110,4 +118,3 @@ SELECT if(0, plus(2, 2), plus(2, 6));
│ ᴺᵁᴸᴸ │
└────────────────────────────────────────────┘
```


@ -19,6 +19,8 @@ toc_title: "Логические функции"
and(val1, val2...)
```
Чтобы вычислять функцию `and` по короткой схеме, используйте настройку [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation). Если настройка включена, то выражение `vali` вычисляется только для строк, где условие `(val1 AND val2 AND ... AND val{i-1})` верно. Например, при выполнении запроса `SELECT and(number = 2, intDiv(1, number)) FROM numbers(10)` не будет сгенерировано исключение из-за деления на ноль.
**Аргументы**
- `val1, val2, ...` — список из как минимум двух значений. [Int](../../sql-reference/data-types/int-uint.md), [UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) или [Nullable](../../sql-reference/data-types/nullable.md).
@ -71,6 +73,8 @@ SELECT and(NULL, 1, 10, -2);
and(val1, val2...)
```
Чтобы вычислять функцию `or` по короткой схеме, используйте настройку [short_circuit_function_evaluation](../../operations/settings/settings.md#short-circuit-function-evaluation). Если настройка включена, то выражение `vali` вычисляется только для строк, где условие `((NOT val1) AND (NOT val2) AND ... AND (NOT val{i-1}))` верно. Например, при выполнении запроса `SELECT or(number = 0, intDiv(1, number) != 0) FROM numbers(10)` не будет сгенерировано исключение из-за деления на ноль.
**Аргументы**
- `val1, val2, ...` — список из как минимум двух значений. [Int](../../sql-reference/data-types/int-uint.md), [UInt](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) или [Nullable](../../sql-reference/data-types/nullable.md).


@ -3,7 +3,7 @@ toc_priority: 31
toc_title: "Синтаксис"
---
# Синтаксис {#sintaksis}
# Синтаксис {#syntax}
В системе есть два вида парсеров: полноценный парсер SQL (recursive descent parser) и парсер форматов данных (быстрый потоковый парсер).
Во всех случаях кроме запроса INSERT, используется только полноценный парсер SQL.
@ -21,11 +21,11 @@ INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')
Далее пойдёт речь о полноценном парсере. О парсерах форматов, смотри раздел «Форматы».
## Пробелы {#probely}
## Пробелы {#spaces}
Между синтаксическими конструкциями (в том числе, в начале и конце запроса) может быть расположено произвольное количество пробельных символов. К пробельным символам относятся пробел, таб, перевод строки, CR, form feed.
## Комментарии {#kommentarii}
## Комментарии {#comments}
Поддерживаются комментарии в SQL-стиле и C-стиле.
Комментарии в SQL-стиле: от `--` до конца строки. Пробел после `--` может не ставиться.
@ -63,7 +63,7 @@ INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')
Существуют: числовые, строковые, составные литералы и `NULL`.
### Числовые {#chislovye}
### Числовые {#numeric}
Числовой литерал пытается распарситься:
@ -83,7 +83,7 @@ INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')
Минимальный набор символов, которых вам необходимо экранировать в строковых литералах: `'` и `\`. Одинарная кавычка может быть экранирована одинарной кавычкой, литералы `'It\'s'` и `'It''s'` эквивалентны.
### Составные {#sostavnye}
### Составные {#compound}
Поддерживаются конструкции для массивов: `[1, 2, 3]` и кортежей: `(1, 'Hello, world!', 2)`.
На самом деле, это вовсе не литералы, а выражение с оператором создания массива и оператором создания кортежа, соответственно.
@ -102,17 +102,39 @@ INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')
В запросах можно проверить `NULL` с помощью операторов [IS NULL](operators/index.md#operator-is-null) и [IS NOT NULL](operators/index.md), а также соответствующих функций `isNull` и `isNotNull`.
## Функции {#funktsii}
### Heredoc {#heredeoc}
Синтаксис [heredoc](https://ru.wikipedia.org/wiki/Heredoc-синтаксис) — это способ определения строк с сохранением исходного формата (часто с переносом строки). `Heredoc` задается как произвольный строковый литерал между двумя символами `$`, например `$heredoc$`. Значение между двумя `heredoc` обрабатывается "как есть".
Синтаксис `heredoc` часто используют для вставки кусков кода SQL, HTML, XML и т.п.
**Пример**
Запрос:
```sql
SELECT $smth$SHOW CREATE VIEW my_view$smth$;
```
Результат:
```text
┌─'SHOW CREATE VIEW my_view'─┐
│ SHOW CREATE VIEW my_view │
└────────────────────────────┘
```
## Функции {#functions}
Функции записываются как идентификатор со списком аргументов (возможно, пустым) в скобках. В отличие от стандартного SQL, даже в случае пустого списка аргументов, скобки обязательны. Пример: `now()`.
Бывают обычные и агрегатные функции (смотрите раздел «Агрегатные функции»). Некоторые агрегатные функции могут содержать два списка аргументов в круглых скобках. Пример: `quantile(0.9)(x)`. Такие агрегатные функции называются «параметрическими», а первый список аргументов называется «параметрами». Синтаксис агрегатных функций без параметров ничем не отличается от обычных функций.
## Операторы {#operatory}
## Операторы {#operators}
Операторы преобразуются в соответствующие им функции во время парсинга запроса, с учётом их приоритета и ассоциативности.
Например, выражение `1 + 2 * 3 + 4` преобразуется в `plus(plus(1, multiply(2, 3)), 4)`.
## Типы данных и движки таблиц {#tipy-dannykh-i-dvizhki-tablits}
## Типы данных и движки таблиц {#data_types-and-database-table-engines}
Типы данных и движки таблиц в запросе `CREATE` записываются также, как идентификаторы или также как функции. То есть, могут содержать или не содержать список аргументов в круглых скобках. Подробнее смотрите разделы «Типы данных», «Движки таблиц», «CREATE».


@ -5,4 +5,4 @@ toc_priority: 82
# Что нового в ClickHouse?
Планы развития вкратце изложены [здесь](extended-roadmap.md), а новости по предыдущим релизам подробно описаны в [журнале изменений](changelog/index.md).
Планы развития вкратце изложены [здесь](https://github.com/ClickHouse/ClickHouse/issues/17623), а новости по предыдущим релизам подробно описаны в [журнале изменений](changelog/index.md).


@ -51,7 +51,7 @@ def build_for_lang(lang, args):
if args.htmlproofer:
plugins.append('htmlproofer')
website_url = 'https://clickhouse.tech'
website_url = 'https://clickhouse.com'
site_name = site_names.get(lang, site_names['en'])
blog_nav, post_meta = nav.build_blog_nav(lang, args)
raw_config = dict(
@ -62,7 +62,7 @@ def build_for_lang(lang, args):
strict=True,
theme=theme_cfg,
nav=blog_nav,
copyright='©2016–2021 Yandex LLC',
copyright='©2016–2021 ClickHouse, Inc.',
use_directory_urls=True,
repo_name='ClickHouse/ClickHouse',
repo_url='https://github.com/ClickHouse/ClickHouse/',


@ -203,6 +203,7 @@ if __name__ == '__main__':
arg_parser.add_argument('--verbose', action='store_true')
args = arg_parser.parse_args()
args.minify = False # TODO remove
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,


@ -155,6 +155,12 @@ def generate_cmake_flags_files() -> None:
with open(footer_file_name, "r") as footer:
f.write(footer.read())
other_languages = ["docs/ja/development/cmake-in-clickhouse.md",
"docs/zh/development/cmake-in-clickhouse.md",
"docs/ru/development/cmake-in-clickhouse.md"]
for lang in other_languages:
os.symlink(output_file_name, os.path.join(root_path, lang))
if __name__ == '__main__':
generate_cmake_flags_files()


@ -8,6 +8,9 @@ import subprocess
def test_single_page(input_path, lang):
if not (lang == 'en' or lang == 'ru'):
return
with open(input_path) as f:
soup = bs4.BeautifulSoup(
f,
@ -33,11 +36,8 @@ def test_single_page(input_path, lang):
logging.info('Link to nowhere: %s' % href)
if links_to_nowhere:
if lang == 'en' or lang == 'ru':
logging.error(f'Found {links_to_nowhere} links to nowhere in {lang}')
# TODO: restore sys.exit(1) here
else:
logging.warning(f'Found {links_to_nowhere} links to nowhere in {lang}')
logging.error(f'Found {links_to_nowhere} links to nowhere in {lang}')
sys.exit(1)
if len(anchor_points) <= 10:
logging.error('Html parsing is probably broken')


@ -215,10 +215,12 @@ def minify_file(path, css_digest, js_digest):
content = minify_html(content)
content = content.replace('base.css?css_digest', f'base.css?{css_digest}')
content = content.replace('base.js?js_digest', f'base.js?{js_digest}')
elif path.endswith('.css'):
content = cssmin.cssmin(content)
elif path.endswith('.js'):
content = jsmin.jsmin(content)
# TODO: restore cssmin
# elif path.endswith('.css'):
# content = cssmin.cssmin(content)
# TODO: restore jsmin
# elif path.endswith('.js'):
# content = jsmin.jsmin(content)
with open(path, 'wb') as f:
f.write(content.encode('utf-8'))
@ -240,7 +242,7 @@ def minify_website(args):
js_in = get_js_in(args)
js_out = f'{args.output_dir}/js/base.js'
if args.minify:
if args.minify and False: # TODO: return closure
js_in = [js[1:-1] for js in js_in]
closure_args = [
'--js', *js_in, '--js_output_file', js_out,


@ -8,4 +8,4 @@ toc_title: 云
# ClickHouse Cloud Service {#clickhouse-cloud-service}
!!! info "Info"
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](/company/#contact) to learn more.
Detailed public description for ClickHouse cloud services is not ready yet, please [contact us](https://clickhouse.com/company/#contact) to learn more.


@ -6,4 +6,4 @@ toc_title: 支持
# ClickHouse 商业支持服务提供商 {#clickhouse-commercial-support-service-providers}
!!! info "Info"
Detailed public description for ClickHouse support services is not ready yet, please [contact us](/company/#contact) to learn more.
Detailed public description for ClickHouse support services is not ready yet, please [contact us](https://clickhouse.com/company/#contact) to learn more.


@ -1 +0,0 @@
../../en/development/cmake-in-clickhouse.md


@ -1964,7 +1964,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
}
catch (...)
{
LOG_WARNING(log, "Seemns like node with address {} is unreachable.", node.host_name);
LOG_WARNING(log, "Node with address {} seems to be unreachable.", node.host_name);
continue;
}


@ -306,6 +306,7 @@ try
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
loadMetadata(global_context);
startupSystemTables();
DatabaseCatalog::instance().loadDatabases();
LOG_DEBUG(log, "Loaded metadata.");
}


@ -918,7 +918,7 @@ if (ThreadFuzzer::instance().isEffective())
global_context,
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout}));
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms}));
/// Size of cache for marks (index of MergeTree family of tables). It is mandatory.
size_t mark_cache_size = config().getUInt64("mark_cache_size");
@ -1116,6 +1116,7 @@ if (ThreadFuzzer::instance().isEffective())
database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
loadMetadata(global_context, default_database);
startupSystemTables();
database_catalog.loadDatabases();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);


@ -0,0 +1,98 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <Common/ExponentiallySmoothedCounter.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/** See the comments in ExponentiallySmoothedCounter.h
*/
class AggregateFunctionExponentialMovingAverage final
: public IAggregateFunctionDataHelper<ExponentiallySmoothedAverage, AggregateFunctionExponentialMovingAverage>
{
private:
String name;
Float64 half_decay;
public:
AggregateFunctionExponentialMovingAverage(const DataTypes & argument_types_, const Array & params)
: IAggregateFunctionDataHelper<ExponentiallySmoothedAverage, AggregateFunctionExponentialMovingAverage>(argument_types_, params)
{
if (params.size() != 1)
throw Exception{"Aggregate function " + getName() + " requires exactly one parameter: half decay time.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
half_decay = applyVisitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
String getName() const override
{
return "exponentialMovingAverage";
}
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeNumber<Float64>>();
}
bool allocatesMemoryInArena() const override { return false; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & value = columns[0]->getFloat64(row_num);
const auto & time = columns[1]->getFloat64(row_num);
this->data(place).add(value, time, half_decay);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs), half_decay);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
{
writeBinary(this->data(place).value, buf);
writeBinary(this->data(place).time, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena *) const override
{
readBinary(this->data(place).value, buf);
readBinary(this->data(place).time, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get(half_decay));
}
};
void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory & factory)
{
factory.registerFunction("exponentialMovingAverage",
[](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) -> AggregateFunctionPtr
{
assertBinary(name, argument_types);
for (const auto & type : argument_types)
if (!isNumber(*type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Both arguments for aggregate function {} must have numeric type, got {}", name, type->getName());
return std::make_shared<AggregateFunctionExponentialMovingAverage>(argument_types, params);
});
}
}


@ -102,7 +102,7 @@ namespace
// This range is hardcoded below
if (precision_param > 20 || precision_param < 12)
throw Exception(
"Parameter for aggregate function " + name + " is out or range: [12, 20].", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
"Parameter for aggregate function " + name + " is out of range: [12, 20].", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
precision = precision_param;
}


@ -50,7 +50,9 @@ void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &);
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &);
void registerAggregateFunctionSingleValueOrNull(AggregateFunctionFactory &);
void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory &);
void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory &);
void registerAggregateFunctionSparkbar(AggregateFunctionFactory &);
void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
@ -66,8 +68,6 @@ void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFact
void registerWindowFunctions(AggregateFunctionFactory & factory);
void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
void registerAggregateFunctions()
{
{
@ -116,11 +116,11 @@ void registerAggregateFunctions()
registerAggregateFunctionWelchTTest(factory);
registerAggregateFunctionStudentTTest(factory);
registerAggregateFunctionSingleValueOrNull(factory);
registerAggregateFunctionIntervalLengthSum(factory);
registerAggregateFunctionExponentialMovingAverage(factory);
registerAggregateFunctionSparkbar(factory);
registerWindowFunctions(factory);
registerAggregateFunctionIntervalLengthSum(factory);
registerAggregateFunctionSparkbar(factory);
}
{


@ -160,26 +160,29 @@ namespace
if (args.size() <= db_name_index)
return;
String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String table_name;
size_t table_name_index = static_cast<size_t>(-1);
size_t dot = String::npos;
if (function.name != "Distributed")
dot = db_name.find('.');
if (dot != String::npos)
{
table_name = db_name.substr(dot + 1);
db_name.resize(dot);
}
QualifiedTableName qualified_name;
if (function.name == "Distributed")
qualified_name.table = name;
else
qualified_name = QualifiedTableName::parseFromString(name);
if (qualified_name.database.empty())
{
std::swap(qualified_name.database, qualified_name.table);
table_name_index = 2;
if (args.size() <= table_name_index)
return;
table_name = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
qualified_name.table = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
}
const String & db_name = qualified_name.database;
const String & table_name = qualified_name.table;
if (db_name.empty() || table_name.empty())
return;

View File

@ -130,10 +130,16 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
}
catch (Poco::TimeoutException & e)
{
/// disconnect() will reset the socket, so get the timeouts before calling it.
const std::string & message = fmt::format("{} ({}, receive timeout {} ms, send timeout {} ms)",
e.displayText(), getDescription(),
socket->getReceiveTimeout().totalMilliseconds(),
socket->getSendTimeout().totalMilliseconds());
disconnect();
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
throw NetException(e.displayText() + " (" + getDescription() + ")", ErrorCodes::SOCKET_TIMEOUT);
throw NetException(message, ErrorCodes::SOCKET_TIMEOUT);
}
}
@ -413,7 +419,12 @@ void Connection::sendQuery(
if (!connected)
connect(timeouts);
TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true);
/// Query is not executed within sendQuery() function.
///
/// This means that a temporary timeout (set via TimeoutSetter) is not enough,
/// since the next query could otherwise reuse the timeout left over from the previous query.
socket->setReceiveTimeout(timeouts.receive_timeout);
socket->setSendTimeout(timeouts.send_timeout);
if (settings)
{

View File

@ -156,7 +156,7 @@ Poco::AutoPtr<Poco::XML::Document> YAMLParser::parse(const String& path)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "Unable to open YAML configuration file {}", path);
}
Poco::AutoPtr<Poco::XML::Document> xml = new Document;
Poco::AutoPtr<Poco::XML::Element> root_node = xml->createElement("yandex");
Poco::AutoPtr<Poco::XML::Element> root_node = xml->createElement("clickhouse");
xml->appendChild(root_node);
try
{

View File

@ -534,6 +534,13 @@ ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
ExecutionStatus ExecutionStatus::fromText(const std::string & data)
{
ExecutionStatus status;
status.deserializeText(data);
return status;
}
ParsingException::ParsingException() = default;
ParsingException::ParsingException(const std::string & msg, int code)
: Exception(msg, code)

View File

@ -184,6 +184,8 @@ struct ExecutionStatus
static ExecutionStatus fromCurrentException(const std::string & start_of_message = "");
static ExecutionStatus fromText(const std::string & data);
std::string serializeText() const;
void deserializeText(const std::string & data);

View File

@ -0,0 +1,114 @@
#pragma once
#include <cmath>
#include <limits>
namespace DB
{
/** https://en.wikipedia.org/wiki/Exponential_smoothing
*
* Exponentially smoothed average over time is a weighted average with weight proportional to the negative exponent of the time passed.
* For example, the last value is taken with weight 1/2, the value one second ago with weight 1/4, two seconds ago - 1/8, etc.
* It can be understood as an average over sliding window, but with different kernel.
*
* As an advantage, it is easy to update. Instead of collecting values and calculating a series of x1 / 2 + x2 / 4 + x3 / 8...
* just calculate x_old / 2 + x_new / 2.
*
* It is often used for resource usage metrics. For example, "load average" in Linux is an exponentially smoothed moving average.
* We can use exponentially smoothed counters in query scheduler.
*/
struct ExponentiallySmoothedAverage
{
/// The sum. It contains the last value and all previous values scaled according to the difference between their time and the reference time.
/// Older values are summed with exponentially smaller coefficients.
/// To obtain the average, you have to divide this value by the sum of all coefficients (see 'sumWeights').
double value = 0;
/// The point of reference. You can translate the value to a different point of reference (see 'remap').
/// You can imagine that the value exponentially decays over time.
/// But it is also meaningful to treat the whole counters as constants over time but in another non-linear coordinate system,
/// that inflates over time, while the counter itself does not change
/// (it continues to be the same physical quantity, but only changes its representation in the "usual" coordinate system).
/// Recap: the whole counter is one dimensional and it can be represented as a curve formed by two dependent coordinates in 2d plane,
/// the space can be represented by (value, time) coordinates, and the curves will be exponentially decaying over time,
/// alternatively the space can be represented by (exponentially_adjusted_value, time) and then the curves will be constant over time.
/// Another useful analogy is the exponential representation of a number: x = a * exp(b) = a * e (where e = exp(b))
/// a number x is represented by a curve in 2d plane that can be parametrized by coordinates (a, b) or (a, e).
double time = 0;
ExponentiallySmoothedAverage()
{
}
ExponentiallySmoothedAverage(double current_value, double current_time)
: value(current_value), time(current_time)
{
}
/// How much value decays after time_passed.
static double scale(double time_passed, double half_decay_time)
{
return exp2(-time_passed / half_decay_time);
}
/// Sum of weights of all values. Divide by it to get the average.
static double sumWeights(double half_decay_time)
{
double k = scale(1.0, half_decay_time);
return 1 / (1 - k);
}
/// Obtain the same counter in another point of reference.
ExponentiallySmoothedAverage remap(double current_time, double half_decay_time) const
{
return ExponentiallySmoothedAverage(value * scale(current_time - time, half_decay_time), current_time);
}
/// Merge two counters. It is done by moving to the same point of reference and summing the values.
static ExponentiallySmoothedAverage merge(const ExponentiallySmoothedAverage & a, const ExponentiallySmoothedAverage & b, double half_decay_time)
{
if (a.time > b.time)
return ExponentiallySmoothedAverage(a.value + b.remap(a.time, half_decay_time).value, a.time);
if (a.time < b.time)
return ExponentiallySmoothedAverage(b.value + a.remap(b.time, half_decay_time).value, b.time);
return ExponentiallySmoothedAverage(a.value + b.value, a.time);
}
void merge(const ExponentiallySmoothedAverage & other, double half_decay_time)
{
*this = merge(*this, other, half_decay_time);
}
void add(double new_value, double current_time, double half_decay_time)
{
merge(ExponentiallySmoothedAverage(new_value, current_time), half_decay_time);
}
/// Calculate the average from the sum.
double get(double half_decay_time) const
{
return value / sumWeights(half_decay_time);
}
double get(double current_time, double half_decay_time) const
{
return remap(current_time, half_decay_time).get(half_decay_time);
}
/// Compare two counters (by moving to the same point of reference and comparing sums).
/// You can store the counters in container and sort it without changing the stored values over time.
bool less(const ExponentiallySmoothedAverage & other, double half_decay_time) const
{
return remap(other.time, half_decay_time).value < other.value;
}
};
}
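Since the struct above is self-contained, a short usage sketch may help; it is not part of the patch, and the surrounding main() and the sample values are illustrative only (it assumes the header shown above is included).
#include <iostream>

int main()
{
    /// Three samples taken at t = 0, 1 and 2 seconds, half-decay time of 1 second.
    DB::ExponentiallySmoothedAverage avg;
    double half_decay_time = 1.0;
    avg.add(10.0, /* current_time = */ 0.0, half_decay_time);
    avg.add(20.0, /* current_time = */ 1.0, half_decay_time);
    avg.add(40.0, /* current_time = */ 2.0, half_decay_time);
    /// Recent samples dominate: the result (26.25) is pulled towards 40, while the plain mean is about 23.3.
    std::cout << avg.get(/* current_time = */ 2.0, half_decay_time) << '\n';
}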

View File

@ -111,7 +111,7 @@ void FileChecker::save() const
std::unique_ptr<WriteBuffer> out = disk->writeFile(tmp_files_info_path);
/// So complex JSON structure - for compatibility with the old format.
writeCString("{\"yandex\":{", *out);
writeCString("{\"clickhouse\":{", *out);
auto settings = FormatSettings();
for (auto it = map.begin(); it != map.end(); ++it)
@ -153,7 +153,7 @@ void FileChecker::load()
}
JSON json(out.str());
JSON files = json["yandex"];
JSON files = json.has("clickhouse") ? json["clickhouse"] : json["yandex"];
for (const JSON file : files) // NOLINT
map[unescapeForFileName(file.getName())] = file.getValue()["size"].toUInt();
}
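For reference, the sizes file produced by save() above looks roughly like the following (the file name and size are illustrative; names are escaped with escapeForFileName), and load() accepts the same layout under the legacy "yandex" key:
{"clickhouse":{"data%2Ebin":{"size":"12345"}}}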

View File

@ -1,6 +1,5 @@
#include <Common/SettingsChanges.h>
namespace DB
{
namespace

View File

@ -5,6 +5,9 @@
namespace DB
{
class IColumn;
struct SettingChange
{
String name;

View File

@ -128,6 +128,9 @@ void ThreadFuzzer::initConfiguration()
bool ThreadFuzzer::isEffective() const
{
if (!isStarted())
return false;
#if THREAD_FUZZER_WRAP_PTHREAD
# define CHECK_WRAPPER_PARAMS(RET, NAME, ...) \
if (NAME##_before_yield_probability.load(std::memory_order_relaxed)) \
@ -159,6 +162,20 @@ bool ThreadFuzzer::isEffective() const
|| (sleep_probability > 0 && sleep_time_us > 0));
}
void ThreadFuzzer::stop()
{
started.store(false, std::memory_order_relaxed);
}
void ThreadFuzzer::start()
{
started.store(true, std::memory_order_relaxed);
}
bool ThreadFuzzer::isStarted()
{
return started.load(std::memory_order_relaxed);
}
static void injection(
double yield_probability,
@ -166,6 +183,10 @@ static void injection(
double sleep_probability,
double sleep_time_us [[maybe_unused]])
{
DENY_ALLOCATIONS_IN_SCOPE;
if (!ThreadFuzzer::isStarted())
return;
if (yield_probability > 0
&& std::bernoulli_distribution(yield_probability)(thread_local_rng))
{

View File

@ -1,6 +1,6 @@
#pragma once
#include <cstdint>
#include <atomic>
namespace DB
{
@ -54,6 +54,10 @@ public:
bool isEffective() const;
static void stop();
static void start();
static bool isStarted();
private:
uint64_t cpu_time_period_us = 0;
double yield_probability = 0;
@ -61,6 +65,8 @@ private:
double sleep_probability = 0;
double sleep_time_us = 0;
inline static std::atomic<bool> started{true};
ThreadFuzzer();
void initConfiguration();
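A hedged sketch of how the new switch is meant to be used by a caller, for example a query handler that needs to pause fault injection (the actual wiring is not shown in this diff):
DB::ThreadFuzzer::stop();                  /// injection() becomes a no-op from now on
assert(!DB::ThreadFuzzer::isStarted());
DB::ThreadFuzzer::start();                 /// fault injection is active again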

View File

@ -10,6 +10,7 @@
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <unistd.h>
@ -277,6 +278,8 @@ public:
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
UInt32 getSessionUptime() const { return session_uptime.elapsedSeconds(); }
private:
friend class EphemeralNodeHolder;
@ -307,6 +310,8 @@ private:
Poco::Logger * log = nullptr;
std::shared_ptr<DB::ZooKeeperLog> zk_log;
AtomicStopwatch session_uptime;
};

View File

@ -5,6 +5,8 @@
#include "Utils.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace DB
{
@ -18,7 +20,7 @@ namespace postgres
{
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix,
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
@ -26,45 +28,19 @@ PoolWithFailover::PoolWithFailover(
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
auto db = config.getString(config_prefix + ".db", "");
auto host = config.getString(config_prefix + ".host", "");
auto port = config.getUInt(config_prefix + ".port", 0);
auto user = config.getString(config_prefix + ".user", "");
auto password = config.getString(config_prefix + ".password", "");
if (config.has(config_prefix + ".replica"))
for (const auto & [priority, configurations] : configurations_by_priority)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
for (const auto & replica_configuration : configurations)
{
if (config_key.starts_with("replica"))
{
std::string replica_name = config_prefix + "." + config_key;
size_t priority = config.getInt(replica_name + ".priority", 0);
auto replica_host = config.getString(replica_name + ".host", host);
auto replica_port = config.getUInt(replica_name + ".port", port);
auto replica_user = config.getString(replica_name + ".user", user);
auto replica_password = config.getString(replica_name + ".password", password);
auto connection_string = formatConnectionString(db, replica_host, replica_port, replica_user, replica_password).first;
replicas_with_priority[priority].emplace_back(connection_string, pool_size);
}
auto connection_string = formatConnectionString(replica_configuration.database,
replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password).first;
replicas_with_priority[priority].emplace_back(connection_string, pool_size, getConnectionForLog(replica_configuration.host, replica_configuration.port));
}
}
else
{
auto connection_string = formatConnectionString(db, host, port, user, password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size);
}
}
PoolWithFailover::PoolWithFailover(
const std::string & database,
const RemoteDescription & addresses,
const std::string & user, const std::string & password,
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
@ -73,11 +49,11 @@ PoolWithFailover::PoolWithFailover(
pool_size, pool_wait_timeout, max_tries_);
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
for (const auto & [host, port] : configuration.addresses)
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
auto connection_string = formatConnectionString(database, host, port, user, password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size);
auto connection_string = formatConnectionString(configuration.database, host, port, configuration.username, configuration.password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size, getConnectionForLog(host, port));
}
}
@ -85,6 +61,7 @@ ConnectionHolderPtr PoolWithFailover::get()
{
std::lock_guard lock(mutex);
DB::WriteBufferFromOwnString error_message;
for (size_t try_idx = 0; try_idx < max_tries; ++try_idx)
{
for (auto & priority : replicas_with_priority)
@ -115,6 +92,7 @@ ConnectionHolderPtr PoolWithFailover::get()
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.name_for_log << "` failed: " << pqxx_error.what() << "\n";
replica.pool->returnObject(std::move(connection));
continue;
@ -136,7 +114,7 @@ ConnectionHolderPtr PoolWithFailover::get()
}
}
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, error_message.str());
}
}

View File

@ -11,6 +11,7 @@
#include <mutex>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace postgres
@ -27,17 +28,13 @@ public:
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
PoolWithFailover(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
const std::string & database,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
@ -51,9 +48,10 @@ private:
{
String connection_string;
PoolPtr pool;
String name_for_log;
PoolHolder(const String & connection_string_, size_t pool_size)
: connection_string(connection_string_), pool(std::make_shared<Pool>(pool_size)) {}
PoolHolder(const String & connection_string_, size_t pool_size, const String & name_for_log_)
: connection_string(connection_string_), pool(std::make_shared<Pool>(pool_size)), name_for_log(name_for_log_) {}
};
/// Highest priority is 0, the bigger the number in map, the less the priority

View File

@ -3,6 +3,7 @@
#if USE_LIBPQXX
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
namespace postgres
{
@ -19,6 +20,11 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
return std::make_pair(out.str(), host + ':' + DB::toString(port));
}
String getConnectionForLog(const String & host, UInt16 port)
{
return host + ":" + DB::toString(port);
}
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name)
{
/// Logger for StorageMaterializedPostgreSQL - both db and table names.

View File

@ -22,6 +22,8 @@ namespace postgres
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
String getConnectionForLog(const String & host, UInt16 port);
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name);
}

View File

@ -2,11 +2,20 @@
#include <string>
#include <tuple>
#include <optional>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <fmt/format.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
//TODO replace with StorageID
struct QualifiedTableName
{
@ -30,6 +39,46 @@ struct QualifiedTableName
hash_state.update(table.data(), table.size());
return hash_state.get64();
}
/// NOTE: It's different from compound identifier parsing and does not support escaping and dots in name.
/// Usually it's better to use ParserIdentifier instead,
/// but we parse DDL dictionary name (and similar things) this way for historical reasons.
static std::optional<QualifiedTableName> tryParseFromString(const String & maybe_qualified_name)
{
if (maybe_qualified_name.empty())
return {};
/// Do not allow dot at the beginning and at the end
auto pos = maybe_qualified_name.find('.');
if (pos == 0 || pos == (maybe_qualified_name.size() - 1))
return {};
QualifiedTableName name;
if (pos == std::string::npos)
{
name.table = std::move(maybe_qualified_name);
}
else if (maybe_qualified_name.find('.', pos + 1) != std::string::npos)
{
/// Do not allow multiple dots
return {};
}
else
{
name.database = maybe_qualified_name.substr(0, pos);
name.table = maybe_qualified_name.substr(pos + 1);
}
return name;
}
static QualifiedTableName parseFromString(const String & maybe_qualified_name)
{
auto name = tryParseFromString(maybe_qualified_name);
if (!name)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Invalid qualified name: {}", maybe_qualified_name);
return *name;
}
};
}
@ -47,5 +96,23 @@ template <> struct hash<DB::QualifiedTableName>
return qualified_table.hash();
}
};
}
namespace fmt
{
template <>
struct formatter<DB::QualifiedTableName>
{
constexpr auto parse(format_parse_context & ctx)
{
return ctx.begin();
}
template <typename FormatContext>
auto format(const DB::QualifiedTableName & name, FormatContext & ctx)
{
return format_to(ctx.out(), "{}.{}", DB::backQuoteIfNeed(name.database), DB::backQuoteIfNeed(name.table));
}
};
}
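The parsing rules above are easiest to see on concrete inputs; the following sketch is not part of the patch and only illustrates the expected behaviour (assuming the header shown above is included):
#include <cassert>

int main()
{
    auto plain = DB::QualifiedTableName::tryParseFromString("hits");
    assert(plain && plain->database.empty() && plain->table == "hits");

    auto qualified = DB::QualifiedTableName::tryParseFromString("default.hits");
    assert(qualified && qualified->database == "default" && qualified->table == "hits");

    /// Leading/trailing dots and more than one dot are rejected (escaping is not supported).
    assert(!DB::QualifiedTableName::tryParseFromString(".hits"));
    assert(!DB::QualifiedTableName::tryParseFromString("db.schema.hits"));

    /// parseFromString() throws a SYNTAX_ERROR exception for the same invalid inputs.
}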

View File

@ -386,6 +386,7 @@ class IColumn;
M(Bool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.", 0) \
M(Bool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.", 0) \
M(Bool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only by 'mysql', 'postgresql' and 'odbc' table functions.", 0) \
M(Bool, external_table_strict_query, false, "If it is set to true, transforming expression to local filter is forbidden for queries to external tables.", 0) \
\
M(Bool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.", 0) \
M(UInt64, max_hyperscan_regexp_length, 0, "Max length of regexp than can be used in hyperscan multi-match functions. Zero means unlimited.", 0) \
@ -455,7 +456,7 @@ class IColumn;
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, enable_global_with_statement, true, "Propagate WITH statements to UNION queries and all subqueries", 0) \
M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
M(Bool, optimize_fuse_sum_count_avg, false, "Fuse aggregate functions sum(), avg(), count() with identical arguments into one sumCount() call, if the query has at least two different functions", 0) \
M(Bool, optimize_syntax_fuse_functions, false, "Fuse aggregate functions (`sum, avg, count` with identical arguments into one `sumCount`, quantile-family functions with the same argument into `quantiles*(...)[...]`)", 0) \
M(Bool, flatten_nested, true, "If true, columns of type Nested will be flatten to separate array columns instead of one array of tuples", 0) \
M(Bool, asterisk_include_materialized_columns, false, "Include MATERIALIZED columns for wildcard query", 0) \
M(Bool, asterisk_include_alias_columns, false, "Include ALIAS columns for wildcard query", 0) \
@ -507,8 +508,16 @@ class IColumn;
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
\
M(Int64, remote_disk_read_backoff_threashold, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_disk_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 100000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
M(Int64, remote_fs_read_backoff_threshold, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
@ -525,6 +534,7 @@ class IColumn;
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "Obsolete setting, does nothing.", 0) \
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing.", 0) \
M(UInt64, replication_alter_columns_timeout, 60, "Obsolete setting, does nothing.", 0) \
M(Bool, optimize_fuse_sum_count_avg, false, "Obsolete, use optimize_syntax_fuse_functions", 0) \
/** The section above is for obsolete settings. Do not add anything there. */
@ -577,6 +587,7 @@ class IColumn;
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(Bool, output_format_tsv_crlf_end_of_line, false, "If it is set true, end of line in TSV format will be \\r\\n instead of \\n.", 0) \
M(String, output_format_csv_null_representation, "\\N", "Custom NULL representation in CSV format", 0) \
M(String, output_format_tsv_null_representation, "\\N", "Custom NULL representation in TSV format", 0) \
M(Bool, output_format_decimal_trailing_zeros, false, "Output trailing zeros when printing Decimal values. E.g. 1.230000 instead of 1.23.", 0) \
\
@ -605,14 +616,6 @@ class IColumn;
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \

View File

@ -327,7 +327,7 @@ void SerializationNullable::serializeTextCSV(const IColumn & column, size_t row_
const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
if (col.isNullAt(row_num))
writeCString("\\N", ostr);
writeString(settings.csv.null_representation, ostr);
else
nested->serializeTextCSV(col.getNestedColumn(), row_num, ostr, settings);
}

View File

@ -0,0 +1,103 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Poco/String.h>
namespace DB
{
void DDLDependencyVisitor::visit(const ASTPtr & ast, Data & data)
{
/// Looking for functions in column default expressions and dictionary source definition
if (const auto * function = ast->as<ASTFunction>())
visit(*function, data);
else if (const auto * dict_source = ast->as<ASTFunctionWithKeyValueArguments>())
visit(*dict_source, data);
}
bool DDLDependencyVisitor::needChildVisit(const ASTPtr & node, const ASTPtr & /*child*/)
{
return !node->as<ASTStorage>();
}
void DDLDependencyVisitor::visit(const ASTFunction & function, Data & data)
{
if (function.name == "joinGet" ||
function.name == "dictHas" ||
function.name == "dictIsIn" ||
function.name.starts_with("dictGet"))
{
extractTableNameFromArgument(function, data, 0);
}
else if (Poco::toLower(function.name) == "in")
{
extractTableNameFromArgument(function, data, 1);
}
}
void DDLDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data)
{
if (dict_source.name != "clickhouse")
return;
if (!dict_source.elements)
return;
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
return;
if (info->table_name.database.empty())
info->table_name.database = data.default_database;
data.dependencies.emplace(std::move(info->table_name));
}
void DDLDependencyVisitor::extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx)
{
/// Just ignore incorrect arguments, proper exception will be thrown later
if (!function.arguments || function.arguments->children.size() <= arg_idx)
return;
QualifiedTableName qualified_name;
const auto * arg = function.arguments->as<ASTExpressionList>()->children[arg_idx].get();
if (const auto * literal = arg->as<ASTLiteral>())
{
if (literal->value.getType() != Field::Types::String)
return;
auto maybe_qualified_name = QualifiedTableName::tryParseFromString(literal->value.get<String>());
/// Just return if the name is invalid
if (!maybe_qualified_name)
return;
qualified_name = std::move(*maybe_qualified_name);
}
else if (const auto * identifier = arg->as<ASTIdentifier>())
{
auto table_identifier = identifier->createTable();
/// Just return if the table identifier is invalid
if (!table_identifier)
return;
qualified_name.database = table_identifier->getDatabaseName();
qualified_name.table = table_identifier->shortName();
}
else
{
assert(false);
return;
}
if (qualified_name.database.empty())
qualified_name.database = data.default_database;
data.dependencies.emplace(std::move(qualified_name));
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Core/QualifiedTableName.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTFunction;
class ASTFunctionWithKeyValueArguments;
/// Visits ASTCreateQuery and extracts names of table (or dictionary) dependencies
/// from column default expressions (joinGet, dictGet, etc)
/// or dictionary source (for dictionaries from local ClickHouse table).
/// Does not validate AST, works in a best-effort way.
class DDLDependencyVisitor
{
public:
struct Data
{
using TableNamesSet = std::set<QualifiedTableName>;
String default_database;
TableNamesSet dependencies;
ContextPtr global_context;
ASTPtr create_query;
};
using Visitor = ConstInDepthNodeVisitor<DDLDependencyVisitor, true>;
static void visit(const ASTPtr & ast, Data & data);
static bool needChildVisit(const ASTPtr & node, const ASTPtr & child);
private:
static void visit(const ASTFunction & function, Data & data);
static void visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data);
static void extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx);
};
using TableLoadingDependenciesVisitor = DDLDependencyVisitor::Visitor;
}
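To make the call pattern concrete, here is a hedged sketch of driving the visitor over an already-parsed CREATE query, mirroring how DatabaseOrdinary uses it further below; `ast` and `context` are assumed to be provided by the caller.
/// Sketch only: `ast` is a parsed ASTCreateQuery, `context` is the global context.
DB::DDLDependencyVisitor::Data data;
data.default_database = "default";   /// used to qualify unqualified table names
data.create_query = ast;
data.global_context = context;

DB::TableLoadingDependenciesVisitor visitor{data};
visitor.visit(ast);

/// For `CREATE TABLE t (x UInt64 DEFAULT dictGet('db.dict', 'attr', x)) ...`
/// data.dependencies now contains the single entry db.dict.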

View File

@ -416,40 +416,49 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, bool force_restore, bool /*force_attach*/)
{
if (!force_restore)
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
if (!fs::is_symlink(table_path))
{
if (!fs::is_symlink(table_path))
{
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::remove(table_path);
}
}
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (has_force_restore_data_flag)
{
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::create_directories(path_to_table_symlinks);
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
fs::remove(table_path);
}
}
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
beforeLoadingMetadata(local_context, force_restore, force_attach);
DatabaseOrdinary::loadStoredObjects(local_context, force_restore, force_attach, skip_startup_tables);
}
void DatabaseAtomic::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseOrdinary::startupTables(thread_pool, force_restore, force_attach);
if (!force_restore)
return;
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
}
fs::create_directories(path_to_table_symlinks);
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
{
try

View File

@ -47,7 +47,11 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void beforeLoadingMetadata(ContextMutablePtr context, bool force_restore, bool force_attach) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
/// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup) override;

View File

@ -13,6 +13,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/formatAST.h>
#include <Common/Macros.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <filesystem>
#if !defined(ARCADIA_BUILD)
@ -38,6 +39,7 @@
#include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE
#include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#include <Storages/StoragePostgreSQL.h>
#endif
#if USE_SQLITE
@ -141,40 +143,66 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 4)
throw Exception(
engine_name + " Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.",
ErrorCodes::BAD_ARGUMENTS);
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQLConfiguration configuration;
ASTs & arguments = engine->arguments->children;
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
const auto & mysql_database_name = safeGetLiteralValue<String>(arguments[1], engine_name);
const auto & mysql_user_name = safeGetLiteralValue<String>(arguments[2], engine_name);
const auto & mysql_user_password = safeGetLiteralValue<String>(arguments[3], engine_name);
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
}
else
{
if (arguments.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
try
{
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
/// Split into replicas if needed.
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
auto mysql_pool = mysqlxx::PoolWithFailover(mysql_database_name, addresses, mysql_user_name, mysql_user_password);
auto mysql_pool = mysqlxx::PoolWithFailover(configuration.database, configuration.addresses, configuration.username, configuration.password);
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
return std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_database_settings), std::move(mysql_pool));
context, database_name, metadata_path, engine_define, configuration.database, std::move(mysql_database_settings), std::move(mysql_pool));
}
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 3306);
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);
auto mysql_pool = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port);
auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>();
@ -183,12 +211,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, mysql_database_name,
std::move(mysql_pool), std::move(client), std::move(materialize_mode_settings));
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
else
return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, mysql_database_name,
std::move(mysql_pool), std::move(client), std::move(materialize_mode_settings));
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
}
catch (...)
{
@ -242,77 +270,109 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
else if (engine_name == "PostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} Database require `host:port`, `database_name`, `username`, `password` [, `schema` = "", `use_table_cache` = 0].",
engine_name);
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
auto use_table_cache = false;
StoragePostgreSQLConfiguration configuration;
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
const auto & postgres_database_name = safeGetLiteralValue<String>(engine_args[1], engine_name);
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
String schema;
if (engine->arguments->children.size() >= 5)
schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "use_table_cache")
use_table_cache = true;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected key-value argument."
"Got: {}, but expected one of:"
"host, port, username, password, database, schema, use_table_cache.", arg_name);
}
}
else
{
if (engine_args.size() < 4 || engine_args.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL Database require `host:port`, `database_name`, `username`, `password`"
"[, `schema` = "", `use_table_cache` = 0");
auto use_table_cache = 0;
if (engine->arguments->children.size() >= 6)
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
if (engine_args.size() >= 5)
configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
}
if (engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
/// Split into replicas if needed.
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
/// no connection is made here
auto connection_pool = std::make_shared<postgres::PoolWithFailover>(
postgres_database_name,
addresses,
username, password,
auto pool = std::make_shared<postgres::PoolWithFailover>(configuration,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, schema, connection_pool, use_table_cache);
context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache);
}
else if (engine_name == "MaterializedPostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 4)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} Database require `host:port`, `database_name`, `username`, `password`.",
engine_name);
}
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
StoragePostgreSQLConfiguration configuration;
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
const auto & postgres_database_name = safeGetLiteralValue<String>(engine_args[1], engine_name);
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database requires only `host`, `port`, `database_name`, `username`, `password`.");
}
else
{
if (engine_args.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`.");
auto parsed_host_port = parseAddress(host_port, 5432);
auto connection_info = postgres::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
}
auto connection_info = postgres::formatConnectionString(
configuration.database, configuration.host, configuration.port, configuration.username, configuration.password);
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, create.attach,
database_name, postgres_database_name, connection_info,
database_name, configuration.database, connection_info,
std::move(postgresql_replica_settings));
}

View File

@ -36,7 +36,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
void DatabaseLazy::loadStoredObjects(
ContextMutablePtr local_context, bool /* has_force_restore_data_flag */, bool /*force_attach*/, bool /* skip_startup_tables */)
ContextMutablePtr local_context, bool /* force_restore */, bool /*force_attach*/, bool /* skip_startup_tables */)
{
iterateMetadataFiles(local_context, [this](const String & file_name)
{

View File

@ -26,7 +26,7 @@ public:
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void createTable(
ContextPtr context,

View File

@ -42,12 +42,17 @@ void DatabaseMemory::dropTable(
try
{
table->drop();
fs::path table_data_dir{getTableDataPath(table_name)};
if (fs::exists(table_data_dir))
fs::remove_all(table_data_dir);
if (table->storesDataOnDisk())
{
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
fs::path table_data_dir{getTableDataPath(table_name)};
if (fs::exists(table_data_dir))
fs::remove_all(table_data_dir);
}
}
catch (...)
{
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
attachTableUnlocked(table_name, table, lock);
throw;
}

View File

@ -46,7 +46,7 @@ std::pair<String, StoragePtr> createTableFromAST(
const String & database_name,
const String & table_data_path_relative,
ContextMutablePtr context,
bool has_force_restore_data_flag)
bool force_restore)
{
ast_create_query.attach = true;
ast_create_query.database = database_name;
@ -88,7 +88,7 @@ std::pair<String, StoragePtr> createTableFromAST(
context->getGlobalContext(),
columns,
constraints,
has_force_restore_data_flag)
force_restore)
};
}

View File

@ -16,7 +16,7 @@ std::pair<String, StoragePtr> createTableFromAST(
const String & database_name,
const String & table_data_path_relative,
ContextMutablePtr context,
bool has_force_restore_data_flag);
bool force_restore);
/** Get the string with the table definition based on the CREATE query.
* It is an ATTACH query that you can execute to create a table from the correspondent database.

View File

@ -4,6 +4,8 @@
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabasesCommon.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/TablesLoader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -27,8 +29,6 @@ namespace fs = std::filesystem;
namespace DB
{
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
namespace
@ -39,7 +39,7 @@ namespace
DatabaseOrdinary & database,
const String & database_name,
const String & metadata_path,
bool has_force_restore_data_flag)
bool force_restore)
{
try
{
@ -48,7 +48,7 @@ namespace
database_name,
database.getTableDataPath(query),
context,
has_force_restore_data_flag);
force_restore);
database.attachTable(table_name, table, database.getTableDataPath(query));
}
@ -60,15 +60,6 @@ namespace
throw;
}
}
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, "{}%", processed * 100.0 / total);
watch.restart();
}
}
}
@ -84,20 +75,88 @@ DatabaseOrdinary::DatabaseOrdinary(
}
void DatabaseOrdinary::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/, bool skip_startup_tables)
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
* which corresponds neither to the order of table creation nor to the order of their location on disk.
*/
using FileNames = std::map<std::string, ASTPtr>;
std::mutex file_names_mutex;
FileNames file_names;
size_t total_dictionaries = 0;
ParsedTablesMetadata metadata;
loadTablesMetadata(local_context, metadata);
auto process_metadata = [&file_names, &total_dictionaries, &file_names_mutex, this](
const String & file_name)
size_t total_tables = metadata.parsed_tables.size() - metadata.total_dictionaries;
AtomicStopwatch watch;
std::atomic<size_t> dictionaries_processed{0};
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached
/// (for example, dictionaries can be used in the default expressions for some tables).
/// On the other hand we can attach any dictionary (even sourced from ClickHouse table)
/// without having any tables attached. It is so because attaching of a dictionary means
/// loading of its config only, it doesn't involve loading the dictionary itself.
/// Attach dictionaries.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++dictionaries_processed, metadata.total_dictionaries, watch);
});
}
}
pool.wait();
/// Attach tables.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
if (!skip_startup_tables)
{
/// After all tables were basically initialized, start them up.
startupTables(pool, force_restore, force_attach);
}
}
void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata)
{
size_t prev_tables_count = metadata.parsed_tables.size();
size_t prev_total_dictionaries = metadata.total_dictionaries;
auto process_metadata = [&metadata, this](const String & file_name)
{
fs::path path(getMetadataPath());
fs::path file_path(file_name);
@ -122,9 +181,29 @@ void DatabaseOrdinary::loadStoredObjects(
return;
}
std::lock_guard lock{file_names_mutex};
file_names[file_name] = ast;
total_dictionaries += create_query->is_dictionary;
TableLoadingDependenciesVisitor::Data data;
data.default_database = metadata.default_database;
data.create_query = ast;
data.global_context = getContext();
TableLoadingDependenciesVisitor visitor{data};
visitor.visit(ast);
QualifiedTableName qualified_name{database_name, create_query->table};
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
if (data.dependencies.empty())
{
metadata.independent_database_objects.emplace_back(std::move(qualified_name));
}
else
{
for (const auto & dependency : data.dependencies)
{
metadata.dependencies_info[dependency].dependent_database_objects.push_back(qualified_name);
++metadata.dependencies_info[qualified_name].dependencies_count;
}
}
metadata.total_dictionaries += create_query->is_dictionary;
}
}
catch (Exception & e)
@ -136,86 +215,29 @@ void DatabaseOrdinary::loadStoredObjects(
iterateMetadataFiles(local_context, process_metadata);
size_t total_tables = file_names.size() - total_dictionaries;
size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count;
size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries;
size_t tables_in_database = objects_in_database - dictionaries_in_database;
LOG_INFO(log, "Total {} tables and {} dictionaries.", total_tables, total_dictionaries);
AtomicStopwatch watch;
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached
/// (for example, dictionaries can be used in the default expressions for some tables).
/// On the other hand we can attach any dictionary (even sourced from ClickHouse table)
/// without having any tables attached. It is so because attaching of a dictionary means
/// loading of its config only, it doesn't involve loading the dictionary itself.
/// Attach dictionaries.
for (const auto & name_with_query : file_names)
{
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(
local_context,
create_query,
*this,
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
/// Attach tables.
for (const auto & name_with_query : file_names)
{
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(
local_context,
create_query,
*this,
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
if (!skip_startup_tables)
{
/// After all tables were basically initialized, start them up.
startupTablesImpl(pool);
}
LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
database_name, tables_in_database, dictionaries_in_database);
}
void DatabaseOrdinary::startupTables()
void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore)
{
ThreadPool pool;
startupTablesImpl(pool);
assert(name.database == database_name);
const auto & create_query = ast->as<const ASTCreateQuery &>();
tryAttachTable(
local_context,
create_query,
*this,
name.database,
file_path,
force_restore);
}
void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_restore*/, bool /*force_attach*/)
{
LOG_INFO(log, "Starting up tables.");
@ -240,6 +262,7 @@ void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
}
catch (...)
{
/// We have to wait for jobs to finish here, because the job function holds references to variables on the stack of the current thread.
thread_pool.wait();
throw;
}

View File

@ -21,9 +21,15 @@ public:
String getEngineName() const override { return "Ordinary"; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void startupTables() override;
bool supportsLoadingInTopologicalOrder() const override { return true; }
void loadTablesMetadata(ContextPtr context, ParsedTablesMetadata & metadata) override;
void loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void alterTable(
ContextPtr context,
@ -37,8 +43,6 @@ protected:
const String & table_metadata_path,
const String & statement,
ContextPtr query_context);
void startupTablesImpl(ThreadPool & thread_pool);
};
}

View File

@ -305,13 +305,21 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr /*context*/, bool /*force_restore*/, bool force_attach)
{
tryConnectToZooKeeperAndInitDatabase(force_attach);
}
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
beforeLoadingMetadata(local_context, force_restore, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, force_restore, force_attach, skip_startup_tables);
}
void DatabaseReplicated::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();
}

View File

@ -57,7 +57,12 @@ public:
void drop(ContextPtr /*context*/) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void beforeLoadingMetadata(ContextMutablePtr context, bool force_restore, bool force_attach) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void shutdown() override;
friend struct DatabaseReplicatedTask;

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Core/UUID.h>
#include <ctime>
@ -27,11 +28,14 @@ class ASTCreateQuery;
class AlterCommands;
class SettingsChanges;
using DictionariesWithID = std::vector<std::pair<String, UUID>>;
struct ParsedTablesMetadata;
struct QualifiedTableName;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int LOGICAL_ERROR;
}
class IDatabaseTablesIterator
@ -127,13 +131,32 @@ public:
/// You can call only once, right after the object is created.
virtual void loadStoredObjects(
ContextMutablePtr /*context*/,
bool /*has_force_restore_data_flag*/,
bool /*force_restore*/,
bool /*force_attach*/ = false,
bool /* skip_startup_tables */ = false)
{
}
virtual void startupTables() {}
virtual bool supportsLoadingInTopologicalOrder() const { return false; }
virtual void beforeLoadingMetadata(
ContextMutablePtr /*context*/,
bool /*force_restore*/,
bool /*force_attach*/)
{
}
virtual void loadTablesMetadata(ContextPtr /*local_context*/, ParsedTablesMetadata & /*metadata*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
virtual void loadTableFromMetadata(ContextMutablePtr /*local_context*/, const String & /*file_path*/, const QualifiedTableName & /*name*/, const ASTPtr & /*ast*/, bool /*force_restore*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
virtual void startupTables(ThreadPool & /*thread_pool*/, bool /*force_restore*/, bool /*force_attach*/) {}
/// Check the existence of the table.
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;

View File

@ -94,10 +94,10 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
}
template <typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(
ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach, skip_startup_tables);
Base::startupTables(thread_pool, force_restore, force_attach);
if (!force_attach)
materialize_thread.assertMySQLAvailable();

View File

@ -43,7 +43,7 @@ protected:
public:
String getEngineName() const override { return "MaterializedMySQL"; }
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;

View File

@ -108,11 +108,9 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
void DatabaseMaterializedPostgreSQL::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseMaterializedPostgreSQL::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
try
{
startSynchronization();

View File

@ -42,7 +42,7 @@ public:
String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;

View File

@ -39,16 +39,14 @@ DatabasePostgreSQL::DatabasePostgreSQL(
const String & metadata_path_,
const ASTStorage * database_engine_define_,
const String & dbname_,
const String & postgres_dbname_,
const String & postgres_schema_,
const StoragePostgreSQLConfiguration & configuration_,
postgres::PoolWithFailoverPtr pool_,
bool cache_tables_)
: IDatabase(dbname_)
, WithContext(context_->getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, postgres_dbname(postgres_dbname_)
, postgres_schema(postgres_schema_)
, configuration(configuration_)
, pool(std::move(pool_))
, cache_tables(cache_tables_)
{
@ -59,17 +57,17 @@ DatabasePostgreSQL::DatabasePostgreSQL(
String DatabasePostgreSQL::getTableNameForLogs(const String & table_name) const
{
if (postgres_schema.empty())
return fmt::format("{}.{}", postgres_dbname, table_name);
return fmt::format("{}.{}.{}", postgres_dbname, postgres_schema, table_name);
if (configuration.schema.empty())
return fmt::format("{}.{}", configuration.database, table_name);
return fmt::format("{}.{}.{}", configuration.database, configuration.schema, table_name);
}
String DatabasePostgreSQL::formatTableName(const String & table_name) const
{
if (postgres_schema.empty())
if (configuration.schema.empty())
return doubleQuoteString(table_name);
return fmt::format("{}.{}", doubleQuoteString(postgres_schema), doubleQuoteString(table_name));
return fmt::format("{}.{}", doubleQuoteString(configuration.schema), doubleQuoteString(table_name));
}
@ -78,7 +76,7 @@ bool DatabasePostgreSQL::empty() const
std::lock_guard<std::mutex> lock(mutex);
auto connection_holder = pool->get();
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
for (const auto & table_name : tables_list)
if (!detached_or_dropped.count(table_name))
@ -94,7 +92,7 @@ DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local
Tables tables;
auto connection_holder = pool->get();
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
for (const auto & table_name : table_names)
if (!detached_or_dropped.count(table_name))
@ -125,7 +123,7 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
"WHERE schemaname != 'pg_catalog' AND {} "
"AND tablename = '{}'",
formatTableName(table_name),
(postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema)),
(configuration.schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(configuration.schema)),
formatTableName(table_name)));
}
catch (pqxx::undefined_table const &)
@ -179,7 +177,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, postgres_schema);
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);
if (cache_tables)
cached_tables[table_name] = storage;
@ -306,7 +304,7 @@ void DatabasePostgreSQL::removeOutdatedTables()
{
std::lock_guard<std::mutex> lock{mutex};
auto connection_holder = pool->get();
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
if (cache_tables)
{

View File

@ -10,7 +10,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Parsers/ASTCreateQuery.h>
#include <Core/PostgreSQL/PoolWithFailover.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
{
@ -32,8 +32,7 @@ public:
const String & metadata_path_,
const ASTStorage * database_engine_define,
const String & dbname_,
const String & postgres_dbname_,
const String & postgres_schema_,
const StoragePostgreSQLConfiguration & configuration,
postgres::PoolWithFailoverPtr pool_,
bool cache_tables_);
@ -70,8 +69,7 @@ protected:
private:
String metadata_path;
ASTPtr database_engine_define;
String postgres_dbname;
String postgres_schema;
StoragePostgreSQLConfiguration configuration;
postgres::PoolWithFailoverPtr pool;
const bool cache_tables;

View File

@ -0,0 +1,255 @@
#include <Databases/TablesLoader.h>
#include <Databases/IDatabase.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <numeric>
namespace DB
{
namespace ErrorCodes
{
extern const int INFINITE_LOOP;
extern const int LOGICAL_ERROR;
}
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, "{}%", processed * 100.0 / total);
watch.restart();
}
}
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_, bool force_attach_)
: global_context(global_context_)
, databases(std::move(databases_))
, force_restore(force_restore_)
, force_attach(force_attach_)
{
metadata.default_database = global_context->getCurrentDatabase();
log = &Poco::Logger::get("TablesLoader");
}
void TablesLoader::loadTables()
{
bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");
/// Load all Lazy, MySQL, PostgreSQL, SQLite, etc. databases first.
for (auto & database : databases)
{
if (need_resolve_dependencies && database.second->supportsLoadingInTopologicalOrder())
databases_to_load.push_back(database.first);
else
database.second->loadStoredObjects(global_context, force_restore, force_attach, true);
}
if (databases_to_load.empty())
return;
/// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
for (auto & database_name : databases_to_load)
{
databases[database_name]->beforeLoadingMetadata(global_context, force_restore, force_attach);
databases[database_name]->loadTablesMetadata(global_context, metadata);
}
LOG_INFO(log, "Parsed metadata of {} tables in {} databases in {} sec",
metadata.parsed_tables.size(), databases_to_load.size(), stopwatch.elapsedSeconds());
stopwatch.restart();
logDependencyGraph();
/// Some tables were already loaded by databases via loadStoredObjects(...). Remove them from the graph if necessary.
removeUnresolvableDependencies();
loadTablesInTopologicalOrder(pool);
}
void TablesLoader::startupTables()
{
/// Start up tables after all tables are loaded. Background tasks (merges, mutations, etc.) may slow down data parts loading.
for (auto & database : databases)
database.second->startupTables(pool, force_restore, force_attach);
}
void TablesLoader::removeUnresolvableDependencies()
{
auto need_exclude_dependency = [this](const QualifiedTableName & dependency_name, const DependenciesInfo & info)
{
/// Table exists and will be loaded
if (metadata.parsed_tables.contains(dependency_name))
return false;
/// Table exists and it's already loaded
if (DatabaseCatalog::instance().isTableExist(StorageID(dependency_name.database, dependency_name.table), global_context))
return true;
/// It's an XML dictionary. It was loaded before tables and DDL dictionaries.
if (dependency_name.database == metadata.default_database &&
global_context->getExternalDictionariesLoader().has(dependency_name.table))
return true;
/// Some tables depend on table "dependency_name", but there is no such table in DatabaseCatalog and we don't have its metadata.
/// We will ignore it and try to load dependent tables without "dependency_name"
/// (but most likely dependent tables will fail to load).
LOG_WARNING(log, "Tables {} depend on {}, but seems like the it does not exist. Will ignore it and try to load existing tables",
fmt::join(info.dependent_database_objects, ", "), dependency_name);
if (info.dependencies_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not exist, but we have seen its AST and found {} dependencies."
"It's a bug", dependency_name, info.dependencies_count);
if (info.dependent_database_objects.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependencies and dependent tables as it expected to."
"It's a bug", dependency_name);
return true;
};
auto table_it = metadata.dependencies_info.begin();
while (table_it != metadata.dependencies_info.end())
{
auto & info = table_it->second;
if (need_exclude_dependency(table_it->first, info))
table_it = removeResolvedDependency(table_it, metadata.independent_database_objects);
else
++table_it;
}
}
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
{
/// Load independent tables in parallel.
/// Then remove loaded tables from dependency graph, find tables/dictionaries that do not have unresolved dependencies anymore,
/// move them to the list of independent tables and load.
/// Repeat while there are tables left to load.
/// If there are none, then either all objects are loaded or there is a cyclic dependency.
/// Complexity: O(V + E)
size_t level = 0;
do
{
assert(metadata.parsed_tables.size() == tables_processed + metadata.independent_database_objects.size() + getNumberOfTablesWithDependencies());
logDependencyGraph();
startLoadingIndependentTables(pool, level);
TableNames new_independent_database_objects;
for (const auto & table_name : metadata.independent_database_objects)
{
auto info_it = metadata.dependencies_info.find(table_name);
if (info_it == metadata.dependencies_info.end())
{
/// No tables depend on table_name and it was not even added to dependencies_info
continue;
}
removeResolvedDependency(info_it, new_independent_database_objects);
}
pool.wait();
metadata.independent_database_objects = std::move(new_independent_database_objects);
++level;
} while (!metadata.independent_database_objects.empty());
checkCyclicDependencies();
}
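The loop above is essentially Kahn's algorithm run level by level. A minimal standalone sketch of the same idea on plain standard containers (the names and counters here are hypothetical, not the TablesLoader types):
#include <iostream>
#include <map>
#include <string>
#include <vector>
/// Sketch: vertices with a zero dependency counter form the current level;
/// loading them decrements the counters of their dependents, which form the next level.
/// Assumes every vertex appears in dependencies_count (with 0 if it is independent).
void loadInLevels(std::map<std::string, std::vector<std::string>> dependents,
                  std::map<std::string, size_t> dependencies_count)
{
    std::vector<std::string> ready;
    for (const auto & [name, count] : dependencies_count)
        if (count == 0)
            ready.push_back(name);
    size_t level = 0;
    while (!ready.empty())
    {
        std::vector<std::string> next;
        for (const auto & name : ready)
        {
            std::cout << "level " << level << ": load " << name << '\n';
            for (const auto & dependent : dependents[name])
                if (--dependencies_count[dependent] == 0)
                    next.push_back(dependent);
        }
        ready = std::move(next);
        ++level;
    }
}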
DependenciesInfosIter TablesLoader::removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_database_objects)
{
auto & info = info_it->second;
if (info.dependencies_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} is in list of independent tables, but dependencies count is {}."
"It's a bug", info_it->first, info.dependencies_count);
if (info.dependent_database_objects.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependent tables. It's a bug", info_it->first);
/// Decrement number of dependencies for each dependent table
for (auto & dependent_table : info.dependent_database_objects)
{
auto & dependent_info = metadata.dependencies_info[dependent_table];
auto & dependencies_count = dependent_info.dependencies_count;
if (dependencies_count == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to decrement 0 dependencies counter for {}. It's a bug", dependent_table);
--dependencies_count;
if (dependencies_count == 0)
{
independent_database_objects.push_back(dependent_table);
if (dependent_info.dependent_database_objects.empty())
metadata.dependencies_info.erase(dependent_table);
}
}
return metadata.dependencies_info.erase(info_it);
}
void TablesLoader::startLoadingIndependentTables(ThreadPool & pool, size_t level)
{
size_t total_tables = metadata.parsed_tables.size();
LOG_INFO(log, "Loading {} tables with {} dependency level", metadata.independent_database_objects.size(), level);
for (const auto & table_name : metadata.independent_database_objects)
{
pool.scheduleOrThrowOnError([this, total_tables, &table_name]()
{
const auto & path_and_query = metadata.parsed_tables[table_name];
databases[table_name.database]->loadTableFromMetadata(global_context, path_and_query.path, table_name, path_and_query.ast, force_restore);
logAboutProgress(log, ++tables_processed, total_tables, stopwatch);
});
}
}
size_t TablesLoader::getNumberOfTablesWithDependencies() const
{
size_t number_of_tables_with_dependencies = 0;
for (const auto & info : metadata.dependencies_info)
if (info.second.dependencies_count)
++number_of_tables_with_dependencies;
return number_of_tables_with_dependencies;
}
void TablesLoader::checkCyclicDependencies() const
{
/// Loading is finished if all dependencies are resolved
if (metadata.dependencies_info.empty())
return;
for (const auto & info : metadata.dependencies_info)
{
LOG_WARNING(log, "Cannot resolve dependencies: Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
info.first, info.second.dependencies_count,
info.second.dependent_database_objects.size(), fmt::join(info.second.dependent_database_objects, ", "));
assert(info.second.dependencies_count == 0);
}
throw Exception(ErrorCodes::INFINITE_LOOP, "Cannot attach {} tables due to cyclic dependencies. "
"See server log for details.", metadata.dependencies_info.size());
}
void TablesLoader::logDependencyGraph() const
{
LOG_TEST(log, "Have {} independent tables: {}",
metadata.independent_database_objects.size(),
fmt::join(metadata.independent_database_objects, ", "));
for (const auto & dependencies : metadata.dependencies_info)
{
LOG_TEST(log,
"Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
dependencies.first,
dependencies.second.dependencies_count,
dependencies.second.dependent_database_objects.size(),
fmt::join(dependencies.second.dependent_database_objects, ", "));
}
}
}

View File

@ -0,0 +1,112 @@
#pragma once
#include <Core/Types.h>
#include <Core/QualifiedTableName.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
namespace Poco
{
class Logger;
}
class AtomicStopwatch;
namespace DB
{
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch);
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
struct ParsedTableMetadata
{
String path;
ASTPtr ast;
};
using ParsedMetadata = std::map<QualifiedTableName, ParsedTableMetadata>;
using TableNames = std::vector<QualifiedTableName>;
struct DependenciesInfo
{
/// How many dependencies this table has
size_t dependencies_count = 0;
/// List of tables/dictionaries which depend on this table/dictionary
TableNames dependent_database_objects;
};
using DependenciesInfos = std::unordered_map<QualifiedTableName, DependenciesInfo>;
using DependenciesInfosIter = std::unordered_map<QualifiedTableName, DependenciesInfo>::iterator;
struct ParsedTablesMetadata
{
String default_database;
std::mutex mutex;
ParsedMetadata parsed_tables;
/// For logging
size_t total_dictionaries = 0;
/// List of tables/dictionaries that do not have any dependencies and can be loaded
TableNames independent_database_objects;
/// Actually it contains two different maps (with possibly intersecting keys):
/// 1. table/dictionary name -> number of dependencies
/// 2. table/dictionary name -> dependent tables/dictionaries list (adjacency list of dependencies graph).
/// If table A depends on table B, then there is an edge B --> A, i.e. dependencies_info[B].dependent_database_objects contains A.
/// And dependencies_info[C].dependencies_count is the number of incoming edges for vertex C (how many tables we have to load before C).
DependenciesInfos dependencies_info;
};
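For a concrete picture of the bookkeeping above, consider a hypothetical pair of tables where db.A depends on db.B; a minimal sketch of how the fields would be filled:
/// Illustrative only: builds the graph for "db.A depends on db.B".
static void fillExampleGraph(ParsedTablesMetadata & metadata)
{
    QualifiedTableName a{"db", "A"};
    QualifiedTableName b{"db", "B"};
    metadata.independent_database_objects.push_back(b);                     /// B depends on nothing, so it can be loaded first
    metadata.dependencies_info[b].dependent_database_objects.push_back(a);  /// edge B --> A
    ++metadata.dependencies_info[a].dependencies_count;                     /// A has one incoming edge (waits for B)
}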
/// Loads tables (and dictionaries) from specified databases
/// taking into account dependencies between them.
class TablesLoader
{
public:
using Databases = std::map<String, DatabasePtr>;
TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_ = false, bool force_attach_ = false);
TablesLoader() = delete;
void loadTables();
void startupTables();
private:
ContextMutablePtr global_context;
Databases databases;
bool force_restore;
bool force_attach;
Strings databases_to_load;
ParsedTablesMetadata metadata;
Poco::Logger * log;
std::atomic<size_t> tables_processed{0};
AtomicStopwatch stopwatch;
ThreadPool pool;
void removeUnresolvableDependencies();
void loadTablesInTopologicalOrder(ThreadPool & pool);
DependenciesInfosIter removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_database_objects);
void startLoadingIndependentTables(ThreadPool & pool, size_t level);
void checkCyclicDependencies() const;
size_t getNumberOfTablesWithDependencies() const;
void logDependencyGraph() const;
};
}
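A sketch of the call sequence the interface above suggests, assuming a ContextMutablePtr is at hand and "my_db" is a placeholder database name already registered in the DatabaseCatalog:
/// Illustrative only.
static void loadSingleDatabase(ContextMutablePtr global_context)
{
    TablesLoader::Databases databases;
    databases.emplace("my_db", DatabaseCatalog::instance().getDatabase("my_db"));
    TablesLoader loader{global_context, std::move(databases), /* force_restore */ false, /* force_attach */ false};
    loader.loadTables();      /// parse metadata, resolve dependencies, attach objects in topological order
    loader.startupTables();   /// start background activity only after everything is attached
}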

View File

@ -9,6 +9,7 @@ PEERDIR(
SRCS(
DDLDependencyVisitor.cpp
DatabaseAtomic.cpp
DatabaseDictionary.cpp
DatabaseFactory.cpp
@ -30,6 +31,7 @@ SRCS(
SQLite/DatabaseSQLite.cpp
SQLite/SQLiteUtils.cpp
SQLite/fetchSQLiteTableStructure.cpp
TablesLoader.cpp
)

View File

@ -2,6 +2,8 @@
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
{
@ -13,19 +15,20 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & root_config_prefix,
Block & sample_block,
ContextPtr,
ContextPtr context,
const std::string & /* default_database */,
bool /* created_from_ddl */)
{
const auto config_prefix = root_config_prefix + ".mongodb";
auto configuration = getExternalDataSourceConfiguration(config, config_prefix, context);
return std::make_unique<MongoDBDictionarySource>(dict_struct,
config.getString(config_prefix + ".uri", ""),
config.getString(config_prefix + ".host", ""),
config.getUInt(config_prefix + ".port", 0),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
configuration.host,
configuration.port,
configuration.username,
configuration.password,
config.getString(config_prefix + ".method", ""),
config.getString(config_prefix + ".db", ""),
configuration.database,
config.getString(config_prefix + ".collection"),
sample_block);
};

View File

@ -12,6 +12,8 @@
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Processors/Pipe.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
{
@ -32,38 +34,43 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
[[maybe_unused]] ContextPtr global_context,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr {
const std::string & /* default_database */,
[[maybe_unused]] bool created_from_ddl) -> DictionarySourcePtr {
#if USE_MYSQL
StreamSettings mysql_input_stream_settings(global_context->getSettingsRef()
, config.getBool(config_prefix + ".mysql.close_connection", false) || config.getBool(config_prefix + ".mysql.share_connection", false)
, false
, config.getBool(config_prefix + ".mysql.fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss);
StreamSettings mysql_input_stream_settings(
global_context->getSettingsRef(),
config.getBool(config_prefix + ".mysql.close_connection", false) || config.getBool(config_prefix + ".mysql.share_connection", false),
false,
config.getBool(config_prefix + ".mysql.fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss);
auto settings_config_prefix = config_prefix + ".mysql";
auto table = config.getString(settings_config_prefix + ".table", "");
auto where = config.getString(settings_config_prefix + ".where", "");
auto configuration = getExternalDataSourceConfiguration(config, settings_config_prefix, global_context);
auto query = config.getString(settings_config_prefix + ".query", "");
if (query.empty() && table.empty())
if (query.empty() && configuration.table.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field");
MySQLDictionarySource::Configuration configuration
MySQLDictionarySource::Configuration dictionary_configuration
{
.db = config.getString(settings_config_prefix + ".db", ""),
.table = table,
.db = configuration.database,
.table = configuration.table,
.query = query,
.where = where,
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
};
auto pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
if (created_from_ddl)
{
std::vector<std::pair<String, UInt16>> addresses{std::make_pair(configuration.host, configuration.port)};
pool = std::make_shared<mysqlxx::PoolWithFailover>(configuration.database, addresses, configuration.username, configuration.password);
}
else
pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
return std::make_unique<MySQLDictionarySource>(dict_struct, configuration, std::move(pool), sample_block, mysql_input_stream_settings);
return std::make_unique<MySQLDictionarySource>(dict_struct, dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.");

View File

@ -1,6 +1,7 @@
#include "PostgreSQLDictionarySource.h"
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/QualifiedTableName.h>
#include "DictionarySourceFactory.h"
#include "registerDictionaries.h"
@ -10,6 +11,7 @@
#include <DataStreams/PostgreSQLSource.h>
#include "readInvalidateQuery.h"
#include <Interpreters/Context.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#endif
@ -29,19 +31,13 @@ namespace
{
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
{
auto schema_value = schema;
auto table_value = table;
QualifiedTableName qualified_name{schema, table};
if (qualified_name.database.empty() && !qualified_name.table.empty())
qualified_name = QualifiedTableName::parseFromString(qualified_name.table);
if (schema_value.empty())
{
if (auto pos = table_value.find('.'); pos != std::string::npos)
{
schema_value = table_value.substr(0, pos);
table_value = table_value.substr(pos + 1);
}
}
/// Do not need db because it is already in the connection string.
return {dict_struct, "", schema_value, table_value, query, where, IdentifierQuotingStyle::DoubleQuotes};
return {dict_struct, "", qualified_name.database, qualified_name.table, query, where, IdentifierQuotingStyle::DoubleQuotes};
}
}
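The rewrite above defers the dotted-name split to QualifiedTableName::parseFromString; a small sketch of the behaviour it is relied on for here, assuming it mirrors the manual substr split it replaces (the values are hypothetical):
#include <cassert>
static void checkSplitAssumption()
{
    QualifiedTableName parsed = QualifiedTableName::parseFromString("public.users");
    assert(parsed.database == "public" && parsed.table == "users");
    /// A name without a dot is expected to keep the database part empty.
    assert(QualifiedTableName::parseFromString("users").database.empty());
}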
@ -182,22 +178,24 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr global_context,
ContextPtr context,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr
{
#if USE_LIBPQXX
const auto settings_config_prefix = config_prefix + ".postgresql";
auto pool = std::make_shared<postgres::PoolWithFailover>(
config, settings_config_prefix,
global_context->getSettingsRef().postgresql_connection_pool_size,
global_context->getSettingsRef().postgresql_connection_pool_wait_timeout);
PostgreSQLDictionarySource::Configuration configuration
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context);
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
PostgreSQLDictionarySource::Configuration dictionary_configuration
{
.db = config.getString(fmt::format("{}.db", settings_config_prefix), ""),
.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), ""),
.table = config.getString(fmt::format("{}.table", settings_config_prefix), ""),
.db = configuration.database,
.schema = configuration.schema,
.table = configuration.table,
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
@ -205,13 +203,13 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
};
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, configuration, pool, sample_block);
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration, pool, sample_block);
#else
(void)dict_struct;
(void)config;
(void)config_prefix;
(void)sample_block;
(void)global_context;
(void)context;
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support.");
#endif

View File

@ -38,29 +38,22 @@ namespace
const std::string & where_,
IXDBCBridgeHelper & bridge_)
{
std::string schema = schema_;
std::string table = table_;
QualifiedTableName qualified_name{schema_, table_};
if (bridge_.isSchemaAllowed())
{
if (schema.empty())
{
if (auto pos = table.find('.'); pos != std::string::npos)
{
schema = table.substr(0, pos);
table = table.substr(pos + 1);
}
}
if (qualified_name.database.empty())
qualified_name = QualifiedTableName::parseFromString(qualified_name.table);
}
else
{
if (!schema.empty())
if (!qualified_name.database.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Dictionary source of type {} specifies a schema but schema is not supported by {}-driver",
bridge_.getName());
}
return {dict_struct_, db_, schema, table, query_, where_, bridge_.getIdentifierQuotingStyle()};
return {dict_struct_, db_, qualified_name.database, qualified_name.table, query_, where_, bridge_.getIdentifierQuotingStyle()};
}
}

View File

@ -4,7 +4,6 @@
#include <Poco/DOM/Document.h>
#include <Poco/DOM/Element.h>
#include <Poco/DOM/Text.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
#include <IO/WriteHelpers.h>
#include <Parsers/queryToString.h>
@ -16,6 +15,8 @@
#include <Parsers/ASTDictionaryAttributeDeclaration.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionFactory.h>
#include <Common/isLocalAddress.h>
#include <Interpreters/Context.h>
namespace DB
@ -576,4 +577,28 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
return conf;
}
std::optional<ClickHouseDictionarySourceInfo>
getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, ContextPtr global_context)
{
ClickHouseDictionarySourceInfo info;
String host = config->getString("dictionary.source.clickhouse.host", "");
UInt16 port = config->getUInt("dictionary.source.clickhouse.port", 0);
String database = config->getString("dictionary.source.clickhouse.db", "");
String table = config->getString("dictionary.source.clickhouse.table", "");
bool secure = config->getBool("dictionary.source.clickhouse.secure", false);
if (host.empty() || port == 0 || table.empty())
return {};
info.table_name = {database, table};
UInt16 default_port = secure ? global_context->getTCPPortSecure().value_or(0) : global_context->getTCPPort();
if (!isLocalAddress({host, port}, default_port))
return info;
info.is_local = true;
return info;
}
}
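A sketch of how a caller might combine the two helpers above to detect that a DDL dictionary reads from a table on the same server; `create_query`, `context` and `log` are placeholders:
/// Illustrative only.
static void logLocalDictionarySource(const ASTCreateQuery & create_query, ContextPtr context, Poco::Logger * log)
{
    auto config = getDictionaryConfigurationFromAST(create_query, context);
    if (auto info = getInfoIfClickHouseDictionarySource(config, context); info && info->is_local)
        LOG_DEBUG(log, "Dictionary is sourced from local table {}.{}",
                  info->table_name.database, info->table_name.table);
}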

View File

@ -15,4 +15,13 @@ using DictionaryConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfigurati
DictionaryConfigurationPtr
getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr context, const std::string & database_ = "");
struct ClickHouseDictionarySourceInfo
{
QualifiedTableName table_name;
bool is_local = false;
};
std::optional<ClickHouseDictionarySourceInfo>
getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, ContextPtr global_context);
}

View File

@ -112,23 +112,29 @@ public:
const String & uri_,
RemoteMetadata metadata_,
ContextPtr context_,
size_t buf_size_)
size_t buf_size_,
size_t backoff_threshold_,
size_t max_tries_)
: ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>(metadata_)
, uri(uri_)
, context(context_)
, buf_size(buf_size_)
, backoff_threshold(backoff_threshold_)
, max_tries(max_tries_)
{
}
std::unique_ptr<ReadIndirectBufferFromWebServer> createReadBuffer(const String & path) override
{
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, buf_size);
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries);
}
private:
String uri;
ContextPtr context;
size_t buf_size;
size_t backoff_threshold;
size_t max_tries;
};
@ -190,7 +196,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
RemoteMetadata meta(path, remote_path);
meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
auto reader = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(), read_settings.remote_fs_buffer_size);
auto reader = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(),
read_settings.remote_fs_buffer_size, read_settings.remote_fs_backoff_threshold, read_settings.remote_fs_backoff_max_tries);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
}
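backoff_threshold and max_tries are only threaded through here; a minimal sketch of how a retry loop typically combines such knobs, as an assumed interpretation rather than the actual ReadIndirectBufferFromWebServer internals:
#include <chrono>
#include <thread>
/// Assumed semantics: give up after max_tries attempts, and start sleeping
/// (doubling the delay) only once backoff_threshold attempts have failed.
template <typename F>
bool retryWithBackoff(F && attempt, size_t max_tries, size_t backoff_threshold)
{
    std::chrono::milliseconds delay{100};
    for (size_t try_no = 0; try_no < max_tries; ++try_no)
    {
        if (attempt())
            return true;
        if (try_no + 1 >= backoff_threshold)
        {
            std::this_thread::sleep_for(delay);
            delay *= 2;
        }
    }
    return false;
}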

Some files were not shown because too many files have changed in this diff