Merge branch 'master' into custom-key-parallel-replicas

This commit is contained in:
Antonio Andelic 2023-02-17 15:06:41 +01:00 committed by GitHub
commit ab51c1d975
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
212 changed files with 3407 additions and 996 deletions

3
.gitmodules vendored
View File

@ -296,6 +296,9 @@
[submodule "contrib/libdivide"]
path = contrib/libdivide
url = https://github.com/ridiculousfish/libdivide
[submodule "contrib/ulid-c"]
path = contrib/ulid-c
url = https://github.com/ClickHouse/ulid-c.git
[submodule "contrib/aws-crt-cpp"]
path = contrib/aws-crt-cpp
url = https://github.com/ClickHouse/aws-crt-cpp

View File

@ -191,6 +191,8 @@ add_contrib (xxHash-cmake xxHash)
add_contrib (google-benchmark-cmake google-benchmark)
add_contrib (ulid-c-cmake ulid-c)
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear
# in "contrib/..." as originally planned, so we workaround this by fixing FOLDER properties of all targets manually,

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit 06a6610e6fb3385e22ad85014a67aa307825ffb1
Subproject commit ecccfc026a42b30023289410a67024d561f4bf3e

1
contrib/ulid-c vendored Submodule

@ -0,0 +1 @@
Subproject commit c433b6783cf918b8f996dacd014cb2b68c7de419

View File

@ -0,0 +1,16 @@
option (ENABLE_ULID "Enable ulid" ${ENABLE_LIBRARIES})
if (NOT ENABLE_ULID)
message(STATUS "Not using ulid")
return()
endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/ulid-c")
set (SRCS
"${LIBRARY_DIR}/src/ulid.c"
)
add_library(_ulid ${SRCS})
target_include_directories(_ulid SYSTEM PUBLIC "${LIBRARY_DIR}/include")
add_library(ch_contrib::ulid ALIAS _ulid)

2
contrib/unixodbc vendored

@ -1 +1 @@
Subproject commit a2cd5395e8c7f7390025ec93af5bfebef3fb5fcd
Subproject commit 18e0ebe2a1fb53b9072ff60a558f6bd6ad2a0551

View File

@ -98,7 +98,7 @@ ccache_status
if [ -n "$MAKE_DEB" ]; then
# No quotes because I want it to expand to nothing if empty.
# shellcheck disable=SC2086
DESTDIR=/build/packages/root ninja $NINJA_FLAGS install
DESTDIR=/build/packages/root ninja $NINJA_FLAGS programs/install
cp /build/programs/clickhouse-diagnostics /build/packages/root/usr/bin
cp /build/programs/clickhouse-diagnostics /output
bash -x /build/packages/build

View File

@ -101,11 +101,7 @@ def run_docker_image_with_env(
def is_release_build(build_type, package_type, sanitizer):
return (
build_type == ""
and package_type == "deb"
and sanitizer == ""
)
return build_type == "" and package_type == "deb" and sanitizer == ""
def parse_env_variables(
@ -216,6 +212,12 @@ def parse_env_variables(
cmake_flags.append("-DCMAKE_INSTALL_PREFIX=/usr")
cmake_flags.append("-DCMAKE_INSTALL_SYSCONFDIR=/etc")
cmake_flags.append("-DCMAKE_INSTALL_LOCALSTATEDIR=/var")
# Reduce linking and building time by avoid *install/all dependencies
cmake_flags.append("-DCMAKE_SKIP_INSTALL_ALL_DEPENDENCY=ON")
# Add bridges to the build target
build_target = (
f"{build_target} clickhouse-odbc-bridge clickhouse-library-bridge"
)
if is_release_build(build_type, package_type, sanitizer):
cmake_flags.append("-DSPLIT_DEBUG_SYMBOLS=ON")
result.append("WITH_PERFORMANCE=1")
@ -305,7 +307,7 @@ def parse_env_variables(
cmake_flags.append("-DCLICKHOUSE_OFFICIAL_BUILD=1")
result.append('CMAKE_FLAGS="' + " ".join(cmake_flags) + '"')
result.append(f"BUILD_TARGET={build_target}")
result.append(f"BUILD_TARGET='{build_target}'")
return result

View File

@ -172,7 +172,19 @@ if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
# so the container can't be finished by ctrl+c
CLICKHOUSE_WATCHDOG_ENABLE=${CLICKHOUSE_WATCHDOG_ENABLE:-0}
export CLICKHOUSE_WATCHDOG_ENABLE
exec /usr/bin/clickhouse su "${USER}:${GROUP}" /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@"
# An option for easy restarting and replacing clickhouse-server in a container, especially in Kubernetes.
# For example, you can replace the clickhouse-server binary to another and restart it while keeping the container running.
if [[ "${CLICKHOUSE_DOCKER_RESTART_ON_EXIT:-0}" -eq "1" ]]; then
while true; do
# This runs the server as a child process of the shell script:
/usr/bin/clickhouse su "${USER}:${GROUP}" /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@" ||:
echo >&2 'ClickHouse Server exited, and the environment variable CLICKHOUSE_DOCKER_RESTART_ON_EXIT is set to 1. Restarting the server.'
done
else
# This replaces the shell script with the server:
exec /usr/bin/clickhouse su "${USER}:${GROUP}" /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@"
fi
fi
# Otherwise, we assume the user want to run his own process, for example a `bash` shell to explore this image

View File

@ -645,7 +645,7 @@ if [ "$DISABLE_BC_CHECK" -ne "1" ]; then
-e "} <Error> TCPHandler: Code:" \
-e "} <Error> executeQuery: Code:" \
-e "Missing columns: 'v3' while processing query: 'v3, k, v1, v2, p'" \
-e "[Queue = DB::MergeMutateRuntimeQueue]: Code: 235. DB::Exception: Part" \
-e "[Queue = DB::DynamicRuntimeQueue]: Code: 235. DB::Exception: Part" \
-e "The set of parts restored in place of" \
-e "(ReplicatedMergeTreeAttachThread): Initialization failed. Error" \
-e "Code: 269. DB::Exception: Destination table is myself" \

View File

@ -226,7 +226,6 @@ if __name__ == "__main__":
)
parser.add_argument("--test-cmd", default="/usr/bin/clickhouse-test")
parser.add_argument("--skip-func-tests", default="")
parser.add_argument("--client-cmd", default="clickhouse-client")
parser.add_argument("--server-log-folder", default="/var/log/clickhouse-server")
parser.add_argument("--output-folder")
parser.add_argument("--global-time-limit", type=int, default=1800)
@ -294,7 +293,6 @@ if __name__ == "__main__":
# Use system database to avoid CREATE/DROP DATABASE queries
"--database=system",
"--hung-check",
"--stress",
"--report-logs-stats",
"00001_select_1",
]

View File

@ -0,0 +1,33 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v22.3.18.37-lts (fe512717551) FIXME as compared to v22.3.17.13-lts (fcc4de7e805)
#### Performance Improvement
* Backported in [#46372](https://github.com/ClickHouse/ClickHouse/issues/46372): Fix too big memory usage for vertical merges on non-remote disk. Respect `max_insert_delayed_streams_for_parallel_write` for the remote disk. [#46275](https://github.com/ClickHouse/ClickHouse/pull/46275) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#46357](https://github.com/ClickHouse/ClickHouse/issues/46357): Allow using Vertical merge algorithm with parts in Compact format. This will allow ClickHouse server to use much less memory for background operations. This closes [#46084](https://github.com/ClickHouse/ClickHouse/issues/46084). [#46282](https://github.com/ClickHouse/ClickHouse/pull/46282) ([Anton Popov](https://github.com/CurtizJ)).
#### Build/Testing/Packaging Improvement
* Backported in [#45856](https://github.com/ClickHouse/ClickHouse/issues/45856): Fix zookeeper downloading, update the version, and optimize the image size. [#44853](https://github.com/ClickHouse/ClickHouse/pull/44853) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#45620](https://github.com/ClickHouse/ClickHouse/issues/45620): Another fix for `Cannot read all data` error which could happen while reading `LowCardinality` dictionary from remote fs. Fixes [#44709](https://github.com/ClickHouse/ClickHouse/issues/44709). [#44875](https://github.com/ClickHouse/ClickHouse/pull/44875) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#45549](https://github.com/ClickHouse/ClickHouse/issues/45549): Fix `SELECT ... FROM system.dictionaries` exception when there is a dictionary with a bad structure (e.g. incorrect type in xml config). [#45399](https://github.com/ClickHouse/ClickHouse/pull/45399) ([Aleksei Filatov](https://github.com/aalexfvk)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Automatically merge green backport PRs and green approved PRs [#41110](https://github.com/ClickHouse/ClickHouse/pull/41110) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Fix wrong approved_at, simplify conditions [#45302](https://github.com/ClickHouse/ClickHouse/pull/45302) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Get rid of artifactory in favor of r2 + ch-repos-manager [#45421](https://github.com/ClickHouse/ClickHouse/pull/45421) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Another attempt to fix automerge, or at least to have debug footprint [#45476](https://github.com/ClickHouse/ClickHouse/pull/45476) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Trim refs/tags/ from GITHUB_TAG in release workflow [#45636](https://github.com/ClickHouse/ClickHouse/pull/45636) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Add check for running workflows to merge_pr.py [#45803](https://github.com/ClickHouse/ClickHouse/pull/45803) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Get rid of progress timestamps in release publishing [#45818](https://github.com/ClickHouse/ClickHouse/pull/45818) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Add helping logging to auto-merge script [#46080](https://github.com/ClickHouse/ClickHouse/pull/46080) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Fix write buffer destruction order for vertical merge. [#46205](https://github.com/ClickHouse/ClickHouse/pull/46205) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).

View File

@ -56,13 +56,13 @@ Gcc cannot be used.
### Checkout ClickHouse Sources {#checkout-clickhouse-sources}
``` bash
git clone --recursive git@github.com:ClickHouse/ClickHouse.git
git clone --recursive --shallow-submodules git@github.com:ClickHouse/ClickHouse.git
```
or
``` bash
git clone --recursive https://github.com/ClickHouse/ClickHouse.git
git clone --recursive --shallow-submodules https://github.com/ClickHouse/ClickHouse.git
```
### Build ClickHouse {#build-clickhouse}

View File

@ -39,7 +39,7 @@ Next, you need to download the source files onto your working machine. This is c
In the command line terminal run:
git clone --recursive git@github.com:your_github_username/ClickHouse.git
git clone --recursive --shallow-submodules git@github.com:your_github_username/ClickHouse.git
cd ClickHouse
Note: please, substitute *your_github_username* with what is appropriate!
@ -67,7 +67,7 @@ It generally means that the SSH keys for connecting to GitHub are missing. These
You can also clone the repository via https protocol:
git clone --recursive https://github.com/ClickHouse/ClickHouse.git
git clone --recursive--shallow-submodules https://github.com/ClickHouse/ClickHouse.git
This, however, will not let you send your changes to the server. You can still use it temporarily and add the SSH keys later replacing the remote address of the repository with `git remote` command.

View File

@ -76,6 +76,7 @@ Engines in the family:
- [View](../../engines/table-engines/special/view.md#table_engines-view)
- [Memory](../../engines/table-engines/special/memory.md#memory)
- [Buffer](../../engines/table-engines/special/buffer.md#buffer)
- [KeeperMap](../../engines/table-engines/special/keepermap.md)
## Virtual Columns {#table_engines-virtual_columns}

View File

@ -84,3 +84,39 @@ You can also change any [rocksdb options](https://github.com/facebook/rocksdb/wi
</tables>
</rocksdb>
```
## Supported operations {#table_engine-EmbeddedRocksDB-supported-operations}
### Inserts
When new rows are inserted into `EmbeddedRocksDB`, if the key already exists, the value will be updated, otherwise a new key is created.
Example:
```sql
INSERT INTO test VALUES ('some key', 1, 'value', 3.2);
```
### Deletes
Rows can be deleted using `DELETE` query or `TRUNCATE`.
```sql
DELETE FROM test WHERE key LIKE 'some%' AND v1 > 1;
```
```sql
ALTER TABLE test DELETE WHERE key LIKE 'some%' AND v1 > 1;
```
```sql
TRUNCATE TABLE test;
```
### Updates
Values can be updated using the `ALTER TABLE` query. The primary key cannot be updated.
```sql
ALTER TABLE test UPDATE v1 = v1 * 10 + 2 WHERE key LIKE 'some%' AND v3 > 3.1;
```

View File

@ -0,0 +1,111 @@
---
slug: /en/engines/table-engines/special/keeper-map
sidebar_position: 150
sidebar_label: KeeperMap
---
# KeeperMap {#keepermap}
This engine allows you to use Keeper/ZooKeeper cluster as consistent key-value store with linearizable writes and sequentially consistent reads.
To enable KeeperMap storage engine, you need to define a ZooKeeper path where the tables will be stored using `<keeper_map_path_prefix>` config.
For example:
```xml
<clickhouse>
<keeper_map_path_prefix>/keeper_map_tables</keeper_map_path_prefix>
</clickhouse>
```
where path can be any other valid ZooKeeper path.
## Creating a Table {#table_engine-KeeperMap-creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = KeeperMap(root_path, [keys_limit]) PRIMARY KEY(primary_key_name)
```
Engine parameters:
- `root_path` - ZooKeeper path where the `table_name` will be stored.
This path should not contain the prefix defined by `<keeper_map_path_prefix>` config because the prefix will be automatically appended to the `root_path`.
Additionally, format of `auxiliary_zookeper_cluster_name:/some/path` is also supported where `auxiliary_zookeper_cluster` is a ZooKeeper cluster defined inside `<auxiliary_zookeepers>` config.
By default, ZooKeeper cluster defined inside `<zookeeper>` config is used.
- `keys_limit` - number of keys allowed inside the table.
This limit is a soft limit and it can be possible that more keys will end up in the table for some edge cases.
- `primary_key_name` any column name in the column list.
- `primary key` must be specified, it supports only one column in the primary key. The primary key will be serialized in binary as a `node name` inside ZooKeeper.
- columns other than the primary key will be serialized to binary in corresponding order and stored as a value of the resulting node defined by the serialized key.
- queries with key `equals` or `in` filtering will be optimized to multi keys lookup from `Keeper`, otherwise all values will be fetched.
Example:
``` sql
CREATE TABLE keeper_map_table
(
`key` String,
`v1` UInt32,
`v2` String,
`v3` Float32
)
ENGINE = KeeperMap(/keeper_map_table, 4)
PRIMARY KEY key
```
with
```xml
<clickhouse>
<keeper_map_path_prefix>/keeper_map_tables</keeper_map_path_prefix>
</clickhouse>
```
Each value, which is binary serialization of `(v1, v2, v3)`, will be stored inside `/keeper_map_tables/keeper_map_table/data/serialized_key` in `Keeper`.
Additionally, number of keys will have a soft limit of 4 for the number of keys.
If multiple tables are created on the same ZooKeeper path, the values are persisted until there exists at least 1 table using it.
As a result, it is possible to use `ON CLUSTER` clause when creating the table and sharing the data from multiple ClickHouse instances.
Of course, it's possible to manually run `CREATE TABLE` with same path on nonrelated ClickHouse instances to have same data sharing effect.
## Supported operations {#table_engine-KeeperMap-supported-operations}
### Inserts
When new rows are inserted into `KeeperMap`, if the key already exists, the value will be updated, otherwise new key is created.
Example:
```sql
INSERT INTO keeper_map_table VALUES ('some key', 1, 'value', 3.2);
```
### Deletes
Rows can be deleted using `DELETE` query or `TRUNCATE`.
```sql
DELETE FROM keeper_map_table WHERE key LIKE 'some%' AND v1 > 1;
```
```sql
ALTER TABLE keeper_map_table DELETE WHERE key LIKE 'some%' AND v1 > 1;
```
```sql
TRUNCATE TABLE keeper_map_table;
```
### Updates
Values can be updated using `ALTER TABLE` query. Primary key cannot be updated.
```sql
ALTER TABLE keeper_map_table UPDATE v1 = v1 * 10 + 2 WHERE key LIKE 'some%' AND v3 > 3.1;
```

View File

@ -16,16 +16,13 @@ You can monitor:
## Resource Utilization {#resource-utilization}
ClickHouse does not monitor the state of hardware resources by itself.
It is highly recommended to set up monitoring for:
ClickHouse also monitors the state of hardware resources by itself such as:
- Load and temperature on processors.
You can use [dmesg](https://en.wikipedia.org/wiki/Dmesg), [turbostat](https://www.linux.org/docs/man8/turbostat.html) or other instruments.
- Utilization of storage system, RAM and network.
This data is collected in the `system.asynchronous_metric_log` table.
## ClickHouse Server Metrics {#clickhouse-server-metrics}
ClickHouse server has embedded instruments for self-state monitoring.

View File

@ -1012,6 +1012,24 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart.
Could be applied from the `default` profile for backward compatibility.
Possible values:
- "round_robin" — Every concurrent merge and mutation is executed in round-robin order to ensure starvation-free operation. Smaller merges are completed faster than bigger ones just because they have fewer blocks to merge.
- "shortest_task_first" — Always execute smaller merge or mutation. Merges and mutations are assigned priorities based on their resulting size. Merges with smaller sizes are strictly preferred over bigger ones. This policy ensures the fastest possible merge of small parts but can lead to indefinite starvation of big merges in partitions heavily overloaded by INSERTs.
Default value: "round_robin".
**Example**
```xml
<background_merges_mutations_scheduling_policy>shortest_task_first</background_merges_mutations_scheduling_policy>
```
## background_move_pool_size {#background_move_pool_size}
Sets the number of threads performing background moves for tables with MergeTree engines. Could be increased at runtime and could be applied at server startup from the `default` profile for backward compatibility.

View File

@ -488,6 +488,23 @@ Possible values:
Default value: 0.
## group_by_use_nulls {#group_by_use_nulls}
Changes the way the [GROUP BY clause](/docs/en/sql-reference/statements/select/group-by.md) treats the types of aggregation keys.
When the `ROLLUP`, `CUBE`, or `GROUPING SETS` specifiers are used, some aggregation keys may not be used to produce some result rows.
Columns for these keys are filled with either default value or `NULL` in corresponding rows depending on this setting.
Possible values:
- 0 — The default value for the aggregation key type is used to produce missing values.
- 1 — ClickHouse executes `GROUP BY` the same way as the SQL standard says. The types of aggregation keys are converted to [Nullable](/docs/en/sql-reference/data-types/nullable.md/#data_type-nullable). Columns for corresponding aggregation keys are filled with [NULL](/docs/en/sql-reference/syntax.md) for rows that didn't use it.
Default value: 0.
See also:
- [GROUP BY clause](/docs/en/sql-reference/statements/select/group-by.md)
## partial_merge_join_optimizations {#partial_merge_join_optimizations}
Disables optimizations in partial merge join algorithm for [JOIN](../../sql-reference/statements/select/join.md) queries.

View File

@ -1205,6 +1205,8 @@ Formats a Time according to the given Format string. Format is a constant expres
formatDateTime uses MySQL datetime format style, refer to https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format.
Alias: `DATE_FORMAT`.
**Syntax**
``` sql
@ -1220,29 +1222,39 @@ Using replacement fields, you can define a pattern for the resulting string. “
| Placeholder | Description | Example |
|----------|---------------------------------------------------------|------------|
| %a | abbreviated weekday name (Mon-Sun) | Mon |
| %b | abbreviated month name (Jan-Dec) | Jan |
| %c | month as a decimal number (01-12) | 01 |
| %C | year divided by 100 and truncated to integer (00-99) | 20 |
| %d | day of the month, zero-padded (01-31) | 02 |
| %D | Short MM/DD/YY date, equivalent to %m/%d/%y | 01/02/18 |
| %e | day of the month, space-padded ( 1-31) | &nbsp; 2 |
| %e | day of the month, space-padded (1-31) | &nbsp; 2 |
| %f | fractional second from the fractional part of DateTime64 | 1234560 |
| %F | short YYYY-MM-DD date, equivalent to %Y-%m-%d | 2018-01-02 |
| %G | four-digit year format for ISO week number, calculated from the week-based year [defined by the ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Week_dates) standard, normally useful only with %V | 2018 |
| %g | two-digit year format, aligned to ISO 8601, abbreviated from four-digit notation | 18 |
| %h | hour in 12h format (01-12) | 09 |
| %H | hour in 24h format (00-23) | 22 |
| %i | minute (00-59) | 33 |
| %I | hour in 12h format (01-12) | 10 |
| %j | day of the year (001-366) | 002 |
| %k | hour in 24h format (00-23) | 22 |
| %l | hour in 12h format (01-12) | 09 |
| %m | month as a decimal number (01-12) | 01 |
| %M | minute (00-59) | 33 |
| %n | new-line character () | |
| %p | AM or PM designation | PM |
| %Q | Quarter (1-4) | 1 |
| %r | 12-hour HH:MM AM/PM time, equivalent to %H:%M %p | 10:30 PM |
| %R | 24-hour HH:MM time, equivalent to %H:%M | 22:33 |
| %s | second (00-59) | 44 |
| %S | second (00-59) | 44 |
| %t | horizontal-tab character () | |
| %T | ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S | 22:33:44 |
| %u | ISO 8601 weekday as number with Monday as 1 (1-7) | 2 |
| %V | ISO 8601 week number (01-53) | 01 |
| %w | weekday as a decimal number with Sunday as 0 (0-6) | 2 |
| %W | full weekday name (Monday-Sunday) | Monday |
| %y | Year, last two digits (00-99) | 18 |
| %Y | Year | 2018 |
| %z | Time offset from UTC as +HHMM or -HHMM | -0500 |

View File

@ -1841,6 +1841,10 @@ Result:
## catboostEvaluate(path_to_model, feature_1, feature_2, …, feature_n)
:::note
This function is not available in ClickHouse Cloud.
:::
Evaluate external catboost model. [CatBoost](https://catboost.ai) is an open-source gradient boosting library developed by Yandex for machine learing.
Accepts a path to a catboost model and model arguments (features). Returns Float64.

View File

@ -0,0 +1,53 @@
---
slug: /en/sql-reference/functions/ulid-functions
sidebar_position: 54
sidebar_label: ULID
---
# Functions for Working with ULID
## generateULID
Generates the [ULID](https://github.com/ulid/spec).
**Syntax**
``` sql
generateULID([x])
```
**Arguments**
- `x` — [Expression](../../sql-reference/syntax.md#syntax-expressions) resulting in any of the [supported data types](../../sql-reference/data-types/index.md#data_types). The resulting value is discarded, but the expression itself if used for bypassing [common subexpression elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) if the function is called multiple times in one query. Optional parameter.
**Returned value**
The [FixedString](../data-types/fixedstring.md) type value.
**Usage example**
``` sql
SELECT generateULID()
```
``` text
┌─generateULID()─────────────┐
│ 01GNB2S2FGN2P93QPXDNB4EN2R │
└────────────────────────────┘
```
**Usage example if it is needed to generate multiple values in one row**
```sql
SELECT generateULID(1), generateULID(2)
```
``` text
┌─generateULID(1)────────────┬─generateULID(2)────────────┐
│ 01GNB2SGG4RHKVNT9ZGA4FFMNP │ 01GNB2SGG4V0HMQVH4VBVPSSRB │
└────────────────────────────┴────────────────────────────┘
```
## See Also
- [UUID](../../sql-reference/functions/uuid-functions.md)

View File

@ -9,7 +9,7 @@ sidebar_label: GROUP BY
- `GROUP BY` clause contains a list of expressions (or a single expression, which is considered to be the list of length one). This list acts as a “grouping key”, while each individual expression will be referred to as a “key expression”.
- All the expressions in the [SELECT](../../../sql-reference/statements/select/index.md), [HAVING](../../../sql-reference/statements/select/having), and [ORDER BY](../../../sql-reference/statements/select/order-by.md) clauses **must** be calculated based on key expressions **or** on [aggregate functions](../../../sql-reference/aggregate-functions/index.md) over non-key expressions (including plain columns). In other words, each column selected from the table must be used either in a key expression or inside an aggregate function, but not both.
- Result of aggregating `SELECT` query will contain as many rows as there were unique values of “grouping key” in source table. Usually this signficantly reduces the row count, often by orders of magnitude, but not necessarily: row count stays the same if all “grouping key” values were distinct.
- Result of aggregating `SELECT` query will contain as many rows as there were unique values of “grouping key” in source table. Usually, this significantly reduces the row count, often by orders of magnitude, but not necessarily: row count stays the same if all “grouping key” values were distinct.
When you want to group data in the table by column numbers instead of column names, enable the setting [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
@ -115,6 +115,10 @@ The same query also can be written using `WITH` keyword.
SELECT year, month, day, count(*) FROM t GROUP BY year, month, day WITH ROLLUP;
```
**See also**
- [group_by_use_nulls](/docs/en/operations/settings/settings.md#group_by_use_nulls) setting for SQL standard compatibility.
## CUBE Modifier
`CUBE` modifier is used to calculate subtotals for every combination of the key expressions in the `GROUP BY` list. The subtotals rows are added after the result table.
@ -206,6 +210,9 @@ The same query also can be written using `WITH` keyword.
SELECT year, month, day, count(*) FROM t GROUP BY year, month, day WITH CUBE;
```
**See also**
- [group_by_use_nulls](/docs/en/operations/settings/settings.md#group_by_use_nulls) setting for SQL standard compatibility.
## WITH TOTALS Modifier
@ -321,12 +328,12 @@ For every different key value encountered, `GROUP BY` calculates a set of aggreg
## GROUPING SETS modifier
This is the most general modifier.
This modifier allows to manually specify several aggregation key sets (grouping sets).
Aggregation is performed separately for each grouping set, after that all results are combined.
This modifier allows manually specifying several aggregation key sets (grouping sets).
Aggregation is performed separately for each grouping set, and after that, all results are combined.
If a column is not presented in a grouping set, it's filled with a default value.
In other words, modifiers described above can be represented via `GROUPING SETS`.
Despite the fact that queries with `ROLLUP`, `CUBE` and `GROUPING SETS` modifiers are syntactically equal, they may have different performance.
Despite the fact that queries with `ROLLUP`, `CUBE` and `GROUPING SETS` modifiers are syntactically equal, they may perform differently.
When `GROUPING SETS` try to execute everything in parallel, `ROLLUP` and `CUBE` are executing the final merging of the aggregates in a single thread.
In the situation when source columns contain default values, it might be hard to distinguish if a row is a part of the aggregation which uses those columns as keys or not.
@ -351,6 +358,10 @@ GROUPING SETS
);
```
**See also**
- [group_by_use_nulls](/docs/en/operations/settings/settings.md#group_by_use_nulls) setting for SQL standard compatibility.
## Implementation Details
Aggregation is one of the most important features of a column-oriented DBMS, and thus its implementation is one of the most heavily optimized parts of ClickHouse. By default, aggregation is done in memory using a hash-table. It has 40+ specializations that are chosen automatically depending on “grouping key” data types.

View File

@ -26,3 +26,44 @@ The `PREWHERE` section is executed before `FINAL`, so the results of `FROM ... F
## Limitations
`PREWHERE` is only supported by tables from the [*MergeTree](../../../engines/table-engines/mergetree-family/index.md) family.
## Example
```sql
CREATE TABLE mydata
(
`A` Int64,
`B` Int8,
`C` String
)
ENGINE = MergeTree
ORDER BY A AS
SELECT
number,
0,
if(number between 1000 and 2000, 'x', toString(number))
FROM numbers(10000000);
SELECT count()
FROM mydata
WHERE (B = 0) AND (C = 'x');
1 row in set. Elapsed: 0.074 sec. Processed 10.00 million rows, 168.89 MB (134.98 million rows/s., 2.28 GB/s.)
-- let's enable tracing to see which predicate are moved to PREWHERE
set send_logs_level='debug';
MergeTreeWhereOptimizer: condition "B = 0" moved to PREWHERE
-- Clickhouse moves automatically `B = 0` to PREWHERE, but it has no sense because B is always 0.
-- Let's move other predicate `C = 'x'`
SELECT count()
FROM mydata
PREWHERE C = 'x'
WHERE B = 0;
1 row in set. Elapsed: 0.069 sec. Processed 10.00 million rows, 158.89 MB (144.90 million rows/s., 2.30 GB/s.)
-- This query with manual `PREWHERE` processes slightly less data: 158.89 MB VS 168.89 MB
```

View File

@ -27,18 +27,21 @@ A table with the specified structure for reading or writing data in the specifie
**Examples**
Select the data from all files in the cluster `cluster_simple`:
Select the data from all the files in the `/root/data/clickhouse` and `/root/data/database/` folders, using all the nodes in the `cluster_simple` cluster:
``` sql
SELECT * FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon);
SELECT * FROM s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio',
'minio123',
'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon
);
```
Count the total amount of rows in all files in the cluster `cluster_simple`:
``` sql
SELECT count(*) FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))');
```
:::warning
If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::

View File

@ -21,6 +21,7 @@ ClickHouse supports the standard grammar for defining windows and window functio
| `lag/lead(value, offset)` | Not supported. Workarounds: |
| | 1) replace with `any(value) over (.... rows between <offset> preceding and <offset> preceding)`, or `following` for `lead` |
| | 2) use `lagInFrame/leadInFrame`, which are analogous, but respect the window frame. To get behavior identical to `lag/lead`, use `rows between unbounded preceding and unbounded following` |
| ntile(buckets) | Supported. Specify window like, (partition by x order by y rows between unbounded preceding and unounded following). |
## ClickHouse-specific Window Functions

View File

@ -61,7 +61,7 @@ ClickHouse 中的物化视图更像是插入触发器。 如果视图查询中
请注意,物化视图受[optimize_on_insert](../../../operations/settings/settings.md#optimize-on-insert)设置的影响。 在插入视图之前合并数据。
视图看起来与普通表相同。 例如,它们列在1SHOW TABLES1查询的结果中。
视图看起来与普通表相同。 例如,它们列在`SHOW TABLES`查询的结果中。
删除视图,使用[DROP VIEW](../../../sql-reference/statements/drop#drop-view). `DROP TABLE`也适用于视图。

View File

@ -17,7 +17,8 @@ User=clickhouse
Group=clickhouse
Restart=always
RestartSec=30
RuntimeDirectory=%p # %p is resolved to the systemd unit name
# %p is resolved to the systemd unit name
RuntimeDirectory=%p
ExecStart=/usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml --pid-file=%t/%p/%p.pid
# Minus means that this file is optional.
EnvironmentFile=-/etc/default/%p

View File

@ -59,6 +59,8 @@ option (ENABLE_CLICKHOUSE_SU "A tool similar to 'su'" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_DISKS "A tool to manage disks" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_REPORT "A tiny tool to collect a clickhouse-server state" ${ENABLE_CLICKHOUSE_ALL})
if (NOT ENABLE_NURAFT)
# RECONFIGURE_MESSAGE_LEVEL should not be used here,
# since ENABLE_NURAFT is set to OFF for FreeBSD and Darwin.
@ -370,6 +372,9 @@ if (ENABLE_CLICKHOUSE_SU)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-su" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-su)
endif ()
if (ENABLE_CLICKHOUSE_REPORT)
include(${ClickHouse_SOURCE_DIR}/utils/report/CMakeLists.txt)
endif ()
if (ENABLE_CLICKHOUSE_KEEPER)
if (NOT BUILD_STANDALONE_KEEPER AND CREATE_KEEPER_SYMLINK)

View File

@ -69,6 +69,7 @@ namespace ErrorCodes
extern const int TOO_DEEP_RECURSION;
extern const int NETWORK_ERROR;
extern const int AUTHENTICATION_FAILED;
extern const int NO_ELEMENTS_IN_CONFIG;
}
@ -134,29 +135,34 @@ void Client::parseConnectionsCredentials()
if (hosts_and_ports.size() >= 2)
return;
String host;
std::optional<UInt16> port;
std::optional<String> host;
if (hosts_and_ports.empty())
{
host = config().getString("host", "localhost");
if (config().has("port"))
port = config().getInt("port");
if (config().has("host"))
host = config().getString("host");
}
else
{
host = hosts_and_ports.front().host;
port = hosts_and_ports.front().port;
}
String connection;
if (config().has("connection"))
connection = config().getString("connection");
else
connection = host.value_or("localhost");
Strings keys;
config().keys("connections_credentials", keys);
for (const auto & connection : keys)
bool connection_found = false;
for (const auto & key : keys)
{
const String & prefix = "connections_credentials." + connection;
const String & prefix = "connections_credentials." + key;
const String & connection_name = config().getString(prefix + ".name", "");
if (connection_name != host)
if (connection_name != connection)
continue;
connection_found = true;
String connection_hostname;
if (config().has(prefix + ".hostname"))
@ -164,14 +170,9 @@ void Client::parseConnectionsCredentials()
else
connection_hostname = connection_name;
/// Set "host" unconditionally (since it is used as a "name"), while
/// other options only if they are not set yet (config.xml/cli
/// options).
config().setString("host", connection_hostname);
if (!hosts_and_ports.empty())
hosts_and_ports.front().host = connection_hostname;
if (config().has(prefix + ".port") && !port.has_value())
if (hosts_and_ports.empty())
config().setString("host", connection_hostname);
if (config().has(prefix + ".port") && hosts_and_ports.empty())
config().setInt("port", config().getInt(prefix + ".port"));
if (config().has(prefix + ".secure") && !config().has("secure"))
config().setBool("secure", config().getBool(prefix + ".secure"));
@ -189,6 +190,9 @@ void Client::parseConnectionsCredentials()
config().setString("history_file", history_file);
}
}
if (config().has("connection") && !connection_found)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No such connection '{}' in connections_credentials", connection);
}
/// Make query to get all server warnings
@ -955,6 +959,7 @@ void Client::addOptions(OptionsDescription & options_description)
/// Main commandline options related to client functionality and all parameters from Settings.
options_description.main_description->add_options()
("config,c", po::value<std::string>(), "config-file path (another shorthand)")
("connection", po::value<std::string>(), "connection to use (from the client config), by default connection name is hostname")
("secure,s", "Use TLS connection")
("user,u", po::value<std::string>()->default_value("default"), "user")
/** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown.
@ -1095,6 +1100,8 @@ void Client::processOptions(const OptionsDescription & options_description,
if (options.count("config"))
config().setString("config-file", options["config"].as<std::string>());
if (options.count("connection"))
config().setString("connection", options["connection"].as<std::string>());
if (options.count("interleave-queries-file"))
interleave_queries_files = options["interleave-queries-file"].as<std::vector<std::string>>();
if (options.count("secure"))

View File

@ -1225,8 +1225,8 @@ TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTim
std::this_thread::sleep_for(retry_delay_ms);
}
was_active_pieces = (res == TaskStatus::Active);
was_failed_pieces = (res == TaskStatus::Error);
was_active_pieces |= (res == TaskStatus::Active);
was_failed_pieces |= (res == TaskStatus::Error);
}
if (was_failed_pieces)

View File

@ -1282,6 +1282,8 @@ try
auto new_pool_size = config->getUInt64("background_pool_size", 16);
auto new_ratio = config->getUInt64("background_merges_mutations_concurrency_ratio", 2);
global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size * new_ratio);
auto new_scheduling_policy = config->getString("background_merges_mutations_scheduling_policy", "round_robin");
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(new_scheduling_policy);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_move_pool_size"))

View File

@ -339,6 +339,7 @@
<background_buffer_flush_schedule_pool_size>16</background_buffer_flush_schedule_pool_size>
<background_pool_size>16</background_pool_size>
<background_merges_mutations_concurrency_ratio>2</background_merges_mutations_concurrency_ratio>
<background_merges_mutations_scheduling_policy>round_robin</background_merges_mutations_scheduling_policy>
<background_move_pool_size>8</background_move_pool_size>
<background_fetches_pool_size>8</background_fetches_pool_size>
<background_common_pool_size>8</background_common_pool_size>

View File

@ -92,7 +92,52 @@
.chart div { position: absolute; }
.inputs { font-size: 14pt; }
.inputs {
height: auto;
width: 100%;
font-size: 14pt;
display: flex;
flex-flow: column nowrap;
justify-content: center;
}
.inputs.unconnected {
height: 100vh;
}
.unconnected #params {
display: flex;
flex-flow: column nowrap;
justify-content: center;
align-items: center;
}
.unconnected #connection-params {
width: 50%;
display: flex;
flex-flow: row wrap;
}
.unconnected #url {
width: 100%;
}
.unconnected #user {
width: 50%;
}
.unconnected #password {
width: 49.5%;
}
.unconnected input {
margin-bottom: 5px;
}
.inputs #chart-params {
display: block;
}
.inputs.unconnected #chart-params {
display: none;
}
#connection-params {
margin-bottom: 0.5rem;
@ -223,6 +268,10 @@
color: var(--chart-button-hover-color);
}
.disabled {
opacity: 0.5;
}
.query-editor {
display: none;
grid-template-columns: auto fit-content(10%);
@ -286,7 +335,7 @@
</style>
</head>
<body>
<div class="inputs">
<div class="inputs unconnected">
<form id="params">
<div id="connection-params">
<input spellcheck="false" id="url" type="text" value="" placeholder="URL" />
@ -294,8 +343,8 @@
<input spellcheck="false" id="password" type="password" placeholder="password" />
</div>
<div>
<input id="reload" type="button" value="Reload" style="display: none;">
<input id="add" type="button" value="Add chart">
<input id="reload" type="button" value="Reload">
<input id="add" type="button" value="Add chart" style="display: none;">
<span class="nowrap themes"><span id="toggle-dark">🌚</span><span id="toggle-light">🌞</span></span>
<div id="chart-params"></div>
</div>
@ -845,7 +894,7 @@ async function draw(idx, chart, url_params, query) {
error_div.firstChild.data = error;
title_div.style.display = 'none';
error_div.style.display = 'block';
return;
return false;
} else {
error_div.firstChild.data = '';
error_div.style.display = 'none';
@ -886,6 +935,7 @@ async function draw(idx, chart, url_params, query) {
/// Set title
const title = queries[idx] && queries[idx].title ? queries[idx].title.replaceAll(/\{(\w+)\}/g, (_, name) => params[name] ) : '';
chart.querySelector('.title').firstChild.data = title;
return true
}
function showAuthError(message) {
@ -902,8 +952,6 @@ function showAuthError(message) {
function hideAuthError() {
const charts = document.querySelector('#charts');
charts.style.display = 'flex';
const add = document.querySelector('#add');
add.style.display = 'block';
const authError = document.querySelector('#auth-error');
authError.textContent = '';
@ -924,9 +972,20 @@ async function drawAll() {
if (!firstLoad) {
showAuthError(e.message);
}
return false;
});
})).then(() => {
firstLoad = false;
})).then((results) => {
if (firstLoad) {
firstLoad = false;
} else {
enableReloadButton();
}
if (!results.includes(false)) {
const element = document.querySelector('.inputs');
element.classList.remove('unconnected');
const add = document.querySelector('#add');
add.style.display = 'block';
}
})
}
@ -941,11 +1000,25 @@ function resize() {
new ResizeObserver(resize).observe(document.body);
function disableReloadButton() {
const reloadButton = document.getElementById('reload')
reloadButton.value = 'Reloading...'
reloadButton.disabled = true
reloadButton.classList.add('disabled')
}
function enableReloadButton() {
const reloadButton = document.getElementById('reload')
reloadButton.value = 'Reload'
reloadButton.disabled = false
reloadButton.classList.remove('disabled')
}
function reloadAll() {
updateParams();
drawAll();
saveState();
document.getElementById('reload').style.display = 'none';
disableReloadButton()
}
document.getElementById('params').onsubmit = function(event) {

View File

@ -550,6 +550,10 @@ if (ENABLE_NLP)
dbms_target_link_libraries (PUBLIC ch_contrib::nlp_data)
endif()
if (TARGET ch_contrib::ulid)
dbms_target_link_libraries (PUBLIC ch_contrib::ulid)
endif()
if (TARGET ch_contrib::bzip2)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::bzip2)
endif()

View File

@ -43,7 +43,7 @@ Suggest::Suggest()
"IN", "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE", "USER", "ROLE",
"PROFILE", "QUOTA", "POLICY", "ROW", "GRANT", "REVOKE", "OPTION", "ADMIN", "EXCEPT", "REPLACE",
"IDENTIFIED", "HOST", "NAME", "READONLY", "WRITABLE", "PERMISSIVE", "FOR", "RESTRICTIVE", "RANDOMIZED",
"INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE",
"INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE", "CLEANUP"
});
}

View File

@ -56,3 +56,4 @@
#cmakedefine01 USE_BLAKE3
#cmakedefine01 USE_SKIM
#cmakedefine01 USE_OPENSSL_INTREE
#cmakedefine01 USE_ULID

View File

@ -109,6 +109,10 @@ IMPLEMENT_SETTING_ENUM_WITH_RENAME(DefaultTableEngine, ErrorCodes::BAD_ARGUMENTS
{"ReplicatedReplacingMergeTree", DefaultTableEngine::ReplicatedReplacingMergeTree},
{"Memory", DefaultTableEngine::Memory}})
IMPLEMENT_SETTING_ENUM(CleanDeletedRows, ErrorCodes::BAD_ARGUMENTS,
{{"Never", CleanDeletedRows::Never},
{"Always", CleanDeletedRows::Always}})
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64},

View File

@ -121,6 +121,14 @@ enum class DefaultTableEngine
DECLARE_SETTING_ENUM(DefaultTableEngine)
enum class CleanDeletedRows
{
Never = 0, /// Disable.
Always,
};
DECLARE_SETTING_ENUM(CleanDeletedRows)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable

View File

@ -1116,8 +1116,7 @@ void BaseDaemon::setupWatchdog()
logger().information("Child process no longer exists.");
_exit(WEXITSTATUS(status));
}
if (WIFEXITED(status))
else if (WIFEXITED(status))
{
logger().information(fmt::format("Child process exited normally with code {}.", WEXITSTATUS(status)));
_exit(WEXITSTATUS(status));

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/Transforms/MongoDBSource.h>
#include <Processors/Sources/MongoDBSource.h>
#include <Core/Block.h>
#include "DictionaryStructure.h"

View File

@ -8,7 +8,7 @@
#if USE_LIBPQXX
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Transforms/PostgreSQLSource.h>
#include <Processors/Sources/PostgreSQLSource.h>
#include "readInvalidateQuery.h"
#include <Interpreters/Context.h>
#include <QueryPipeline/QueryPipeline.h>

View File

@ -192,6 +192,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.bson.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_bson_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.max_binary_string_size = settings.format_binary_max_string_size;
format_settings.max_parser_depth = context->getSettingsRef().max_parser_depth;
format_settings.client_protocol_version = context->getClientProtocolVersion();
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -82,6 +82,7 @@ struct FormatSettings
Float32 input_allow_errors_ratio = 0;
UInt64 max_binary_string_size = 0;
UInt64 client_protocol_version = 0;
UInt64 max_parser_depth = DBMS_DEFAULT_MAX_PARSER_DEPTH;

View File

@ -342,12 +342,51 @@ private:
return writeNumber2(dest, ToMonthImpl::execute(source, timezone));
}
static size_t monthOfYearText(char * dest, Time source, bool abbreviate, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto month = ToMonthImpl::execute(source, timezone);
std::string_view str_view = abbreviate ? monthsShort[month - 1] : monthsFull[month - 1];
memcpy(dest, str_view.data(), str_view.size());
return str_view.size();
}
static size_t mysqlMonthOfYearTextShort(char * dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
return monthOfYearText(dest, source, true, fractional_second, scale, timezone);
}
static size_t mysqlMonthOfYearTextLong(char * dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
return monthOfYearText(dest, source, false, fractional_second, scale, timezone);
}
static size_t mysqlDayOfWeek(char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
*dest = '0' + ToDayOfWeekImpl::execute(source, 0, timezone);
return 1;
}
static size_t dayOfWeekText(char * dest, Time source, bool abbreviate, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto week_day = ToDayOfWeekImpl::execute(source, 0, timezone);
if (week_day == 7)
week_day = 0;
std::string_view str_view = abbreviate ? weekdaysShort[week_day] : weekdaysFull[week_day];
memcpy(dest, str_view.data(), str_view.size());
return str_view.size();
}
static size_t mysqlDayOfWeekTextShort(char * dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
return dayOfWeekText(dest, source, true, fractional_second, scale, timezone);
}
static size_t mysqlDayOfWeekTextLong(char * dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
return dayOfWeekText(dest, source, false, fractional_second, scale, timezone);
}
static size_t mysqlDayOfWeek0To6(char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto day = ToDayOfWeekImpl::execute(source, 0, timezone);
@ -411,6 +450,16 @@ private:
return 5;
}
static size_t mysqlHHMM12(char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto hour = ToHourImpl::execute(source, timezone);
writeNumber2(dest, hour == 0 ? 12 : (hour > 12 ? hour - 12 : hour));
writeNumber2(dest + 3, ToMinuteImpl::execute(source, timezone));
dest[6] = hour >= 12 ? 'P' : 'A';
return 8;
}
static size_t mysqlSecond(char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
return writeNumber2(dest, ToSecondImpl::execute(source, timezone));
@ -503,15 +552,10 @@ private:
return writeNumberWithPadding(dest, week_day, min_represent_digits);
}
static size_t jodaDayOfWeekText(size_t min_represent_digits, char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
static size_t jodaDayOfWeekText(size_t min_represent_digits, char * dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
auto week_day = ToDayOfWeekImpl::execute(source, 0, timezone);
if (week_day == 7)
week_day = 0;
std::string_view str_view = min_represent_digits <= 3 ? weekdaysShort[week_day] : weekdaysFull[week_day];
memcpy(dest, str_view.data(), str_view.size());
return str_view.size();
bool abbreviate = min_represent_digits <= 3;
return dayOfWeekText(dest, source, abbreviate, fractional_second, scale, timezone);
}
static size_t jodaYear(size_t min_represent_digits, char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
@ -551,12 +595,10 @@ private:
return writeNumberWithPadding(dest, month_of_year, min_represent_digits);
}
static size_t jodaMonthOfYearText(size_t min_represent_digits, char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
static size_t jodaMonthOfYearText(size_t min_represent_digits, char * dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
auto month = ToMonthImpl::execute(source, timezone);
std::string_view str_view = min_represent_digits <= 3 ? monthsShort[month - 1] : monthsFull[month - 1];
memcpy(dest, str_view.data(), str_view.size());
return str_view.size();
bool abbreviate = min_represent_digits <= 3;
return monthOfYearText(dest, source, abbreviate, fractional_second, scale, timezone);
}
static size_t jodaDayOfMonth(size_t min_represent_digits, char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
@ -909,6 +951,24 @@ public:
switch (*pos)
{
// Abbreviated weekday [Mon...Sun]
case 'a':
instructions.emplace_back(&Action<T>::mysqlDayOfWeekTextShort);
out_template += "Mon";
break;
// Abbreviated month [Jan...Dec]
case 'b':
instructions.emplace_back(&Action<T>::mysqlMonthOfYearTextShort);
out_template += "Jan";
break;
// Month as a decimal number (01-12)
case 'c':
instructions.emplace_back(&Action<T>::mysqlMonth);
out_template += "00";
break;
// Year, divided by 100, zero-padded
case 'C':
instructions.emplace_back(&Action<T>::mysqlCentury);
@ -990,6 +1050,12 @@ public:
out_template += "0";
break;
// Full weekday [Monday...Sunday]
case 'W':
instructions.emplace_back(&Action<T>::mysqlDayOfWeekTextLong);
out_template += "Monday";
break;
// Two digits year
case 'y':
instructions.emplace_back(&Action<T>::mysqlYear2);
@ -1028,65 +1094,102 @@ public:
out_template += "AM";
break;
// 24-hour HH:MM time, equivalent to %H:%M 14:55
// 12-hour HH:MM time, equivalent to %h:%i %p 2:55 PM
case 'r':
add_instruction_or_extra_shift(&Action<T>::mysqlHHMM12, 8);
out_template += "12:00 AM";
break;
// 24-hour HH:MM time, equivalent to %H:%i 14:55
case 'R':
add_instruction_or_extra_shift(&Action<T>::mysqlHHMM24, 5);
out_template += "00:00";
break;
// Seconds
case 's':
add_instruction_or_extra_shift(&Action<T>::mysqlSecond, 2);
out_template += "00";
break;
// Seconds
case 'S':
add_instruction_or_extra_shift(&Action<T>::mysqlSecond, 2);
out_template += "00";
break;
// ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S 14:55:02
// ISO 8601 time format (HH:MM:SS), equivalent to %H:%i:%S 14:55:02
case 'T':
add_instruction_or_extra_shift(&Action<T>::mysqlISO8601Time, 8);
out_template += "00:00:00";
break;
// Hour in 12h format (01-12)
case 'h':
add_instruction_or_extra_shift(&Action<T>::mysqlHour12, 2);
out_template += "12";
break;
// Hour in 24h format (00-23)
case 'H':
add_instruction_or_extra_shift(&Action<T>::mysqlHour24, 2);
out_template += "00";
break;
// Minute of hour range [0, 59]
case 'i':
add_instruction_or_extra_shift(&Action<T>::mysqlMinute, 2);
out_template += "00";
break;
// Hour in 12h format (01-12)
case 'I':
add_instruction_or_extra_shift(&Action<T>::mysqlHour12, 2);
out_template += "12";
break;
/// Escaped literal characters.
case '%':
add_extra_shift(1);
out_template += "%";
// Hour in 24h format (00-23)
case 'k':
add_instruction_or_extra_shift(&Action<T>::mysqlHour24, 2);
out_template += "00";
break;
// Hour in 12h format (01-12)
case 'l':
add_instruction_or_extra_shift(&Action<T>::mysqlHour12, 2);
out_template += "12";
break;
case 't':
add_extra_shift(1);
out_template += "\t";
break;
case 'n':
add_extra_shift(1);
out_template += "\n";
break;
// Escaped literal characters.
case '%':
add_extra_shift(1);
out_template += "%";
break;
// Unimplemented
case 'U':
[[fallthrough]];
case 'W':
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Wrong syntax '{}', symbol '{}' is not implemented for function {}",
format,
*pos,
getName());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "format is not supported for WEEK (Sun-Sat)");
case 'v':
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "format is not supported for WEEK (Mon-Sun)");
case 'x':
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "format is not supported for YEAR for week (Mon-Sun)");
case 'X':
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "format is not supported for YEAR for week (Sun-Sat)");
default:
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Wrong syntax '{}', unexpected symbol '{}' for function {}",
ErrorCodes::BAD_ARGUMENTS,
"Incorrect syntax '{}', symbol is not supported '{}' for function {}",
format,
*pos,
getName());
@ -1337,6 +1440,8 @@ using FunctionFromUnixTimestampInJodaSyntax = FunctionFormatDateTimeImpl<NameFro
REGISTER_FUNCTION(FormatDateTime)
{
factory.registerFunction<FunctionFormatDateTime>();
factory.registerAlias("DATE_FORMAT", FunctionFormatDateTime::name);
factory.registerFunction<FunctionFromUnixTimestamp>();
factory.registerAlias("FROM_UNIXTIME", "fromUnixTimestamp");

View File

@ -0,0 +1,94 @@
#include "config.h"
#if USE_ULID
#include <Columns/ColumnFixedString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <ulid.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
class FunctionGenerateULID : public IFunction
{
public:
static constexpr size_t ULID_LENGTH = 26;
static constexpr auto name = "generateULID";
static FunctionPtr create(ContextPtr /*context*/)
{
return std::make_shared<FunctionGenerateULID>();
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() > 1)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be 0 or 1.",
getName(), arguments.size());
return std::make_shared<DataTypeFixedString>(ULID_LENGTH);
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & /*arguments*/, const DataTypePtr &, size_t input_rows_count) const override
{
auto col_res = ColumnFixedString::create(ULID_LENGTH);
auto & vec_res = col_res->getChars();
vec_res.resize(input_rows_count * ULID_LENGTH);
ulid_generator generator;
ulid_generator_init(&generator, 0);
for (size_t offset = 0, size = vec_res.size(); offset < size; offset += ULID_LENGTH)
ulid_generate(&generator, reinterpret_cast<char *>(&vec_res[offset]));
return col_res;
}
};
REGISTER_FUNCTION(GenerateULID)
{
factory.registerFunction<FunctionGenerateULID>(
{
R"(
Generates a Universally Unique Lexicographically Sortable Identifier (ULID).
This function takes an optional argument, the value of which is discarded to generate different values in case the function is called multiple times.
The function returns a value of type FixedString(26).
)",
Documentation::Examples{
{"ulid", "SELECT generateULID()"},
{"multiple", "SELECT generateULID(1), generateULID(2)"}},
Documentation::Categories{"ULID"}
},
FunctionFactory::CaseSensitive);
}
}
#endif

View File

@ -77,8 +77,17 @@ public:
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImplDryRun(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
{
return execute(arguments, result_type, true);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
{
return execute(arguments, result_type, false);
}
ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, bool dry_run) const
{
const IColumn * col = arguments[0].column.get();
@ -99,11 +108,14 @@ public:
if (seconds > 3.0) /// The choice is arbitrary
throw Exception(ErrorCodes::TOO_SLOW, "The maximum sleep time is 3 seconds. Requested: {}", toString(seconds));
UInt64 count = (variant == FunctionSleepVariant::PerBlock ? 1 : size);
UInt64 microseconds = static_cast<UInt64>(seconds * count * 1e6);
sleepForMicroseconds(microseconds);
ProfileEvents::increment(ProfileEvents::SleepFunctionCalls, count);
ProfileEvents::increment(ProfileEvents::SleepFunctionMicroseconds, microseconds);
if (!dry_run)
{
UInt64 count = (variant == FunctionSleepVariant::PerBlock ? 1 : size);
UInt64 microseconds = static_cast<UInt64>(seconds * count * 1e6);
sleepForMicroseconds(microseconds);
ProfileEvents::increment(ProfileEvents::SleepFunctionCalls, count);
ProfileEvents::increment(ProfileEvents::SleepFunctionMicroseconds, microseconds);
}
}
/// convertToFullColumn needed, because otherwise (constant expression case) function will not get called on each columns.

View File

@ -27,6 +27,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_REDIRECTS;
}
namespace S3
@ -80,6 +81,67 @@ void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOu
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome, lastError);
}
namespace
{
void verifyClientConfiguration(const Aws::Client::ClientConfiguration & client_config)
{
if (!client_config.retryStrategy)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The S3 client can only be used with Client::RetryStrategy, define it in the client configuration");
assert_cast<const Client::RetryStrategy &>(*client_config.retryStrategy);
}
}
std::unique_ptr<Client> Client::create(
size_t max_redirects_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
{
verifyClientConfiguration(client_configuration);
return std::unique_ptr<Client>(
new Client(max_redirects_, credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
}
std::unique_ptr<Client> Client::create(const Client & other)
{
return std::unique_ptr<Client>(new Client(other));
}
Client::Client(
size_t max_redirects_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
: Aws::S3::S3Client(credentials_provider, client_configuration, std::move(sign_payloads), use_virtual_addressing)
, max_redirects(max_redirects_)
, log(&Poco::Logger::get("S3Client"))
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
}
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)
, log(&Poco::Logger::get("S3Client"))
{
cache = std::make_shared<ClientCache>(*other.cache);
ClientCacheRegistry::instance().registerClient(cache);
}
bool Client::checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const
{
if (detect_region)
@ -135,7 +197,7 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
if (checkIfWrongRegionDefined(bucket, error, new_region))
{
request.overrideRegion(new_region);
return HeadObject(request);
return Aws::S3::S3Client::HeadObject(request);
}
if (error.GetResponseCode() != Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
@ -248,6 +310,83 @@ Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & r
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return Aws::S3::S3Client::DeleteObjects(req); });
}
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
Client::doRequest(const RequestType & request, RequestFn request_fn) const
{
const auto & bucket = request.GetBucket();
if (auto region = getRegionForBucket(bucket); !region.empty())
{
if (!detect_region)
LOG_INFO(log, "Using region override {} for bucket {}", region, bucket);
request.overrideRegion(std::move(region));
}
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
if (found_new_endpoint)
{
auto uri_override = request.getURIOverride();
assert(uri_override.has_value());
updateURIForBucket(bucket, std::move(*uri_override));
}
);
for (size_t attempt = 0; attempt <= max_redirects; ++attempt)
{
auto result = request_fn(request);
if (result.IsSuccess())
return result;
const auto & error = result.GetError();
std::string new_region;
if (checkIfWrongRegionDefined(bucket, error, new_region))
{
request.overrideRegion(new_region);
continue;
}
if (error.GetResponseCode() != Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
return result;
// maybe we detect a correct region
if (!detect_region)
{
if (auto region = GetErrorMarshaller()->ExtractRegion(error); !region.empty() && region != explicit_region)
{
request.overrideRegion(region);
insertRegionOverride(bucket, region);
}
}
// we possibly got new location, need to try with that one
auto new_uri = getURIFromError(error);
if (!new_uri)
return result;
const auto & current_uri_override = request.getURIOverride();
/// we already tried with this URI
if (current_uri_override && current_uri_override->uri == new_uri->uri)
{
LOG_INFO(log, "Getting redirected to the same invalid location {}", new_uri->uri.toString());
return result;
}
found_new_endpoint = true;
request.overrideURI(*new_uri);
}
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects");
}
std::string Client::getRegionForBucket(const std::string & bucket, bool force_detect) const
{
std::lock_guard lock(cache->region_cache_mutex);
@ -458,8 +597,8 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(std::move(client_configuration.retryStrategy));
return Client::create(
client_configuration.s3_max_redirects,
std::move(credentials_provider),
std::move(client_configuration), // Client configuration.
credentials_provider,
client_configuration, // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
is_virtual_hosted_style || client_configuration.endpointOverride.empty() /// Use virtual addressing if endpoint is not specified.
);

View File

@ -19,16 +19,7 @@
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/client/RetryStrategy.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_REDIRECTS;
}
namespace S3
namespace DB::S3
{
namespace Model = Aws::S3::Model;
@ -80,15 +71,25 @@ private:
/// - automatically detect endpoint and regions for each bucket and cache them
///
/// For this client to work correctly both Client::RetryStrategy and Requests defined in <IO/S3/Requests.h> should be used.
class Client : public Aws::S3::S3Client
///
/// To add support for new type of request
/// - ExtendedRequest should be defined inside IO/S3/Requests.h
/// - new method accepting that request should be defined in this Client (check other requests for reference)
/// - method handling the request from Aws::S3::S3Client should be left to private so we don't use it by accident
class Client : private Aws::S3::S3Client
{
public:
template <typename... Args>
static std::unique_ptr<Client> create(Args &&... args)
{
(verifyArgument(args), ...);
return std::unique_ptr<Client>(new Client(std::forward<Args>(args)...));
}
/// we use a factory method to verify arguments before creating a client because
/// there are certain requirements on arguments for it to work correctly
/// e.g. Client::RetryStrategy should be used
static std::unique_ptr<Client> create(
size_t max_redirects_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
static std::unique_ptr<Client> create(const Client & other);
Client & operator=(const Client &) = delete;
@ -108,7 +109,12 @@ public:
}
}
/// Decorator for RetryStrategy needed for this client to work correctly
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
/// - redirect location is written in XML format inside the response body something that doesn't exist for HEAD
/// requests so we need to manually find the correct location
/// - we want to cache the new location to decrease number of roundtrips for future requests
/// This decorator doesn't retry if 301 is detected and fallbacks to the inner retry strategy otherwise.
class RetryStrategy : public Aws::Client::RetryStrategy
{
public:
@ -149,35 +155,19 @@ public:
Model::DeleteObjectOutcome DeleteObject(const DeleteObjectRequest & request) const;
Model::DeleteObjectsOutcome DeleteObjects(const DeleteObjectsRequest & request) const;
using Aws::S3::S3Client::EnableRequestProcessing;
using Aws::S3::S3Client::DisableRequestProcessing;
private:
template <typename... Args>
explicit Client(size_t max_redirects_, Args &&... args)
: Aws::S3::S3Client(std::forward<Args>(args)...)
, max_redirects(max_redirects_)
, log(&Poco::Logger::get("S3Client"))
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
Client(size_t max_redirects_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& credentials_provider,
const Aws::Client::ClientConfiguration& client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
}
Client(const Client & other);
Client(const Client & other)
: Aws::S3::S3Client(other)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)
, log(&Poco::Logger::get("S3Client"))
{
cache = std::make_shared<ClientCache>(*other.cache);
ClientCacheRegistry::instance().registerClient(cache);
}
/// Make regular functions private
/// Leave regular functions private so we don't accidentally use them
/// otherwise region and endpoint redirection won't work
using Aws::S3::S3Client::HeadObject;
using Aws::S3::S3Client::ListObjectsV2;
using Aws::S3::S3Client::ListObjects;
@ -196,80 +186,7 @@ private:
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
doRequest(const RequestType & request, RequestFn request_fn) const
{
const auto & bucket = request.GetBucket();
if (auto region = getRegionForBucket(bucket); !region.empty())
{
if (!detect_region)
LOG_INFO(log, "Using region override {} for bucket {}", region, bucket);
request.overrideRegion(std::move(region));
}
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
if (found_new_endpoint)
{
auto uri_override = request.getURIOverride();
assert(uri_override.has_value());
updateURIForBucket(bucket, std::move(*uri_override));
}
);
for (size_t attempt = 0; attempt <= max_redirects; ++attempt)
{
auto result = request_fn(request);
if (result.IsSuccess())
return result;
const auto & error = result.GetError();
std::string new_region;
if (checkIfWrongRegionDefined(bucket, error, new_region))
{
request.overrideRegion(new_region);
continue;
}
if (error.GetResponseCode() != Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
return result;
// maybe we detect a correct region
if (!detect_region)
{
if (auto region = GetErrorMarshaller()->ExtractRegion(error); !region.empty() && region != explicit_region)
{
request.overrideRegion(region);
insertRegionOverride(bucket, region);
}
}
// we possibly got new location, need to try with that one
auto new_uri = getURIFromError(error);
if (!new_uri)
return result;
const auto & current_uri_override = request.getURIOverride();
/// we already tried with this URI
if (current_uri_override && current_uri_override->uri == new_uri->uri)
{
LOG_INFO(log, "Getting redirected to the same invalid location {}", new_uri->uri.toString());
return result;
}
found_new_endpoint = true;
request.overrideURI(*new_uri);
}
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects");
}
doRequest(const RequestType & request, RequestFn request_fn) const;
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
@ -281,19 +198,6 @@ private:
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
template <typename T>
static void verifyArgument(const T & /*arg*/)
{}
template <std::derived_from<Aws::Client::ClientConfiguration> T>
static void verifyArgument(const T & client_config)
{
if (!client_config.retryStrategy)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The S3 client can only be used with Client::RetryStrategy, define it in the client configuration");
assert_cast<const RetryStrategy &>(*client_config.retryStrategy);
}
std::string explicit_region;
mutable bool detect_region = true;
@ -339,6 +243,4 @@ private:
}
}
#endif

View File

@ -42,7 +42,6 @@ namespace
}
/// Performs a request to get the size and last modification time of an object.
/// The function performs either HeadObject or GetObjectAttributes request depending on the endpoint.
std::pair<std::optional<ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
const S3::Client & client, const String & bucket, const String & key, const String & version_id,
const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata, bool for_disk_s3)
@ -87,7 +86,7 @@ ObjectInfo getObjectInfo(
else if (throw_on_error)
{
throw DB::Exception(ErrorCodes::S3_ERROR,
"Failed to get object attributes: {}. HTTP response code: {}",
"Failed to get object info: {}. HTTP response code: {}",
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
}
return {};

View File

@ -3795,6 +3795,12 @@ void Context::initializeBackgroundExecutorsIfNeeded()
else if (config.has("profiles.default.background_merges_mutations_concurrency_ratio"))
background_merges_mutations_concurrency_ratio = config.getUInt64("profiles.default.background_merges_mutations_concurrency_ratio");
String background_merges_mutations_scheduling_policy = "round_robin";
if (config.has("background_merges_mutations_scheduling_policy"))
background_merges_mutations_scheduling_policy = config.getString("background_merges_mutations_scheduling_policy");
else if (config.has("profiles.default.background_merges_mutations_scheduling_policy"))
background_merges_mutations_scheduling_policy = config.getString("profiles.default.background_merges_mutations_scheduling_policy");
size_t background_move_pool_size = 8;
if (config.has("background_move_pool_size"))
background_move_pool_size = config.getUInt64("background_move_pool_size");
@ -3819,10 +3825,11 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"MergeMutate",
/*max_threads_count*/background_pool_size,
/*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
background_merges_mutations_scheduling_policy
);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}",
background_pool_size, background_pool_size * background_merges_mutations_concurrency_ratio);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}, scheduling_policy={}",
background_pool_size, background_pool_size * background_merges_mutations_concurrency_ratio, background_merges_mutations_scheduling_policy);
shared->moves_executor = std::make_shared<OrdinaryBackgroundExecutor>
(
@ -4069,4 +4076,14 @@ bool Context::canUseParallelReplicasOnFollower() const
&& getClientInfo().collaborate_with_initiator;
}
UInt64 Context::getClientProtocolVersion() const
{
return client_protocol_version;
}
void Context::setClientProtocolVersion(UInt64 version)
{
client_protocol_version = version;
}
}

View File

@ -131,11 +131,17 @@ class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
template <class Queue>
class MergeTreeBackgroundExecutor;
class MergeMutateRuntimeQueue;
class OrdinaryRuntimeQueue;
using MergeMutateBackgroundExecutor = MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;
/// Scheduling policy can be changed using `background_merges_mutations_scheduling_policy` config option.
/// By default concurrent merges are scheduled using "round_robin" to ensure fair and starvation-free operation.
/// Previously in heavily overloaded shards big merges could possibly be starved by smaller
/// merges due to the use of strict priority scheduling "shortest_task_first".
class DynamicRuntimeQueue;
using MergeMutateBackgroundExecutor = MergeTreeBackgroundExecutor<DynamicRuntimeQueue>;
using MergeMutateBackgroundExecutorPtr = std::shared_ptr<MergeMutateBackgroundExecutor>;
using OrdinaryBackgroundExecutor = MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;
class RoundRobinRuntimeQueue;
using OrdinaryBackgroundExecutor = MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
using OrdinaryBackgroundExecutorPtr = std::shared_ptr<OrdinaryBackgroundExecutor>;
struct PartUUIDs;
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
@ -273,6 +279,9 @@ private:
std::optional<MergeTreeAllRangesCallback> merge_tree_all_ranges_callback;
UUID parallel_replicas_group_uuid{UUIDHelpers::Nil};
/// This parameter can be set by the HTTP client to tune the behavior of output formats for compatibility.
UInt64 client_protocol_version = 0;
/// Record entities accessed by current query, and store this information in system.query_log.
struct QueryAccessInfo
{
@ -822,6 +831,8 @@ public:
bool tryCheckClientConnectionToMyKeeperCluster() const;
UInt32 getZooKeeperSessionUptime() const;
UInt64 getClientProtocolVersion() const;
void setClientProtocolVersion(UInt64 version);
#if USE_ROCKSDB
MergeTreeMetadataCachePtr getMergeTreeMetadataCache() const;

View File

@ -79,7 +79,7 @@ BlockIO InterpreterOptimizeQuery::execute()
if (auto * snapshot_data = dynamic_cast<MergeTreeData::SnapshotData *>(storage_snapshot->data.get()))
snapshot_data->parts = {};
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, ast.cleanup, getContext());
return {};
}

View File

@ -620,12 +620,17 @@ InterpreterSelectQuery::InterpreterSelectQuery(
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
Names queried_columns = syntax_analyzer_result->requiredSourceColumns();
const auto & supported_prewhere_columns = storage->supportedPrewhereColumns();
if (supported_prewhere_columns.has_value())
std::erase_if(queried_columns, [&](const auto & name) { return !supported_prewhere_columns->contains(name); });
MergeTreeWhereOptimizer{
current_info,
context,
std::move(column_compressed_sizes),
metadata_snapshot,
syntax_analyzer_result->requiredSourceColumns(),
queried_columns,
log};
}
}
@ -2044,6 +2049,27 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
}
}
/// Set of all (including ALIAS) required columns for PREWHERE
auto get_prewhere_columns = [&]()
{
NameSet columns;
if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
if (prewhere_info->row_level_filter)
{
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
columns.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}
return columns;
};
/// There are multiple sources of required columns:
/// - raw required columns,
/// - columns deduced from ALIAS columns,
@ -2053,22 +2079,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
/// before any other executions.
if (alias_columns_required)
{
NameSet required_columns_from_prewhere; /// Set of all (including ALIAS) required columns for PREWHERE
NameSet required_columns_from_prewhere = get_prewhere_columns();
NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE
if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
if (prewhere_info->row_level_filter)
{
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}
/// Expression, that contains all raw required columns
ASTPtr required_columns_all_expr = std::make_shared<ASTExpressionList>();
@ -2164,6 +2177,18 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
required_columns.push_back(column);
}
}
const auto & supported_prewhere_columns = storage->supportedPrewhereColumns();
if (supported_prewhere_columns.has_value())
{
NameSet required_columns_from_prewhere = get_prewhere_columns();
for (const auto & column_name : required_columns_from_prewhere)
{
if (!supported_prewhere_columns->contains(column_name))
throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "Storage {} doesn't support PREWHERE for {}", storage->getName(), column_name);
}
}
}
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan)

View File

@ -218,7 +218,11 @@ void ServerAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values
size_t max_part_count_for_partition = 0;
size_t number_of_databases = databases.size();
size_t number_of_databases = 0;
for (auto [db_name, _] : databases)
if (db_name != DatabaseCatalog::TEMPORARY_DATABASE)
++number_of_databases; /// filter out the internal database for temporary tables, system table "system.databases" behaves the same way
size_t total_number_of_tables = 0;
size_t total_number_of_bytes = 0;

View File

@ -534,36 +534,64 @@ MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash &
return it->second;
}
CSN TransactionLog::getCSN(const TransactionID & tid)
CSN TransactionLog::getCSN(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn)
{
/// Avoid creation of the instance if transactions are not actually involved
if (tid == Tx::PrehistoricTID)
return Tx::PrehistoricCSN;
return instance().getCSNImpl(tid.getHash());
return instance().getCSNImpl(tid.getHash(), failback_with_strict_load_csn);
}
CSN TransactionLog::getCSN(const TIDHash & tid)
CSN TransactionLog::getCSN(const TIDHash & tid, const std::atomic<CSN> * failback_with_strict_load_csn)
{
/// Avoid creation of the instance if transactions are not actually involved
if (tid == Tx::PrehistoricTID.getHash())
return Tx::PrehistoricCSN;
return instance().getCSNImpl(tid);
return instance().getCSNImpl(tid, failback_with_strict_load_csn);
}
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash, const std::atomic<CSN> * failback_with_strict_load_csn) const
{
chassert(tid_hash);
chassert(tid_hash != Tx::EmptyTID.getHash());
std::lock_guard lock{mutex};
auto it = tid_to_csn.find(tid_hash);
if (it != tid_to_csn.end())
return it->second.csn;
{
std::lock_guard lock{mutex};
auto it = tid_to_csn.find(tid_hash);
if (it != tid_to_csn.end())
return it->second.csn;
}
/// Usually commit csn checked by load memory with memory_order_relaxed option just for performance improvements
/// If fast loading fails than getCSN is called.
/// There is a race possible, transaction could be committed concurrently. Right before getCSN has been called. In that case tid_to_csn has no tid_hash but commit csn is set.
/// In order to be sure, commit csn has to be loaded with memory_order_seq_cst after lookup at tid_to_csn
if (failback_with_strict_load_csn)
if (CSN maybe_csn = failback_with_strict_load_csn->load())
return maybe_csn;
return Tx::UnknownCSN;
}
void TransactionLog::assertTIDIsNotOutdated(const TransactionID & tid)
CSN TransactionLog::getCSNAndAssert(const TransactionID & tid, std::atomic<CSN> & failback_with_strict_load_csn)
{
/// failback_with_strict_load_csn is not provided to getCSN
/// Because it would be checked after assertTIDIsNotOutdated
if (CSN maybe_csn = getCSN(tid))
return maybe_csn;
assertTIDIsNotOutdated(tid, &failback_with_strict_load_csn);
/// If transaction is not outdated then it might be already committed
/// We should load CSN again to distinguish it
/// Otherwise the transactiuon hasn't been committed yet
if (CSN maybe_csn = failback_with_strict_load_csn.load())
return maybe_csn;
return Tx::UnknownCSN;
}
void TransactionLog::assertTIDIsNotOutdated(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn)
{
if (tid == Tx::PrehistoricTID)
return;
@ -573,6 +601,14 @@ void TransactionLog::assertTIDIsNotOutdated(const TransactionID & tid)
if (tail <= tid.start_csn)
return;
/// At this point of execution tail is lesser that tid.start_csn
/// This mean that transaction is either outdated or just has been committed concurrently and the tail moved forward.
/// If the second case takes place transaction's commit csn has to be set.
/// We should load CSN again to distinguish the second case.
if (failback_with_strict_load_csn)
if (CSN maybe_csn = failback_with_strict_load_csn->load())
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get CSN for too old TID {}, current tail_ptr is {}, probably it's a bug", tid, tail);
}

View File

@ -107,11 +107,14 @@ public:
/// Returns CSN if transaction with specified ID was committed and UnknownCSN if it was not.
/// Returns PrehistoricCSN for PrehistoricTID without creating a TransactionLog instance as a special case.
static CSN getCSN(const TransactionID & tid);
static CSN getCSN(const TIDHash & tid);
/// Some time a transaction could be committed concurrently, in order to resolve it provide failback_with_strict_load_csn
static CSN getCSN(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr);
static CSN getCSN(const TIDHash & tid, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr);
static CSN getCSNAndAssert(const TransactionID & tid, std::atomic<CSN> & failback_with_strict_load_csn);
/// Ensures that getCSN returned UnknownCSN because transaction is not committed and not because entry was removed from the log.
static void assertTIDIsNotOutdated(const TransactionID & tid);
static void assertTIDIsNotOutdated(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr);
/// Returns a pointer to transaction object if it's running or nullptr.
MergeTreeTransactionPtr tryGetRunningTransaction(const TIDHash & tid);
@ -147,7 +150,8 @@ private:
ZooKeeperPtr getZooKeeper() const;
CSN getCSNImpl(const TIDHash & tid_hash) const;
/// Some time a transaction could be committed concurrently, in order to resolve it provide failback_with_strict_load_csn
CSN getCSNImpl(const TIDHash & tid_hash, const std::atomic<CSN> * failback_with_strict_load_csn = nullptr) const;
const ContextPtr global_context;
Poco::Logger * const log;

View File

@ -20,26 +20,6 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_TEXT;
}
inline static CSN getCSNAndAssert(TIDHash tid_hash, std::atomic<CSN> & csn, const TransactionID * tid = nullptr)
{
CSN maybe_csn = TransactionLog::getCSN(tid_hash);
if (maybe_csn)
return maybe_csn;
/// Either transaction is not committed (yet) or it was committed and then the CSN entry was cleaned up from the log.
/// We should load CSN again to distinguish the second case.
/// If entry was cleaned up, then CSN is already stored in VersionMetadata and we will get it.
/// And for the first case we will get UnknownCSN again.
maybe_csn = csn.load();
if (maybe_csn)
return maybe_csn;
if (tid)
TransactionLog::assertTIDIsNotOutdated(*tid);
return Tx::UnknownCSN;
}
VersionMetadata::VersionMetadata()
{
/// It would be better to make it static, but static loggers do not work for some reason (initialization order?)
@ -217,7 +197,7 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
/// so we can determine their visibility through fast path.
/// But for long-running writing transactions we will always do
/// CNS lookup and get 0 (UnknownCSN) until the transaction is committed/rolled back.
creation = getCSNAndAssert(creation_tid.getHash(), creation_csn, &creation_tid);
creation = TransactionLog::getCSNAndAssert(creation_tid, creation_csn);
if (!creation)
{
return false; /// Part creation is not committed yet
@ -229,7 +209,7 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
if (removal_lock)
{
removal = getCSNAndAssert(removal_lock, removal_csn);
removal = TransactionLog::getCSN(removal_lock, &removal_csn);
if (removal)
removal_csn.store(removal, std::memory_order_relaxed);
}
@ -267,7 +247,7 @@ bool VersionMetadata::canBeRemovedImpl(CSN oldest_snapshot_version)
if (!creation)
{
/// Cannot remove part if its creation not committed yet
creation = getCSNAndAssert(creation_tid.getHash(), creation_csn, &creation_tid);
creation = TransactionLog::getCSNAndAssert(creation_tid, creation_csn);
if (creation)
creation_csn.store(creation, std::memory_order_relaxed);
else
@ -287,7 +267,7 @@ bool VersionMetadata::canBeRemovedImpl(CSN oldest_snapshot_version)
if (!removal)
{
/// Part removal is not committed yet
removal = getCSNAndAssert(removal_lock, removal_csn);
removal = TransactionLog::getCSN(removal_lock, &removal_csn);
if (removal)
removal_csn.store(removal, std::memory_order_relaxed);
else

View File

@ -363,24 +363,17 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
{
ASTs & elements = select_query->select()->children;
std::unordered_map<String, size_t> required_columns_with_duplicate_count;
/// Order of output columns should match order in required_result_columns,
/// otherwise UNION queries may have incorrect header when subselect has duplicated columns.
///
/// NOTE: multimap is required since there can be duplicated column names.
std::unordered_multimap<String, size_t> output_columns_positions;
std::map<String, size_t> required_columns_with_duplicate_count;
if (!required_result_columns.empty())
{
/// Some columns may be queried multiple times, like SELECT x, y, y FROM table.
for (size_t i = 0; i < required_result_columns.size(); ++i)
for (const auto & name : required_result_columns)
{
const auto & name = required_result_columns[i];
if (remove_dups)
required_columns_with_duplicate_count[name] = 1;
else
++required_columns_with_duplicate_count[name];
output_columns_positions.emplace(name, i);
}
}
else if (remove_dups)
@ -392,8 +385,8 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
else
return;
ASTs new_elements(elements.size() + output_columns_positions.size());
size_t new_elements_size = 0;
ASTs new_elements;
new_elements.reserve(elements.size());
NameSet remove_columns;
@ -401,35 +394,17 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
{
String name = elem->getAliasOrColumnName();
/// Columns that are presented in output_columns_positions should
/// appears in the same order in the new_elements, hence default
/// result_index goes after all elements of output_columns_positions
/// (it is for columns that are not located in
/// output_columns_positions, i.e. untuple())
size_t result_index = output_columns_positions.size() + new_elements_size;
/// Note, order of duplicated columns is not important here (since they
/// are the same), only order for unique columns is important, so it is
/// fine to use multimap here.
if (auto it = output_columns_positions.find(name); it != output_columns_positions.end())
{
result_index = it->second;
output_columns_positions.erase(it);
}
auto it = required_columns_with_duplicate_count.find(name);
if (required_columns_with_duplicate_count.end() != it && it->second)
{
new_elements[result_index] = elem;
new_elements.push_back(elem);
--it->second;
++new_elements_size;
}
else if (select_query->distinct || hasArrayJoin(elem))
{
/// ARRAY JOIN cannot be optimized out since it may change number of rows,
/// so as DISTINCT.
new_elements[result_index] = elem;
++new_elements_size;
new_elements.push_back(elem);
}
else
{
@ -440,25 +415,18 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
/// Never remove untuple. It's result column may be in required columns.
/// It is not easy to analyze untuple here, because types were not calculated yet.
if (func && func->name == "untuple")
{
new_elements[result_index] = elem;
++new_elements_size;
}
new_elements.push_back(elem);
/// removing aggregation can change number of rows, so `count()` result in outer sub-query would be wrong
if (func && !select_query->groupBy())
{
GetAggregatesVisitor::Data data = {};
GetAggregatesVisitor(data).visit(elem);
if (!data.aggregates.empty())
{
new_elements[result_index] = elem;
++new_elements_size;
}
new_elements.push_back(elem);
}
}
}
/// Remove empty nodes.
std::erase(new_elements, ASTPtr{});
if (select_query->interpolate())
{

View File

@ -24,6 +24,9 @@ void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatSt
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
if (cleanup)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " CLEANUP" << (settings.hilite ? hilite_none : "");
if (deduplicate_by_columns)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " BY " << (settings.hilite ? hilite_none : "");

View File

@ -21,11 +21,12 @@ public:
bool deduplicate = false;
/// Deduplicate by columns.
ASTPtr deduplicate_by_columns;
/// Delete 'is_deleted' data
bool cleanup = false;
/** Get the text that identifies this element. */
String getID(char delim) const override
{
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "");
return "OptimizeQuery" + (delim + getDatabase()) + delim + getTable() + (final ? "_final" : "") + (deduplicate ? "_deduplicate" : "")+ (cleanup ? "_cleanup" : "");
}
ASTPtr clone() const override

View File

@ -28,6 +28,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ParserKeyword s_partition("PARTITION");
ParserKeyword s_final("FINAL");
ParserKeyword s_deduplicate("DEDUPLICATE");
ParserKeyword s_cleanup("CLEANUP");
ParserKeyword s_by("BY");
ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p(true);
@ -38,6 +39,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ASTPtr partition;
bool final = false;
bool deduplicate = false;
bool cleanup = false;
String cluster_str;
if (!s_optimize_table.ignore(pos, expected))
@ -68,6 +70,9 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
if (s_deduplicate.ignore(pos, expected))
deduplicate = true;
if (s_cleanup.ignore(pos, expected))
cleanup = true;
ASTPtr deduplicate_by_columns;
if (deduplicate && s_by.ignore(pos, expected))
{
@ -85,6 +90,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->final = final;
query->deduplicate = deduplicate;
query->deduplicate_by_columns = deduplicate_by_columns;
query->cleanup = cleanup;
query->database = database;
query->table = table;

View File

@ -14,7 +14,7 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Query OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL] [DEDUPLICATE]
/** Query OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL] [DEDUPLICATE] [CLEANUP]
*/
class ParserOptimizeQuery : public IParserBase
{

View File

@ -94,6 +94,7 @@ const std::unordered_set<std::string_view> keywords
"CHAR",
"CHARACTER",
"CHECK",
"CLEANUP",
"CLEAR",
"CLUSTER",
"CLUSTER_HOST_IDS",

View File

@ -336,6 +336,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
{
auto subquery_options = select_query_options.subquery();
Planner subquery_planner(table_expression, subquery_options, planner_context->getGlobalPlannerContext());
/// Propagate storage limits to subquery
subquery_planner.addStorageLimits(*select_query_info.storage_limits);
subquery_planner.buildQueryPlanIfNeeded();
query_plan = std::move(subquery_planner).extractQueryPlan();
}

View File

@ -55,9 +55,9 @@ private:
class NativeOutputFormat final : public IOutputFormat
{
public:
NativeOutputFormat(WriteBuffer & buf, const Block & header)
NativeOutputFormat(WriteBuffer & buf, const Block & header, UInt64 client_protocol_version = 0)
: IOutputFormat(header, buf)
, writer(buf, 0, header)
, writer(buf, client_protocol_version, header)
{
}
@ -115,9 +115,9 @@ void registerOutputFormatNative(FormatFactory & factory)
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const FormatSettings &)
const FormatSettings & settings)
{
return std::make_shared<NativeOutputFormat>(buf, sample);
return std::make_shared<NativeOutputFormat>(buf, sample, settings.client_protocol_version);
});
}

View File

@ -1,20 +1,31 @@
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Columns/ColumnsNumber.h>
#include <IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
const Block & header_,
size_t num_inputs,
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
bool use_average_block_sizes,
bool cleanup_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size), cleanup(cleanup_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
version_column_number = header_.getPositionByName(version_column);
}
@ -61,7 +72,15 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// Write the data for the previous primary key.
if (!selected_row.empty())
insertRow();
{
if (is_deleted_column_number!=-1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
insertRow();
}
else
insertRow();
}
selected_row.clear();
}
@ -71,6 +90,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
if ((is_deleted_column_number!=-1))
{
const UInt8 is_deleted = assert_cast<const ColumnUInt8 &>(*current->all_columns[is_deleted_column_number]).getData()[current->getRow()];
if ((is_deleted != 1) && (is_deleted != 0))
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect data: is_deleted = {} (must be 1 or 0).", toString(is_deleted));
}
/// A non-strict comparison, since we select the last row for the same version values.
if (version_column_number == -1
|| selected_row.empty()
@ -101,7 +127,15 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
insertRow();
{
if (is_deleted_column_number!=-1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
insertRow();
}
else
insertRow();
}
return Status(merged_data.pull(), true);
}

View File

@ -20,17 +20,22 @@ class ReplacingSortedAlgorithm final : public IMergingAlgorithmWithSharedChunks
public:
ReplacingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, const String & version_column,
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false,
bool cleanup = false);
Status merge() override;
private:
MergedData merged_data;
ssize_t is_deleted_column_number = -1;
ssize_t version_column_number = -1;
bool cleanup = false;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.

View File

@ -13,19 +13,23 @@ class ReplacingSortedTransform final : public IMergingTransform<ReplacingSortedA
public:
ReplacingSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, const String & version_column,
SortDescription description_,
const String & is_deleted_column, const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
bool use_average_block_sizes = false,
bool cleanup = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header,
num_inputs,
std::move(description_),
is_deleted_column,
version_column,
max_block_size,
out_row_sources_buf_,
use_average_block_sizes)
use_average_block_sizes,
cleanup)
{
}

View File

@ -86,7 +86,9 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
/// update input order info in read_from_merge_tree step
const int direction = 0; /// for DISTINCT direction doesn't matter, ReadFromMergeTree will choose proper one
read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint());
bool can_read = read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint());
if (!can_read)
return 0;
/// update data stream's sorting properties for found transforms
const DataStream * input_stream = &read_from_merge_tree->getOutputStream();

View File

@ -913,7 +913,7 @@ AggregationInputOrder buildInputOrderInfo(
}
InputOrderInfoPtr buildInputOrderInfo(
ReadFromMergeTree * reading,
const ReadFromMergeTree * reading,
const FixedColumns & fixed_columns,
const ActionsDAGPtr & dag,
const SortDescription & description,
@ -1041,7 +1041,11 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
limit);
if (order_info)
reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
{
bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
if (!can_read)
return nullptr;
}
return order_info;
}
@ -1054,7 +1058,11 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
limit);
if (order_info)
merge->requestReadingInOrder(order_info);
{
bool can_read = merge->requestReadingInOrder(order_info);
if (!can_read)
return nullptr;
}
return order_info;
}
@ -1086,10 +1094,14 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
dag, keys);
if (order_info.input_order)
reading->requestReadingInOrder(
{
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit);
if (!can_read)
return {};
}
return order_info;
}
@ -1101,7 +1113,11 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
dag, keys);
if (order_info.input_order)
merge->requestReadingInOrder(order_info.input_order);
{
bool can_read = merge->requestReadingInOrder(order_info.input_order);
if (!can_read)
return {};
}
return order_info;
}
@ -1296,7 +1312,9 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
if (order_info)
{
read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
if (!can_read)
return 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging);
}

View File

@ -283,7 +283,6 @@ Pipe ReadFromMergeTree::readFromPool(
total_rows = query_info.limit;
const auto & settings = context->getSettingsRef();
MergeTreeReadPool::BackoffSettings backoff_settings(settings);
/// round min_marks_to_read up to nearest multiple of block_size expressed in marks
/// If granularity is adaptive it doesn't make sense
@ -295,18 +294,54 @@ Pipe ReadFromMergeTree::readFromPool(
/ max_block_size * max_block_size / fixed_index_granularity;
}
auto pool = std::make_shared<MergeTreeReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
required_columns,
virt_column_names,
backoff_settings,
settings.preferred_block_size_bytes,
false);
bool all_parts_are_remote = true;
bool all_parts_are_local = true;
for (const auto & part : parts_with_range)
{
const bool is_remote = part.data_part->isStoredOnRemoteDisk();
all_parts_are_local &= !is_remote;
all_parts_are_remote &= is_remote;
}
MergeTreeReadPoolPtr pool;
if ((all_parts_are_remote
&& settings.allow_prefetched_read_pool_for_remote_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method))
|| (!all_parts_are_local
&& settings.allow_prefetched_read_pool_for_local_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method)))
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
required_columns,
virt_column_names,
settings.preferred_block_size_bytes,
reader_settings,
context,
use_uncompressed_cache,
all_parts_are_remote,
*data.getSettings());
}
else
{
pool = std::make_shared<MergeTreeReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
required_columns,
virt_column_names,
context,
false);
}
auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)");
LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, max_streams);
@ -810,7 +845,7 @@ static void addMergingFinal(
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.version_column, max_block_size);
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
@ -1176,8 +1211,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
if (key_condition->alwaysFalse())
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
const auto & select = query_info.query->as<ASTSelectQuery &>();
size_t total_marks_pk = 0;
size_t parts_before_pk = 0;
try
@ -1214,11 +1247,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
auto reader_settings = getMergeTreeReaderSettings(context, query_info);
bool use_skip_indexes = settings.use_skip_indexes;
bool final = false;
if (query_info.table_expression_modifiers)
final = query_info.table_expression_modifiers->hasFinal();
else
final = select.final();
bool final = isFinal(query_info);
if (final && !settings.use_skip_indexes_if_final)
use_skip_indexes = false;
@ -1273,12 +1302,17 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
}
void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
{
/// if dirction is not set, use current one
if (!direction)
direction = getSortDirection();
/// Disable read-in-order optimization for reverse order with final.
/// Otherwise, it can lead to incorrect final behavior because the implementation may rely on the reading in direct order).
if (direction != 1 && isFinal(query_info))
return false;
auto order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
if (query_info.projection)
query_info.projection->input_order_info = order_info;
@ -1312,6 +1346,8 @@ void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
output_stream->sort_description = std::move(sort_description);
output_stream->sort_scope = DataStream::SortScope::Stream;
}
return true;
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
@ -1358,12 +1394,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
ActionsDAGPtr result_projection;
Names column_names_to_read = std::move(result.column_names_to_read);
const auto & select = query_info.query->as<ASTSelectQuery &>();
bool final = false;
if (query_info.table_expression_modifiers)
final = query_info.table_expression_modifiers->hasFinal();
else
final = select.final();
bool final = isFinal(query_info);
if (!final && result.sampling.use_sampling)
{
@ -1397,6 +1428,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
std::vector<String> add_columns = metadata_for_reading->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.merging_params.is_deleted_column.empty())
column_names_to_read.push_back(data.merging_params.is_deleted_column);
if (!data.merging_params.sign_column.empty())
column_names_to_read.push_back(data.merging_params.sign_column);
if (!data.merging_params.version_column.empty())
@ -1672,6 +1705,15 @@ void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
}
}
bool ReadFromMergeTree::isFinal(const SelectQueryInfo & query_info)
{
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
const auto & select = query_info.query->as<ASTSelectQuery &>();
return select.final();
}
bool MergeTreeDataSelectAnalysisResult::error() const
{
return std::holds_alternative<std::exception_ptr>(result);

View File

@ -158,7 +158,10 @@ public:
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
const PrewhereInfo * getPrewhereInfo() const { return prewhere_info.get(); }
void requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
static bool isFinal(const SelectQueryInfo & query_info);
private:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl(

View File

@ -555,7 +555,7 @@ void AggregatingTransform::initGenerate()
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = variants.sizeWithoutOverflowRow();
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({:.3f} rows/sec., {}/sec.)",
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({:.3f} rows/sec., {}/sec.)",
src_rows, rows, ReadableSize(src_bytes),
elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds));

View File

@ -1415,7 +1415,6 @@ void WindowTransform::work()
assert(prev_frame_start <= frame_start);
const auto first_used_block = std::min(next_output_block_number,
std::min(prev_frame_start.block, current_row.block));
if (first_block_number < first_used_block)
{
// fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number,
@ -1970,6 +1969,147 @@ struct WindowFunctionRowNumber final : public WindowFunction
}
};
// Usage: ntile(n). n is the number of buckets.
struct WindowFunctionNtile final : public WindowFunction
{
WindowFunctionNtile(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_)
: WindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
{
if (argument_types.size() != 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} takes exactly one parameter", name_);
}
auto type_id = argument_types[0]->getTypeId();
if (type_id != TypeIndex::UInt8 && type_id != TypeIndex::UInt16 && type_id != TypeIndex::UInt32 && type_id != TypeIndex::UInt32 && type_id != TypeIndex::UInt64)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument type must be an unsigned integer (not larger then 64-bit), but got {}", argument_types[0]->getName());
}
}
bool allocatesMemoryInArena() const override { return false; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{
if (!buckets) [[unlikely]]
{
checkWindowFrameType(transform);
const auto & current_block = transform->blockAt(transform->current_row);
const auto & workspace = transform->workspaces[function_index];
const auto & arg_col = *current_block.original_input_columns[workspace.argument_column_indices[0]];
if (!isColumnConst(arg_col))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument must be a constant");
auto type_id = argument_types[0]->getTypeId();
if (type_id == TypeIndex::UInt8)
buckets = arg_col[transform->current_row.row].get<UInt8>();
else if (type_id == TypeIndex::UInt16)
buckets = arg_col[transform->current_row.row].get<UInt16>();
else if (type_id == TypeIndex::UInt32)
buckets = arg_col[transform->current_row.row].get<UInt32>();
else if (type_id == TypeIndex::UInt64)
buckets = arg_col[transform->current_row.row].get<UInt64>();
if (!buckets)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument must > 0");
}
}
// new partition
if (transform->current_row_number == 1) [[unlikely]]
{
current_partition_rows = 0;
current_partition_inserted_row = 0;
start_row = transform->current_row;
}
current_partition_rows++;
// Only do the action when we meet the last row in this partition.
if (!transform->partition_ended)
return;
else
{
auto current_row = transform->current_row;
current_row.row++;
const auto & end_row = transform->partition_end;
if (current_row != end_row)
{
if (current_row.row < transform->blockRowsNumber(current_row))
return;
if (end_row.block != current_row.block + 1 || end_row.row)
{
return;
}
// else, current_row is the last input row.
}
}
auto bucket_capacity = current_partition_rows / buckets;
auto capacity_diff = current_partition_rows - bucket_capacity * buckets;
// bucket number starts from 1.
UInt64 bucket_num = 1;
while (current_partition_inserted_row < current_partition_rows)
{
auto current_bucket_capacity = bucket_capacity;
if (capacity_diff > 0)
{
current_bucket_capacity += 1;
capacity_diff--;
}
auto left_rows = current_bucket_capacity;
while (left_rows)
{
auto available_block_rows = transform->blockRowsNumber(start_row) - start_row.row;
IColumn & to = *transform->blockAt(start_row).output_columns[function_index];
auto & pod_array = assert_cast<ColumnUInt64 &>(to).getData();
if (left_rows < available_block_rows)
{
pod_array.resize_fill(pod_array.size() + left_rows, bucket_num);
start_row.row += left_rows;
left_rows = 0;
}
else
{
pod_array.resize_fill(pod_array.size() + available_block_rows, bucket_num);
left_rows -= available_block_rows;
start_row.block++;
start_row.row = 0;
}
}
current_partition_inserted_row += current_bucket_capacity;
bucket_num += 1;
}
}
private:
UInt64 buckets = 0;
RowNumber start_row;
UInt64 current_partition_rows = 0;
UInt64 current_partition_inserted_row = 0;
static void checkWindowFrameType(const WindowTransform * transform)
{
if (transform->order_by_indices.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's window frame must have order by clause");
if (transform->window_description.frame.type != WindowFrame::FrameType::ROWS)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's frame type must be ROWS");
}
if (transform->window_description.frame.begin_type != WindowFrame::BoundaryType::Unbounded)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's frame start type must be UNBOUNDED PRECEDING");
}
if (transform->window_description.frame.end_type != WindowFrame::BoundaryType::Unbounded)
{
// We must wait all for the partition end and get the total rows number in this
// partition. So before the end of this partition, there is no any block could be
// dropped out.
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's frame end type must be UNBOUNDED FOLLOWING");
}
}
};
// ClickHouse-specific variant of lag/lead that respects the window frame.
template <bool is_lead>
struct WindowFunctionLagLeadInFrame final : public WindowFunction
@ -2338,6 +2478,13 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
parameters);
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("ntile", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{
return std::make_shared<WindowFunctionNtile>(name, argument_types,
parameters);
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("nth_value", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{

View File

@ -558,6 +558,13 @@ void HTTPHandler::processQuery(
auto client_info = session->getClientInfo();
auto context = session->makeQueryContext(std::move(client_info));
/// This parameter is used to tune the behavior of output formats (such as Native) for compatibility.
if (params.has("client_protocol_version"))
{
UInt64 version_param = parse<UInt64>(params.get("client_protocol_version"));
context->setClientProtocolVersion(version_param);
}
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
CompressionMethod http_response_compression_method = CompressionMethod::None;
@ -663,7 +670,7 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in;
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"};
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version"};
Names reserved_param_suffixes;

View File

@ -1,5 +1,4 @@
#include <algorithm>
#include <iomanip>
#include <iterator>
#include <memory>
#include <mutex>
@ -24,7 +23,6 @@
#include <IO/LimitReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Interpreters/executeQuery.h>
@ -39,9 +37,7 @@
#include <Core/ExternalTable.h>
#include <Access/AccessControl.h>
#include <Access/Credentials.h>
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeEnum.h>
#include <Compression/CompressionFactory.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>

View File

@ -135,6 +135,10 @@ public:
/// Returns true if the storage supports queries with the PREWHERE section.
virtual bool supportsPrewhere() const { return false; }
/// Returns which columns supports PREWHERE, or empty std::nullopt if all columns is supported.
/// This is needed for engines whose aggregates data from multiple tables, like Merge.
virtual std::optional<NameSet> supportedPrewhereColumns() const { return std::nullopt; }
/// Returns true if the storage supports optimization of moving conditions to PREWHERE section.
virtual bool canMoveConditionsToPrewhere() const { return supportsPrewhere(); }
@ -481,6 +485,7 @@ public:
bool /*final*/,
bool /*deduplicate*/,
const Names & /* deduplicate_by_columns */,
bool /*cleanup*/,
ContextPtr /*context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method optimize is not supported by storage {}", getName());

View File

@ -0,0 +1,29 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
struct MergeTreeReadTask;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
class IMergeTreeReadPool : private boost::noncopyable
{
public:
virtual ~IMergeTreeReadPool() = default;
virtual Block getHeader() const = 0;
virtual MergeTreeReadTaskPtr getTask(size_t thread) = 0;
virtual void profileFeedback(ReadBufferFromFileBase::ProfileInfo info) = 0;
};
using MergeTreeReadPoolPtr = std::shared_ptr<IMergeTreeReadPool>;
}

View File

@ -279,6 +279,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
reserved_space,
entry.deduplicate,
entry.deduplicate_by_columns,
entry.cleanup,
storage.merging_params,
NO_TRANSACTION_PTR);

View File

@ -107,6 +107,7 @@ void MergePlainMergeTreeTask::prepare()
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
cleanup,
storage.merging_params,
txn);
}

View File

@ -19,6 +19,7 @@ public:
StorageMetadataPtr metadata_snapshot_,
bool deduplicate_,
Names deduplicate_by_columns_,
bool cleanup_,
MergeMutateSelectedEntryPtr merge_mutate_entry_,
TableLockHolder table_lock_holder_,
IExecutableTask::TaskResultCallback & task_result_callback_)
@ -26,6 +27,7 @@ public:
, metadata_snapshot(std::move(metadata_snapshot_))
, deduplicate(deduplicate_)
, deduplicate_by_columns(std::move(deduplicate_by_columns_))
, cleanup(cleanup_)
, merge_mutate_entry(std::move(merge_mutate_entry_))
, table_lock_holder(std::move(table_lock_holder_))
, task_result_callback(task_result_callback_)
@ -66,6 +68,7 @@ private:
StorageMetadataPtr metadata_snapshot;
bool deduplicate;
Names deduplicate_by_columns;
bool cleanup;
MergeMutateSelectedEntryPtr merge_mutate_entry{nullptr};
TableLockHolder table_lock_holder;
FutureMergedMutatedPartPtr future_part{nullptr};

View File

@ -66,7 +66,10 @@ static void extractMergingAndGatheringColumns(
/// Force version column for Replacing mode
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
key_columns.emplace(merging_params.is_deleted_column);
key_columns.emplace(merging_params.version_column);
}
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
@ -673,6 +676,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
global_ctx->space_reservation,
global_ctx->deduplicate,
global_ctx->deduplicate_by_columns,
global_ctx->cleanup,
projection_merging_params,
global_ctx->need_prefix,
global_ctx->new_data_part.get(),
@ -907,8 +911,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.version_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup);
break;
case MergeTreeData::MergingParams::Graphite:

View File

@ -58,6 +58,7 @@ public:
ReservationSharedPtr space_reservation_,
bool deduplicate_,
Names deduplicate_by_columns_,
bool cleanup_,
MergeTreeData::MergingParams merging_params_,
bool need_prefix,
IMergeTreeDataPart * parent_part_,
@ -81,6 +82,7 @@ public:
global_ctx->space_reservation = std::move(space_reservation_);
global_ctx->deduplicate = std::move(deduplicate_);
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
global_ctx->cleanup = std::move(cleanup_);
global_ctx->parent_part = std::move(parent_part_);
global_ctx->data = std::move(data_);
global_ctx->mutator = std::move(mutator_);
@ -142,6 +144,7 @@ private:
ReservationSharedPtr space_reservation{nullptr};
bool deduplicate{false};
Names deduplicate_by_columns{};
bool cleanup{false};
NamesAndTypesList gathering_columns{};
NamesAndTypesList merging_columns{};

View File

@ -268,7 +268,8 @@ void MergeTreeBackgroundExecutor<Queue>::threadFunction()
}
template class MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;
template class MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;
template class MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
template class MergeTreeBackgroundExecutor<PriorityRuntimeQueue>;
template class MergeTreeBackgroundExecutor<DynamicRuntimeQueue>;
}

View File

@ -6,7 +6,9 @@
#include <future>
#include <condition_variable>
#include <set>
#include <iostream>
#include <variant>
#include <utility>
#include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp>
@ -17,6 +19,7 @@
#include <base/defines.h>
#include <Storages/MergeTree/IExecutableTask.h>
namespace DB
{
namespace ErrorCodes
@ -67,8 +70,8 @@ struct TaskRuntimeData
}
};
class OrdinaryRuntimeQueue
/// Simplest First-in-First-out queue, ignores priority.
class RoundRobinRuntimeQueue
{
public:
TaskRuntimeDataPtr pop()
@ -78,24 +81,29 @@ public:
return result;
}
void push(TaskRuntimeDataPtr item) { queue.push_back(std::move(item));}
void push(TaskRuntimeDataPtr item)
{
queue.push_back(std::move(item));
}
void remove(StorageID id)
{
auto it = std::remove_if(queue.begin(), queue.end(),
[&] (auto item) -> bool { return item->task->getStorageID() == id; });
[&] (auto && item) -> bool { return item->task->getStorageID() == id; });
queue.erase(it, queue.end());
}
void setCapacity(size_t count) { queue.set_capacity(count); }
bool empty() { return queue.empty(); }
static constexpr std::string_view name = "round_robin";
private:
boost::circular_buffer<TaskRuntimeDataPtr> queue{0};
};
/// Uses a heap to pop a task with minimal priority
class MergeMutateRuntimeQueue
/// Uses a heap to pop a task with minimal priority.
class PriorityRuntimeQueue
{
public:
TaskRuntimeDataPtr pop()
@ -115,20 +123,89 @@ public:
void remove(StorageID id)
{
auto it = std::remove_if(buffer.begin(), buffer.end(),
[&] (auto item) -> bool { return item->task->getStorageID() == id; });
buffer.erase(it, buffer.end());
std::erase_if(buffer, [&] (auto && item) -> bool { return item->task->getStorageID() == id; });
std::make_heap(buffer.begin(), buffer.end(), TaskRuntimeData::comparePtrByPriority);
}
void setCapacity(size_t count) { buffer.reserve(count); }
bool empty() { return buffer.empty(); }
static constexpr std::string_view name = "shortest_task_first";
private:
std::vector<TaskRuntimeDataPtr> buffer{};
std::vector<TaskRuntimeDataPtr> buffer;
};
/// Queue that can dynamically change scheduling policy
template <class ... Policies>
class DynamicRuntimeQueueImpl
{
public:
TaskRuntimeDataPtr pop()
{
return std::visit<TaskRuntimeDataPtr>([&] (auto && queue) { return queue.pop(); }, impl);
}
void push(TaskRuntimeDataPtr item)
{
std::visit([&] (auto && queue) { queue.push(std::move(item)); }, impl);
}
void remove(StorageID id)
{
std::visit([&] (auto && queue) { queue.remove(id); }, impl);
}
void setCapacity(size_t count)
{
capacity = count;
std::visit([&] (auto && queue) { queue.setCapacity(count); }, impl);
}
bool empty()
{
return std::visit<bool>([&] (auto && queue) { return queue.empty(); }, impl);
}
// Change policy. It does nothing if new policy is unknown or equals current policy.
void updatePolicy(std::string_view name)
{
// We use this double lambda trick to generate code for all possible pairs of types of old and new queue.
// If types are different it moves tasks from old queue to new one using corresponding pop() and push()
resolve<Policies...>(name, [&] <class NewQueue> (std::in_place_type_t<NewQueue>)
{
std::visit([&] (auto && queue)
{
if constexpr (std::is_same_v<std::decay_t<decltype(queue)>, NewQueue>)
return; // The same policy
NewQueue new_queue;
new_queue.setCapacity(capacity);
while (!queue.empty())
new_queue.push(queue.pop());
impl = std::move(new_queue);
}, impl);
});
}
private:
// Find policy with specified `name` and call `func()` if found.
// Tag `std::in_place_type_t<T>` used to help templated lambda to deduce type T w/o creating its instance
template <class T, class ... Ts, class Func>
void resolve(std::string_view name, Func && func)
{
if (T::name == name)
return func(std::in_place_type<T>);
if constexpr (sizeof...(Ts))
return resolve<Ts...>(name, std::forward<Func>(func));
}
std::variant<Policies...> impl;
size_t capacity;
};
// Avoid typedef and alias to facilitate forward declaration
class DynamicRuntimeQueue : public DynamicRuntimeQueueImpl<RoundRobinRuntimeQueue, PriorityRuntimeQueue> {};
/**
* Executor for a background MergeTree related operations such as merges, mutations, fetches and so on.
* It can execute only successors of ExecutableTask interface.
@ -149,13 +226,18 @@ private:
* |s|
*
* Each task is simply a sequence of steps. Heavier tasks have longer sequences.
* When a step of a task is executed, we move tasks to pending queue. And take another from the queue's head.
* With these architecture all small merges / mutations will be executed faster, than bigger ones.
* When a step of a task is executed, we move tasks to pending queue. And take the next task from pending queue.
* Next task is chosen from pending tasks using one of the scheduling policies (class Queue):
* 1) RoundRobinRuntimeQueue. Uses boost::circular_buffer as FIFO-queue. Next task is taken from queue's head and after one step
* enqueued into queue's tail. With this architecture all merges / mutations are fairly scheduled and never starved.
* All decisions regarding priorities are left to components creating tasks (e.g. SimpleMergeSelector).
* 2) PriorityRuntimeQueue. Uses heap to select task with smallest priority value.
* With this architecture all small merges / mutations will be executed faster, than bigger ones.
* WARNING: Starvation is possible in case of overload.
*
* We use boost::circular_buffer as a container for queues not to do any allocations.
*
* Another nuisance that we faces with is than background operations always interact with an associated Storage.
* So, when a Storage want to shutdown, it must wait until all its background operations are finished.
* We use boost::circular_buffer as a container for active queue to avoid allocations.
* Another nuisance that we face is that background operations always interact with an associated Storage.
* So, when a Storage wants to shutdown, it must wait until all its background operations are finished.
*/
template <class Queue>
class MergeTreeBackgroundExecutor final : boost::noncopyable
@ -185,6 +267,18 @@ public:
pool.scheduleOrThrowOnError([this] { threadFunction(); });
}
MergeTreeBackgroundExecutor(
String name_,
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_,
std::string_view policy)
requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_)
{
pending.updatePolicy(policy);
}
~MergeTreeBackgroundExecutor()
{
wait();
@ -204,6 +298,14 @@ public:
void removeTasksCorrespondingToStorage(StorageID id);
void wait();
/// Update scheduling policy for pending tasks. It does nothing if `new_policy` is the same or unknown.
void updateSchedulingPolicy(std::string_view new_policy)
requires requires(Queue queue) { queue.updatePolicy(new_policy); } // Because we use explicit template instantiation
{
std::lock_guard lock(mutex);
pending.updatePolicy(new_policy);
}
private:
String name;
size_t threads_count TSA_GUARDED_BY(mutex) = 0;
@ -225,10 +327,8 @@ private:
Poco::Logger * log = &Poco::Logger::get("MergeTreeBackgroundExecutor");
};
extern template class MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;
extern template class MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;
using MergeMutateBackgroundExecutor = MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;
using OrdinaryBackgroundExecutor = MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;
extern template class MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
extern template class MergeTreeBackgroundExecutor<PriorityRuntimeQueue>;
extern template class MergeTreeBackgroundExecutor<DynamicRuntimeQueue>;
}

View File

@ -719,6 +719,10 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
{
const auto columns = metadata.getColumns().getAllPhysical();
if (!is_deleted_column.empty() && mode != MergingParams::Replacing)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"is_deleted column for MergeTree cannot be specified in modes except Replacing.");
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sign column for MergeTree cannot be specified "
@ -788,6 +792,41 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Version column {} does not exist in table declaration.", version_column);
};
/// Check that if the is_deleted column is needed, it exists and is of type UInt8. If exist, version column must be defined too but version checks are not done here.
auto check_is_deleted_column = [this, & columns](bool is_optional, const std::string & storage)
{
if (is_deleted_column.empty())
{
if (is_optional)
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: is_deleted ({}) column for storage {} is empty", is_deleted_column, storage);
}
else
{
if (version_column.empty() && !is_optional)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: Version column ({}) for storage {} is empty while is_deleted ({}) is not.",
version_column, storage, is_deleted_column);
bool miss_is_deleted_column = true;
for (const auto & column : columns)
{
if (column.name == is_deleted_column)
{
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get()))
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "is_deleted column ({}) for storage {} must have type UInt8. Provided column of type {}.",
is_deleted_column, storage, column.type->getName());
miss_is_deleted_column = false;
break;
}
}
if (miss_is_deleted_column)
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "is_deleted column {} does not exist in table declaration.", is_deleted_column);
}
};
if (mode == MergingParams::Collapsing)
check_sign_column(false, "CollapsingMergeTree");
@ -823,7 +862,10 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
}
if (mode == MergingParams::Replacing)
{
check_is_deleted_column(true, "ReplacingMergeTree");
check_version_column(true, "ReplacingMergeTree");
}
if (mode == MergingParams::VersionedCollapsing)
{
@ -1190,11 +1232,10 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
if (!res.part->version.creation_csn)
{
auto min = TransactionLog::getCSN(res.part->version.creation_tid);
auto min = TransactionLog::getCSNAndAssert(res.part->version.creation_tid, res.part->version.creation_csn);
if (!min)
{
/// Transaction that created this part was not committed. Remove part.
TransactionLog::assertTIDIsNotOutdated(res.part->version.creation_tid);
min = Tx::RolledBackCSN;
}
@ -1207,7 +1248,7 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
if (!version.removal_tid.isEmpty() && !version.removal_csn)
{
auto max = TransactionLog::getCSN(version.removal_tid);
auto max = TransactionLog::getCSNAndAssert(version.removal_tid, version.removal_csn);
if (max)
{
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}",
@ -1216,7 +1257,6 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
}
else
{
TransactionLog::assertTIDIsNotOutdated(version.removal_tid);
/// Transaction that tried to remove this part was not committed. Clear removal_tid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}",
res.part->name, version.removal_tid);

View File

@ -334,6 +334,9 @@ public:
/// For Collapsing and VersionedCollapsing mode.
String sign_column;
/// For Replacing mode. Can be empty for Replacing.
String is_deleted_column;
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;

View File

@ -525,6 +525,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
bool need_prefix,
@ -541,6 +542,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
space_reservation,
deduplicate,
deduplicate_by_columns,
cleanup,
merging_params,
need_prefix,
parent_part,

View File

@ -111,6 +111,7 @@ public:
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
bool need_prefix = true,

View File

@ -281,7 +281,7 @@ Block MergeTreeDataWriter::mergeBlock(
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.version_column, block_size + 1);
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,

View File

@ -39,16 +39,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
bool use_uncompressed_cache_,
bool is_remote_read_,
const MergeTreeSettings & storage_settings_)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
parts_,
(preferred_block_size_bytes_ > 0),
/*do_not_steal_tasks_*/false)
, WithContext(context_)
: WithContext(context_)
, log(&Poco::Logger::get("MergeTreePrefetchedReadPool(" + (parts_.empty() ? "" : parts_.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
, header(storage_snapshot_->getSampleBlockForColumns(column_names_))
, mark_cache(context_->getGlobalContext()->getMarkCache().get())
@ -57,6 +48,10 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
, profile_callback([this](ReadBufferFromFileBase::ProfileInfo info_) { profileFeedback(info_); })
, index_granularity_bytes(storage_settings_.index_granularity_bytes)
, fixed_index_granularity(storage_settings_.index_granularity)
, storage_snapshot(storage_snapshot_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, prewhere_info(prewhere_info_)
, is_remote_read(is_remote_read_)
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
{

View File

@ -84,12 +84,20 @@ private:
ReadBufferFromFileBase::ProfileCallback profile_callback;
size_t index_granularity_bytes;
size_t fixed_index_granularity;
StorageSnapshotPtr storage_snapshot;
const Names column_names;
const Names virtual_column_names;
PrewhereInfoPtr prewhere_info;
RangesInDataParts parts_ranges;
[[ maybe_unused ]] const bool is_remote_read;
ThreadPool & prefetch_threadpool;
PartsInfos parts_infos;
ThreadsTasks threads_tasks;
std::mutex mutex;
struct TaskHolder
{

View File

@ -20,7 +20,47 @@ namespace ErrorCodes
namespace DB
{
std::vector<size_t> IMergeTreeReadPool::fillPerPartInfo(const RangesInDataParts & parts)
MergeTreeReadPool::MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
ContextPtr context_,
bool do_not_steal_tasks_)
: storage_snapshot(storage_snapshot_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, min_marks_for_concurrent_read(min_marks_for_concurrent_read_)
, prewhere_info(prewhere_info_)
, parts_ranges(std::move(parts_))
, predict_block_size_bytes(context_->getSettingsRef().preferred_block_size_bytes > 0)
, do_not_steal_tasks(do_not_steal_tasks_)
, backoff_settings{context_->getSettingsRef()}
, backoff_state{threads_}
{
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(
parts_ranges, storage_snapshot, is_part_on_remote_disk,
do_not_steal_tasks, predict_block_size_bytes,
column_names, virtual_column_names, prewhere_info, per_part_params);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges);
}
std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
const RangesInDataParts & parts,
const StorageSnapshotPtr & storage_snapshot,
std::vector<bool> & is_part_on_remote_disk,
bool & do_not_steal_tasks,
bool & predict_block_size_bytes,
const Names & column_names,
const Names & virtual_column_names,
const PrewhereInfoPtr & prewhere_info,
std::vector<MergeTreeReadPool::PerPartParams> & per_part_params)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
@ -65,35 +105,6 @@ std::vector<size_t> IMergeTreeReadPool::fillPerPartInfo(const RangesInDataParts
return per_part_sum_marks;
}
MergeTreeReadPool::MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_,
size_t preferred_block_size_bytes_,
bool do_not_steal_tasks_)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
std::move(parts_),
(preferred_block_size_bytes_ > 0),
do_not_steal_tasks_)
, backoff_settings{backoff_settings_}
, backoff_state{threads_}
{
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(parts_ranges);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges);
}
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t thread)
{

View File

@ -6,6 +6,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Storages/MergeTree/IMergeTreeReadPool.h>
#include <Storages/SelectQueryInfo.h>
#include <mutex>
@ -14,77 +15,42 @@
namespace DB
{
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
class IMergeTreeReadPool
{
public:
IMergeTreeReadPool(
StorageSnapshotPtr storage_snapshot_,
Names column_names_,
Names virtual_column_names_,
size_t min_marks_for_concurrent_read_,
PrewhereInfoPtr prewhere_info_,
RangesInDataParts parts_ranges_,
bool predict_block_size_bytes_,
bool do_not_steal_tasks_)
: storage_snapshot(storage_snapshot_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, min_marks_for_concurrent_read(min_marks_for_concurrent_read_)
, prewhere_info(prewhere_info_)
, parts_ranges(parts_ranges_)
, predict_block_size_bytes(predict_block_size_bytes_)
, do_not_steal_tasks(do_not_steal_tasks_)
{}
virtual MergeTreeReadTaskPtr getTask(size_t thread) = 0;
virtual Block getHeader() const = 0;
virtual void profileFeedback(ReadBufferFromFileBase::ProfileInfo info) = 0;
virtual ~IMergeTreeReadPool() = default;
protected:
std::vector<size_t> fillPerPartInfo(const RangesInDataParts & parts);
/// Initialized in constructor
StorageSnapshotPtr storage_snapshot;
const Names column_names;
const Names virtual_column_names;
size_t min_marks_for_concurrent_read{0};
PrewhereInfoPtr prewhere_info;
RangesInDataParts parts_ranges;
bool predict_block_size_bytes;
bool do_not_steal_tasks;
struct PerPartParams
{
MergeTreeReadTaskColumns task_columns;
NameSet column_name_set;
MergeTreeBlockSizePredictorPtr size_predictor;
RangesInDataPart data_part;
};
std::vector<PerPartParams> per_part_params;
std::vector<bool> is_part_on_remote_disk;
mutable std::mutex mutex;
};
using IMergeTreeReadPoolPtr = std::shared_ptr<IMergeTreeReadPool>;
/** Provides read tasks for MergeTreeThreadSelectProcessor`s in fine-grained batches, allowing for more
* uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads`
* workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads
* in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted
* it's workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or
* continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`).
/** Provides read tasks for MergeTreeThreadSelectProcessor`s in fine-grained batches, allowing for more
* uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads`
* workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads
* in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted
* it's workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or
* continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`).
*/
class MergeTreeReadPool final: public IMergeTreeReadPool, private boost::noncopyable
class MergeTreeReadPool : public IMergeTreeReadPool
{
public:
struct BackoffSettings;
MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
ContextPtr context_,
bool do_not_steal_tasks_ = false);
~MergeTreeReadPool() override = default;
MergeTreeReadTaskPtr getTask(size_t thread) override;
/** Each worker could call this method and pass information about read performance.
* If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads.
* This allows to overcome excessive load to disk subsystem, when reads are not from page cache.
*/
void profileFeedback(ReadBufferFromFileBase::ProfileInfo info) override;
Block getHeader() const override;
/** Pull could dynamically lower (backoff) number of threads, if read operation are too slow.
* Settings for that backoff.
*/
@ -107,46 +73,51 @@ public:
max_throughput(settings.read_backoff_max_throughput),
min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()),
min_events(settings.read_backoff_min_events),
min_concurrency(settings.read_backoff_min_concurrency)
{
}
min_concurrency(settings.read_backoff_min_concurrency) {}
BackoffSettings() : min_read_latency_ms(0) {}
};
BackoffSettings backoff_settings;
struct PerPartParams
{
MergeTreeReadTaskColumns task_columns;
NameSet column_name_set;
MergeTreeBlockSizePredictorPtr size_predictor;
RangesInDataPart data_part;
};
MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_,
size_t preferred_block_size_bytes_,
bool do_not_steal_tasks_ = false);
~MergeTreeReadPool() override = default;
MergeTreeReadTaskPtr getTask(size_t thread) override;
/** Each worker could call this method and pass information about read performance.
* If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads.
* This allows to overcome excessive load to disk subsystem, when reads are not from page cache.
*/
void profileFeedback(ReadBufferFromFileBase::ProfileInfo info) override;
Block getHeader() const override;
static std::vector<size_t> fillPerPartInfo(
const RangesInDataParts & parts,
const StorageSnapshotPtr & storage_snapshot,
std::vector<bool> & is_part_on_remote_disk,
bool & do_not_steal_tasks,
bool & predict_block_size_bytes,
const Names & column_names,
const Names & virtual_column_names,
const PrewhereInfoPtr & prewhere_info,
std::vector<MergeTreeReadPool::PerPartParams> & per_part_params);
private:
void fillPerThreadInfo(
size_t threads, size_t sum_marks, std::vector<size_t> per_part_sum_marks,
const RangesInDataParts & parts);
/// Initialized in constructor
StorageSnapshotPtr storage_snapshot;
const Names column_names;
const Names virtual_column_names;
size_t min_marks_for_concurrent_read{0};
PrewhereInfoPtr prewhere_info;
RangesInDataParts parts_ranges;
bool predict_block_size_bytes;
bool do_not_steal_tasks;
std::vector<PerPartParams> per_part_params;
std::vector<bool> is_part_on_remote_disk;
BackoffSettings backoff_settings;
mutable std::mutex mutex;
/// State to track numbers of slow reads.
struct BackoffState
{
@ -156,7 +127,6 @@ private:
explicit BackoffState(size_t threads) : current_threads(threads) {}
};
BackoffState backoff_state;
struct Part
@ -185,9 +155,7 @@ private:
};
using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;
class MergeTreeReadPoolParallelReplicas : public IMergeTreeReadPool, private boost::noncopyable
class MergeTreeReadPoolParallelReplicas : public IMergeTreeReadPool
{
public:
@ -199,21 +167,19 @@ public:
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
size_t min_marks_for_concurrent_read_
)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
parts_,
/*predict_block_size*/false,
/*do_not_steal_tasks*/false)
, extension(extension_)
, threads(threads_)
size_t min_marks_for_concurrent_read_)
: extension(extension_)
, threads(threads_)
, prewhere_info(prewhere_info_)
, storage_snapshot(storage_snapshot_)
, min_marks_for_concurrent_read(min_marks_for_concurrent_read_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, parts_ranges(std::move(parts_))
{
fillPerPartInfo(parts_ranges);
MergeTreeReadPool::fillPerPartInfo(
parts_ranges, storage_snapshot, is_part_on_remote_disk, do_not_steal_tasks,
predict_block_size_bytes, column_names, virtual_column_names, prewhere_info, per_part_params);
extension.all_callback({
.description = parts_ranges.getDescriptions(),
@ -223,8 +189,10 @@ public:
~MergeTreeReadPoolParallelReplicas() override;
MergeTreeReadTaskPtr getTask(size_t thread) override;
Block getHeader() const override;
MergeTreeReadTaskPtr getTask(size_t thread) override;
void profileFeedback(ReadBufferFromFileBase::ProfileInfo) override {}
private:
@ -234,6 +202,20 @@ private:
size_t threads;
bool no_more_tasks_available{false};
Poco::Logger * log = &Poco::Logger::get("MergeTreeReadPoolParallelReplicas");
std::mutex mutex;
PrewhereInfoPtr prewhere_info;
StorageSnapshotPtr storage_snapshot;
size_t min_marks_for_concurrent_read;
const Names column_names;
const Names virtual_column_names;
RangesInDataParts parts_ranges;
bool do_not_steal_tasks = false;
bool predict_block_size_bytes = false;
std::vector<bool> is_part_on_remote_disk;
std::vector<MergeTreeReadPool::PerPartParams> per_part_params;
};
using MergeTreeReadPoolParallelReplicasPtr = std::shared_ptr<MergeTreeReadPoolParallelReplicas>;
@ -247,10 +229,10 @@ public:
ParallelReadingExtension extension_,
CoordinationMode mode_,
size_t min_marks_for_concurrent_read_)
: parts_ranges(parts_)
, extension(extension_)
, mode(mode_)
, min_marks_for_concurrent_read(min_marks_for_concurrent_read_)
: parts_ranges(parts_)
, extension(extension_)
, mode(mode_)
, min_marks_for_concurrent_read(min_marks_for_concurrent_read_)
{
for (const auto & part : parts_ranges)
request.push_back({part.data_part->info, MarkRanges{}});
@ -266,6 +248,7 @@ public:
MarkRanges getNewTask(RangesInDataPartDescription description);
RangesInDataParts parts_ranges;
ParallelReadingExtension extension;
CoordinationMode mode;

Some files were not shown because too many files have changed in this diff Show More