Merge remote-tracking branch 'origin/master' into igor/insert_zk_retries_retry

This commit is contained in:
Igor Nikonov 2022-11-08 20:26:50 +00:00
commit 345304abe0
218 changed files with 3812 additions and 2853 deletions

View File

@ -13,6 +13,8 @@ assignees: ''
> A clear and concise description of what does not work as it is supposed to.
> A link to reproducer in [https://fiddle.clickhouse.com/](https://fiddle.clickhouse.com/).
**Does it reproduce on a recent release?**
[The list of releases](https://github.com/ClickHouse/ClickHouse/blob/master/utils/list-versions/version_date.tsv)

View File

@ -2,7 +2,7 @@
name: Debug
'on':
[push, pull_request, release, workflow_dispatch]
[push, pull_request, release, workflow_dispatch, workflow_call]
jobs:
DebugInfo:

View File

@ -1056,6 +1056,23 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
MarkReleaseReady:
needs:
- BuilderBinDarwin
- BuilderBinDarwinAarch64
- BuilderDebRelease
- BuilderDebAarch64
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Mark Commit Release Ready
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 mark_release_ready.py
##############################################################################################
########################### FUNCTIONAL STATELESS TESTS #######################################
##############################################################################################
@ -3069,6 +3086,8 @@ jobs:
needs:
- DockerHubPush
- BuilderReport
- BuilderSpecialReport
- MarkReleaseReady
- FunctionalStatelessTestDebug0
- FunctionalStatelessTestDebug1
- FunctionalStatelessTestDebug2

View File

@ -10,6 +10,9 @@ env:
workflow_dispatch:
jobs:
Debug:
# The task for having a preserved ENV and event.json for later investigation
uses: ./.github/workflows/debug.yml
DockerHubPushAarch64:
runs-on: [self-hosted, style-checker-aarch64]
steps:

View File

@ -3579,6 +3579,7 @@ jobs:
- DockerServerImages
- CheckLabels
- BuilderReport
- BuilderSpecialReport
- FastTest
- FunctionalStatelessTestDebug0
- FunctionalStatelessTestDebug1

View File

@ -615,6 +615,23 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
MarkReleaseReady:
needs:
- BuilderBinDarwin
- BuilderBinDarwinAarch64
- BuilderDebRelease
- BuilderDebAarch64
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Mark Commit Release Ready
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 mark_release_ready.py
##############################################################################################
########################### FUNCTIONAL STATELESS TESTS #######################################
##############################################################################################
@ -1888,6 +1905,7 @@ jobs:
- DockerServerImages
- BuilderReport
- BuilderSpecialReport
- MarkReleaseReady
- FunctionalStatelessTestDebug0
- FunctionalStatelessTestDebug1
- FunctionalStatelessTestDebug2

View File

@ -202,7 +202,7 @@ option(ADD_GDB_INDEX_FOR_GOLD "Add .gdb-index to resulting binaries for gold lin
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# Can be lld or ld-lld or lld-13 or /path/to/lld.
if (LINKER_NAME MATCHES "lld")
if (LINKER_NAME MATCHES "lld" AND OS_LINUX)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gdb-index")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--gdb-index")
message (STATUS "Adding .gdb-index via --gdb-index linker option.")
@ -248,7 +248,7 @@ endif ()
# Create BuildID when using lld. For other linkers it is created by default.
# (NOTE: LINKER_NAME can be either path or name, and in different variants)
if (LINKER_NAME MATCHES "lld")
if (LINKER_NAME MATCHES "lld" AND OS_LINUX)
# SHA1 is not cryptographically secure but it is the best what lld is offering.
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--build-id=sha1")
endif ()

View File

@ -1,8 +1,10 @@
#if defined(OS_LINUX)
# include <sys/syscall.h>
#endif
#include <cstdlib>
#include <unistd.h>
#include <base/safeExit.h>
#include <base/defines.h> /// for THREAD_SANITIZER
[[noreturn]] void safeExit(int code)
{

View File

@ -8,6 +8,14 @@
#include <link.h> // ElfW
#include <errno.h>
#include "syscall.h"
#if defined(__has_feature)
#if __has_feature(memory_sanitizer)
#include <sanitizer/msan_interface.h>
#endif
#endif
#define ARRAY_SIZE(a) sizeof((a))/sizeof((a[0]))
/// Suppress TSan since it is possible for this code to be called from multiple threads,
@ -39,7 +47,9 @@ ssize_t __retry_read(int fd, void * buf, size_t count)
{
for (;;)
{
ssize_t ret = read(fd, buf, count);
// We cannot use the read syscall as it will be intercepted by sanitizers, which aren't
// initialized yet. Emit the syscall directly.
ssize_t ret = __syscall_ret(__syscall(SYS_read, fd, buf, count));
if (ret == -1)
{
if (errno == EINTR)
@ -90,6 +100,11 @@ static unsigned long NO_SANITIZE_THREAD __auxv_init_procfs(unsigned long type)
_Static_assert(sizeof(aux) < 4096, "Unexpected sizeof(aux)");
while (__retry_read(fd, &aux, sizeof(aux)) == sizeof(aux))
{
#if defined(__has_feature)
#if __has_feature(memory_sanitizer)
__msan_unpoison(&aux, sizeof(aux));
#endif
#endif
if (aux.a_type == AT_NULL)
{
break;

View File

@ -58,13 +58,19 @@ if (NOT LINKER_NAME)
find_program (LLD_PATH NAMES "ld.lld")
find_program (GOLD_PATH NAMES "ld.gold")
elseif (COMPILER_CLANG)
find_program (LLD_PATH NAMES "ld.lld-${COMPILER_VERSION_MAJOR}" "lld-${COMPILER_VERSION_MAJOR}" "ld.lld" "lld")
# llvm lld is a generic driver.
# Invoke ld.lld (Unix), ld64.lld (macOS), lld-link (Windows), wasm-ld (WebAssembly) instead
if (OS_LINUX)
find_program (LLD_PATH NAMES "ld.lld-${COMPILER_VERSION_MAJOR}" "ld.lld")
elseif (OS_DARWIN)
find_program (LLD_PATH NAMES "ld64.lld-${COMPILER_VERSION_MAJOR}" "ld64.lld")
endif ()
find_program (GOLD_PATH NAMES "ld.gold" "gold")
endif ()
endif()
if (OS_LINUX AND NOT LINKER_NAME)
# prefer lld linker over gold or ld on linux
if ((OS_LINUX OR OS_DARWIN) AND NOT LINKER_NAME)
# prefer lld linker over gold or ld on linux and macos
if (LLD_PATH)
if (COMPILER_GCC)
# GCC driver requires one of supported linker names like "lld".

contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc
Subproject commit e4e746a24eb56861a86f3672771e3308d8c40722

View File

@ -1,7 +1,7 @@
# docker build -t clickhouse/style-test .
FROM ubuntu:20.04
ARG ACT_VERSION=0.2.25
ARG ACTIONLINT_VERSION=1.6.8
ARG ACT_VERSION=0.2.33
ARG ACTIONLINT_VERSION=1.6.22
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"

View File

@ -86,7 +86,7 @@ node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY
``` text
┌─hosts─┬─groupArray(n)─┐
│ node1 │ [1,3,5,7,9] │
│ node3 │ [1,3,5,7,9] │
│ node2 │ [0,2,4,6,8] │
└───────┴───────────────┘
```

View File

@ -68,36 +68,57 @@ In the results of `SELECT` query, the values of `AggregateFunction` type have im
## Example of an Aggregated Materialized View {#example-of-an-aggregated-materialized-view}
`AggregatingMergeTree` materialized view that watches the `test.visits` table:
We will create the table `test.visits` that contains the raw data:
``` sql
CREATE MATERIALIZED VIEW test.basic
ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, StartDate)
CREATE TABLE test.visits
(
StartDate DateTime64 NOT NULL,
CounterID UInt64,
Sign Nullable(Int32),
UserID Nullable(Int32)
) ENGINE = MergeTree ORDER BY (StartDate, CounterID);
```
An `AggregatingMergeTree` materialized view that watches the `test.visits` table and uses the `AggregateFunction` type:
``` sql
CREATE MATERIALIZED VIEW test.mv_visits
(
StartDate DateTime64 NOT NULL,
CounterID UInt64,
Visits AggregateFunction(sum, Nullable(Int32)),
Users AggregateFunction(uniq, Nullable(Int32))
)
ENGINE = AggregatingMergeTree() ORDER BY (StartDate, CounterID)
AS SELECT
CounterID,
StartDate,
CounterID,
sumState(Sign) AS Visits,
uniqState(UserID) AS Users
FROM test.visits
GROUP BY CounterID, StartDate;
GROUP BY StartDate, CounterID;
```
Inserting data into the `test.visits` table.
``` sql
INSERT INTO test.visits ...
INSERT INTO test.visits (StartDate, CounterID, Sign, UserID)
VALUES (1667446031, 1, 3, 4);
INSERT INTO test.visits (StartDate, CounterID, Sign, UserID)
VALUES (1667446031, 1, 6, 3);
```
The data are inserted in both the table and view `test.basic` that will perform the aggregation.
The data are inserted in both the table and the materialized view `test.mv_visits`.
To get the aggregated data, we need to execute a query such as `SELECT ... GROUP BY ...` from the view `test.basic`:
To get the aggregated data, we need to execute a query such as `SELECT ... GROUP BY ...` from the materialized view `test.mv_visits`:
``` sql
SELECT
StartDate,
sumMerge(Visits) AS Visits,
uniqMerge(Users) AS Users
FROM test.basic
FROM test.mv_visits
GROUP BY StartDate
ORDER BY StartDate;
```

View File

@ -1,9 +1,5 @@
---
slug: /en/operations/troubleshooting
sidebar_position: 46
sidebar_label: Troubleshooting
title: Troubleshooting
---
[//]: # (This file is included in FAQ > Troubleshooting)
- [Installation](#troubleshooting-installation-errors)
- [Connecting to the server](#troubleshooting-accepts-no-connections)

View File

@ -126,7 +126,7 @@ clickhouse keeper --config /etc/your_path_to_config/config.xml
ClickHouse Keeper also provides 4lw commands that are almost the same as in ZooKeeper. Each command is composed of four letters, such as `mntr`, `stat`, etc. There are some more interesting commands: `stat` gives some general information about the server and connected clients, while `srvr` and `cons` give extended details on the server and connections respectively.
The 4lw commands have a white list configuration `four_letter_word_white_list` which has the default value `conf,cons,crst,envi,ruok,srst,srvr,stat,wchc,wchs,dirs,mntr,isro`.
The 4lw commands have a white list configuration `four_letter_word_white_list` which has the default value `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif`.
You can issue the commands to ClickHouse Keeper via telnet or nc, at the client port.
@ -309,6 +309,25 @@ Sessions with Ephemerals (1):
/clickhouse/task_queue/ddl
```
- `csnp`: Schedule a snapshot creation task. Returns the last committed log index of the scheduled snapshot on success, or `Failed to schedule snapshot creation task.` on failure. Note that the `lgif` command can help you determine whether the snapshot is done.
```
100
```
- `lgif`: Keeper log information. `first_log_idx`: my first log index in the log store; `first_log_term`: my first log term; `last_log_idx`: my last log index in the log store; `last_log_term`: my last log term; `last_committed_log_idx`: my last committed log index in the state machine; `leader_committed_log_idx`: leader's committed log index from my perspective; `target_committed_log_idx`: the target log index that should be committed to; `last_snapshot_idx`: the largest committed log index in the last snapshot.
```
first_log_idx 1
first_log_term 1
last_log_idx 101
last_log_term 1
last_committed_log_idx 100
leader_committed_log_idx 101
target_committed_log_idx 101
last_snapshot_idx 50
```
## Migration from ZooKeeper {#migration-from-zookeeper}
Seamless migration from ZooKeeper to ClickHouse Keeper is impossible: you have to stop your ZooKeeper cluster, convert the data, and start ClickHouse Keeper. The `clickhouse-keeper-converter` tool allows converting ZooKeeper logs and snapshots to a ClickHouse Keeper snapshot. It works only with ZooKeeper > 3.4. Steps for migration:

View File

@ -1,7 +1,8 @@
---
slug: /en/operations/system-tables/
sidebar_position: 52
sidebar_label: System Tables
sidebar_label: Overview
pagination_next: 'en/operations/system-tables/asynchronous_metric_log'
---
# System Tables
@ -72,4 +73,3 @@ If procfs is supported and enabled on the system, ClickHouse server collects the
- `OSReadBytes`
- `OSWriteBytes`
[Original article](https://clickhouse.com/docs/en/operations/system-tables/) <!--hide-->

View File

@ -24,6 +24,7 @@ Columns:
- `DOUBLE_SHA1_PASSWORD`
- `LDAP`
- `KERBEROS`
- `SSL_CERTIFICATE`
- `profiles` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — The list of profiles set for all roles and/or users.
- `roles` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — The list of roles to which the profile is applied.
- `settings` ([Array](../../sql-reference/data-types/array.md)([Tuple](../../sql-reference/data-types/tuple.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md), [String](../../sql-reference/data-types/string.md)))) — Settings that were changed when the client logged in/out.

View File

@ -12,7 +12,7 @@ Columns:
- `storage` ([String](../../sql-reference/data-types/string.md)) — Path to the storage of users. Configured in the `access_control_path` parameter.
- `auth_type` ([Enum8](../../sql-reference/data-types/enum.md)('no_password' = 0,'plaintext_password' = 1, 'sha256_password' = 2, 'double_sha1_password' = 3)) — Shows the authentication type. There are multiple ways of user identification: with no password, with plain text password, with [SHA256](https://ru.wikipedia.org/wiki/SHA-2)-encoded password or with [double SHA-1](https://ru.wikipedia.org/wiki/SHA-1)-encoded password.
- `auth_type` ([Enum8](../../sql-reference/data-types/enum.md)('no_password' = 0,'plaintext_password' = 1, 'sha256_password' = 2, 'double_sha1_password' = 3, 'ldap' = 4, 'kerberos' = 5, 'ssl_certificate' = 6)) — Shows the authentication type. There are multiple ways of user identification: with no password, with plain text password, with [SHA256](https://ru.wikipedia.org/wiki/SHA-2)-encoded password or with [double SHA-1](https://ru.wikipedia.org/wiki/SHA-1)-encoded password.
- `auth_params` ([String](../../sql-reference/data-types/string.md)) — Authentication parameters in the JSON format depending on the `auth_type`.
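For a quick look at the new authentication types in practice, a possible query (a sketch; `name` and `auth_type` are the column names documented above):

``` sql
SELECT name, auth_type FROM system.users;
```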

View File

@ -109,56 +109,38 @@ In the report you can find:
`clickhouse-benchmark` can compare performances for two running ClickHouse servers.
To use the comparison mode, specify the endpoints of both servers by two pairs of `--host`, `--port` keys. Keys are matched together by position in the argument list: the first `--host` is matched with the first `--port`, and so on. `clickhouse-benchmark` establishes connections to both servers, then sends queries. Each query is addressed to a randomly selected server. The results are shown for each server separately.
To use the comparison mode, specify the endpoints of both servers by two pairs of `--host`, `--port` keys. Keys are matched together by position in the argument list: the first `--host` is matched with the first `--port`, and so on. `clickhouse-benchmark` establishes connections to both servers, then sends queries. Each query is addressed to a randomly selected server. The results are shown in a table.
## Example {#clickhouse-benchmark-example}
``` bash
$ echo "SELECT * FROM system.numbers LIMIT 10000000 OFFSET 10000000" | clickhouse-benchmark -i 10
$ echo "SELECT * FROM system.numbers LIMIT 10000000 OFFSET 10000000" | clickhouse-benchmark --host=localhost --port=9001 --host=localhost --port=9000 -i 10
```
``` text
Loaded 1 queries.
Queries executed: 6.
Queries executed: 5.
localhost:9000, queries 6, QPS: 6.153, RPS: 123398340.957, MiB/s: 941.455, result RPS: 61532982.200, result MiB/s: 469.459.
localhost:9001, queries 2, QPS: 3.764, RPS: 75446929.370, MiB/s: 575.614, result RPS: 37639659.982, result MiB/s: 287.168.
localhost:9000, queries 3, QPS: 3.815, RPS: 76466659.385, MiB/s: 583.394, result RPS: 38148392.297, result MiB/s: 291.049.
0.000% 0.159 sec.
10.000% 0.159 sec.
20.000% 0.159 sec.
30.000% 0.160 sec.
40.000% 0.160 sec.
50.000% 0.162 sec.
60.000% 0.164 sec.
70.000% 0.165 sec.
80.000% 0.166 sec.
90.000% 0.166 sec.
95.000% 0.167 sec.
99.000% 0.167 sec.
99.900% 0.167 sec.
99.990% 0.167 sec.
0.000% 0.258 sec. 0.250 sec.
10.000% 0.258 sec. 0.250 sec.
20.000% 0.258 sec. 0.250 sec.
30.000% 0.258 sec. 0.267 sec.
40.000% 0.258 sec. 0.267 sec.
50.000% 0.273 sec. 0.267 sec.
60.000% 0.273 sec. 0.267 sec.
70.000% 0.273 sec. 0.267 sec.
80.000% 0.273 sec. 0.269 sec.
90.000% 0.273 sec. 0.269 sec.
95.000% 0.273 sec. 0.269 sec.
99.000% 0.273 sec. 0.269 sec.
99.900% 0.273 sec. 0.269 sec.
99.990% 0.273 sec. 0.269 sec.
Queries executed: 10.
localhost:9000, queries 10, QPS: 6.082, RPS: 121959604.568, MiB/s: 930.478, result RPS: 60815551.642, result MiB/s: 463.986.
0.000% 0.159 sec.
10.000% 0.159 sec.
20.000% 0.160 sec.
30.000% 0.163 sec.
40.000% 0.164 sec.
50.000% 0.165 sec.
60.000% 0.166 sec.
70.000% 0.166 sec.
80.000% 0.167 sec.
90.000% 0.167 sec.
95.000% 0.170 sec.
99.000% 0.172 sec.
99.900% 0.172 sec.
99.990% 0.172 sec.
No difference proven at 99.5% confidence
```
[Original article](https://clickhouse.com/docs/en/operations/utilities/clickhouse-benchmark.md) <!--hide-->

View File

@ -1,10 +1,11 @@
---
slug: /en/operations/utilities/
sidebar_position: 56
sidebar_label: Utilities
sidebar_label: Overview
pagination_next: 'en/operations/utilities/clickhouse-copier'
---
# ClickHouse Utility
# ClickHouse Utilities
- [clickhouse-local](../../operations/utilities/clickhouse-local.md) — Allows running SQL queries on data without starting the ClickHouse server, similar to how `awk` does this.
- [clickhouse-copier](../../operations/utilities/clickhouse-copier.md) — Copies (and reshards) data from one cluster to another cluster.

View File

@ -1150,3 +1150,13 @@ A text with tags .
The content within <b>CDATA</b>
Do Nothing for 2 Minutes 2:00 &nbsp;
```
## ascii(s) {#ascii}
Returns the ASCII code point of the first character of `s`. The result type is Int32.
If `s` is empty, the result is 0. If the first character is not an ASCII character or is not part of the Latin-1 Supplement range of UTF-16, the result is undefined.
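A minimal illustration; as noted above, the result for a non-ASCII first character is undefined:

``` sql
SELECT ascii('ClickHouse'); -- 67, the code point of 'C'
SELECT ascii('');           -- 0
```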

View File

@ -12,7 +12,7 @@ Syntax:
``` sql
ALTER USER [IF EXISTS] name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1]
[, name2 [ON CLUSTER cluster_name2] [RENAME TO new_name2] ...]
[NOT IDENTIFIED | IDENTIFIED {[WITH {no_password | plaintext_password | sha256_password | sha256_hash | double_sha1_password | double_sha1_hash}] BY {'password' | 'hash'}} | {WITH ldap SERVER 'server_name'} | {WITH kerberos [REALM 'realm']}]
[NOT IDENTIFIED | IDENTIFIED {[WITH {no_password | plaintext_password | sha256_password | sha256_hash | double_sha1_password | double_sha1_hash}] BY {'password' | 'hash'}} | {WITH ldap SERVER 'server_name'} | {WITH kerberos [REALM 'realm']} | {WITH ssl_certificate CN 'common_name'}]
[[ADD | DROP] HOST {LOCAL | NAME 'name' | REGEXP 'name_regexp' | IP 'address' | LIKE 'pattern'} [,...] | ANY | NONE]
[DEFAULT ROLE role [,...] | ALL | ALL EXCEPT role [,...] ]
[GRANTEES {user | role | ANY | NONE} [,...] [EXCEPT {user | role} [,...]]]

View File

@ -8,7 +8,7 @@ title: "CHECK TABLE Statement"
Checks if the data in the table is corrupted.
``` sql
CHECK TABLE [db.]name
CHECK TABLE [db.]name [PARTITION partition_expr]
```
The `CHECK TABLE` query compares actual file sizes with the expected values which are stored on the server. If the file sizes do not match the stored values, it means the data is corrupted. This can be caused, for example, by a system crash during query execution.
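For example, to restrict the check to a single partition (a sketch with a hypothetical table partitioned by `toYYYYMM(date)`):

``` sql
CHECK TABLE my_table PARTITION 202211;
```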

View File

@ -12,7 +12,7 @@ Syntax:
``` sql
CREATE USER [IF NOT EXISTS | OR REPLACE] name1 [ON CLUSTER cluster_name1]
[, name2 [ON CLUSTER cluster_name2] ...]
[NOT IDENTIFIED | IDENTIFIED {[WITH {no_password | plaintext_password | sha256_password | sha256_hash | double_sha1_password | double_sha1_hash}] BY {'password' | 'hash'}} | {WITH ldap SERVER 'server_name'} | {WITH kerberos [REALM 'realm']}]
[NOT IDENTIFIED | IDENTIFIED {[WITH {no_password | plaintext_password | sha256_password | sha256_hash | double_sha1_password | double_sha1_hash}] BY {'password' | 'hash'}} | {WITH ldap SERVER 'server_name'} | {WITH kerberos [REALM 'realm']} | {WITH ssl_certificate CN 'common_name'}]
[HOST {LOCAL | NAME 'name' | REGEXP 'name_regexp' | IP 'address' | LIKE 'pattern'} [,...] | ANY | NONE]
[DEFAULT ROLE role [,...]]
[DEFAULT DATABASE database | NONE]
@ -34,6 +34,7 @@ There are multiple ways of user identification:
- `IDENTIFIED WITH double_sha1_hash BY 'hash'`
- `IDENTIFIED WITH ldap SERVER 'server_name'`
- `IDENTIFIED WITH kerberos` or `IDENTIFIED WITH kerberos REALM 'realm'`
- `IDENTIFIED WITH ssl_certificate CN 'mysite.com:user'`
For identification with `sha256_hash` using `SALT`, the hash must be calculated from the concatenation of 'password' and 'salt'.
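As a sketch of the new `ssl_certificate` option (hypothetical user name and certificate common name):

``` sql
CREATE USER cert_user IDENTIFIED WITH ssl_certificate CN 'cert_user.example.com';
```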

View File

@ -281,8 +281,8 @@ After running this statement the `[db.]replicated_merge_tree_family_table_name`
### RESTART REPLICA
Provides possibility to reinitialize Zookeeper sessions state for `ReplicatedMergeTree` table, will compare current state with Zookeeper as source of true and add tasks to Zookeeper queue if needed.
Initialization replication queue based on ZooKeeper date happens in the same way as `ATTACH TABLE` statement. For a short time the table will be unavailable for any operations.
Provides possibility to reinitialize Zookeeper session's state for `ReplicatedMergeTree` table, will compare current state with Zookeeper as source of truth and add tasks to Zookeeper queue if needed.
Initialization of replication queue based on ZooKeeper data happens in the same way as for `ATTACH TABLE` statement. For a short time, the table will be unavailable for any operations.
``` sql
SYSTEM RESTART REPLICA [db.]replicated_merge_tree_family_table_name

View File

You can't use table functions if the [allow_ddl](../../operations/settings/per
| [s3](../../sql-reference/table-functions/s3.md) | Creates a [S3](../../engines/table-engines/integrations/s3.md)-engine table. |
| [sqlite](../../sql-reference/table-functions/sqlite.md) | Creates a [sqlite](../../engines/table-engines/integrations/sqlite.md)-engine table. |
:::note
Only these table functions are enabled in readonly mode:
null, view, viewIfPermitted, numbers, numbers_mt, generateRandom, values, cluster, clusterAllReplicas
:::
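For example, a read-only session can still use one of the allowed functions (a sketch):

``` sql
SET readonly = 1;
SELECT * FROM numbers(3); -- numbers is in the allow-list above
```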

View File

@ -1,7 +1,6 @@
#include "ExternalDictionaryLibraryHandler.h"
#include <base/scope_guard.h>
#include <base/bit_cast.h>
#include <base/find_symbols.h>
#include <IO/ReadHelpers.h>
@ -113,7 +112,7 @@ Block ExternalDictionaryLibraryHandler::loadAll()
Block ExternalDictionaryLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
{
const ExternalDictionaryLibraryAPI::VectorUInt64 ids_data{bit_cast<decltype(ExternalDictionaryLibraryAPI::VectorUInt64::data)>(ids.data()), ids.size()};
const ExternalDictionaryLibraryAPI::VectorUInt64 ids_data{std::bit_cast<decltype(ExternalDictionaryLibraryAPI::VectorUInt64::data)>(ids.data()), ids.size()};
auto columns_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(attributes_names.size());
ExternalDictionaryLibraryAPI::CStrings columns_pass{static_cast<decltype(ExternalDictionaryLibraryAPI::CStrings::data)>(columns_holder.get()), attributes_names.size()};

View File

@ -2,7 +2,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Core/Block.h>
#include <base/bit_cast.h>
#include <base/range.h>
#include "ExternalDictionaryLibraryAPI.h"

View File

@ -32,7 +32,6 @@
#include <Core/Block.h>
#include <base/StringRef.h>
#include <Common/DateLUT.h>
#include <base/bit_cast.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFile.h>
@ -278,9 +277,9 @@ Float transformFloatMantissa(Float x, UInt64 seed)
using UInt = std::conditional_t<std::is_same_v<Float, Float32>, UInt32, UInt64>;
constexpr size_t mantissa_num_bits = std::is_same_v<Float, Float32> ? 23 : 52;
UInt x_uint = bit_cast<UInt>(x);
UInt x_uint = std::bit_cast<UInt>(x);
x_uint = static_cast<UInt>(feistelNetwork(x_uint, mantissa_num_bits, seed));
return bit_cast<Float>(x_uint);
return std::bit_cast<Float>(x_uint);
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <base/types.h>
#include <base/bit_cast.h>
#include <base/sort.h>
#include <Common/HashTable/HashMap.h>
@ -104,13 +103,13 @@ private:
/// Take the most significant 16 bits of the floating point number.
BFloat16 toBFloat16(const Value & x) const
{
return bit_cast<UInt32>(static_cast<Float32>(x)) >> 16;
return std::bit_cast<UInt32>(static_cast<Float32>(x)) >> 16;
}
/// Put the bits into most significant 16 bits of the floating point number and fill other bits with zeros.
Float32 toFloat32(const BFloat16 & x) const
{
return bit_cast<Float32>(x << 16);
return std::bit_cast<Float32>(x << 16);
}
using Pair = PairNoInit<Float32, Weight>;

View File

@ -16,7 +16,7 @@ namespace DB
* Dependencies between passes must be avoided.
*/
class IQueryTreePass;
using QueryTreePassPtr = std::shared_ptr<IQueryTreePass>;
using QueryTreePassPtr = std::unique_ptr<IQueryTreePass>;
using QueryTreePasses = std::vector<QueryTreePassPtr>;
class IQueryTreePass

View File

@ -5527,9 +5527,15 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
* 3. Check that there are no columns that are not specified in GROUP BY keys.
* 4. Validate GROUP BY modifiers.
*/
auto join_tree_node_type = query_node_typed.getJoinTree()->getNodeType();
bool join_tree_is_subquery = join_tree_node_type == QueryTreeNodeType::QUERY || join_tree_node_type == QueryTreeNodeType::UNION;
if (!join_tree_is_subquery)
{
assertNoAggregateFunctionNodes(query_node_typed.getJoinTree(), "in JOIN TREE");
assertNoGroupingFunction(query_node_typed.getJoinTree(), "in JOIN TREE");
assertNoWindowFunctionNodes(query_node_typed.getJoinTree(), "in JOIN TREE");
}
if (query_node_typed.hasWhere())
{

View File

@ -18,6 +18,9 @@
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Common/Exception.h>
namespace DB
{
@ -25,6 +28,38 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
{
#ifndef NDEBUG
/** This visitor checks if Query Tree structure is valid after each pass
* in debug build.
*/
class ValidationChecker : public InDepthQueryTreeVisitor<ValidationChecker>
{
String pass_name;
public:
explicit ValidationChecker(String pass_name_)
: pass_name(std::move(pass_name_))
{}
void visitImpl(QueryTreeNodePtr & node) const
{
auto * column = node->as<ColumnNode>();
if (!column)
return;
if (column->getColumnSourceOrNull() == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} {} query tree node does not have valid source node after running {} pass",
column->getColumnName(), column->getColumnType(), pass_name);
}
};
#endif
}
/** ClickHouse query tree pass manager.
@ -61,7 +96,12 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node)
size_t passes_size = passes.size();
for (size_t i = 0; i < passes_size; ++i)
{
passes[i]->run(query_tree_node, current_context);
#ifndef NDEBUG
ValidationChecker(passes[i]->getName()).visit(query_tree_node);
#endif
}
}
void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pass_index)
@ -75,7 +115,12 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pa
auto current_context = getContext();
for (size_t i = 0; i < up_to_pass_index; ++i)
{
passes[i]->run(query_tree_node, current_context);
#ifndef NDEBUG
ValidationChecker(passes[i]->getName()).visit(query_tree_node);
#endif
}
}
void QueryTreePassManager::dump(WriteBuffer & buffer)
@ -114,38 +159,38 @@ void addQueryTreePasses(QueryTreePassManager & manager)
auto context = manager.getContext();
const auto & settings = context->getSettingsRef();
manager.addPass(std::make_shared<QueryAnalysisPass>());
manager.addPass(std::make_unique<QueryAnalysisPass>());
if (settings.optimize_functions_to_subcolumns)
manager.addPass(std::make_shared<FunctionToSubcolumnsPass>());
manager.addPass(std::make_unique<FunctionToSubcolumnsPass>());
if (settings.count_distinct_optimization)
manager.addPass(std::make_shared<CountDistinctPass>());
manager.addPass(std::make_unique<CountDistinctPass>());
if (settings.optimize_rewrite_sum_if_to_count_if)
manager.addPass(std::make_shared<SumIfToCountIfPass>());
manager.addPass(std::make_unique<SumIfToCountIfPass>());
if (settings.optimize_normalize_count_variants)
manager.addPass(std::make_shared<NormalizeCountVariantsPass>());
manager.addPass(std::make_unique<NormalizeCountVariantsPass>());
manager.addPass(std::make_shared<CustomizeFunctionsPass>());
manager.addPass(std::make_unique<CustomizeFunctionsPass>());
if (settings.optimize_arithmetic_operations_in_aggregate_functions)
manager.addPass(std::make_shared<AggregateFunctionsArithmericOperationsPass>());
manager.addPass(std::make_unique<AggregateFunctionsArithmericOperationsPass>());
if (settings.optimize_injective_functions_inside_uniq)
manager.addPass(std::make_shared<UniqInjectiveFunctionsEliminationPass>());
manager.addPass(std::make_unique<UniqInjectiveFunctionsEliminationPass>());
if (settings.optimize_multiif_to_if)
manager.addPass(std::make_shared<MultiIfToIfPass>());
manager.addPass(std::make_unique<MultiIfToIfPass>());
manager.addPass(std::make_shared<IfConstantConditionPass>());
manager.addPass(std::make_unique<IfConstantConditionPass>());
if (settings.optimize_if_chain_to_multiif)
manager.addPass(std::make_shared<IfChainToMultiIfPass>());
manager.addPass(std::make_unique<IfChainToMultiIfPass>());
manager.addPass(std::make_shared<OrderByTupleEliminationPass>());
manager.addPass(std::make_shared<OrderByLimitByDuplicateEliminationPass>());
manager.addPass(std::make_unique<OrderByTupleEliminationPass>());
manager.addPass(std::make_unique<OrderByLimitByDuplicateEliminationPass>());
}
}

View File

@ -1247,7 +1247,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof();
if (need_render_progress && have_data_in_stdin)
if (need_render_progress)
{
/// Set total_bytes_to_read for current fd.
FileProgress file_progress(0, std_in.getFileSize());

View File

@ -1331,7 +1331,7 @@ public:
}
template <typename DateOrTime>
inline auto addQuarters(DateOrTime d, Int64 delta) const
inline auto NO_SANITIZE_UNDEFINED addQuarters(DateOrTime d, Int64 delta) const
{
return addMonths(d, delta * 3);
}

View File

@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -136,6 +136,12 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr api_version_command = std::make_shared<ApiVersionCommand>(keeper_dispatcher);
factory.registerCommand(api_version_command);
FourLetterCommandPtr create_snapshot_command = std::make_shared<CreateSnapshotCommand>(keeper_dispatcher);
factory.registerCommand(create_snapshot_command);
FourLetterCommandPtr log_info_command = std::make_shared<LogInfoCommand>(keeper_dispatcher);
factory.registerCommand(log_info_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -472,4 +478,33 @@ String ApiVersionCommand::run()
return toString(static_cast<uint8_t>(Coordination::current_keeper_api_version));
}
String CreateSnapshotCommand::run()
{
auto log_index = keeper_dispatcher.createSnapshot();
return log_index > 0 ? std::to_string(log_index) : "Failed to schedule snapshot creation task.";
}
String LogInfoCommand::run()
{
KeeperLogInfo log_info = keeper_dispatcher.getKeeperLogInfo();
StringBuffer ret;
auto append = [&ret] (String key, uint64_t value) -> void
{
writeText(key, ret);
writeText('\t', ret);
writeText(std::to_string(value), ret);
writeText('\n', ret);
};
append("first_log_idx", log_info.first_log_idx);
append("first_log_term", log_info.first_log_idx);
append("last_log_idx", log_info.last_log_idx);
append("last_log_term", log_info.last_log_term);
append("last_committed_log_idx", log_info.last_committed_log_idx);
append("leader_committed_log_idx", log_info.leader_committed_log_idx);
append("target_committed_log_idx", log_info.target_committed_log_idx);
append("last_snapshot_idx", log_info.last_snapshot_idx);
return ret.str();
}
}

View File

@ -17,6 +17,7 @@ using FourLetterCommandPtr = std::shared_ptr<DB::IFourLetterCommand>;
/// Just like ZooKeeper's Four Letter Words commands, CH Keeper responds to a small set of commands.
/// Each command is composed of four letters; these commands are useful for monitoring and diagnosing system problems.
/// The feature is based on ZooKeeper 3.5.9; details are in https://zookeeper.apache.org/doc/r3.5.9/zookeeperAdmin.html#sc_zkCommands.
/// We also add some additional commands, such as csnp and lgif.
struct IFourLetterCommand
{
public:
@ -327,4 +328,40 @@ struct ApiVersionCommand : public IFourLetterCommand
String run() override;
~ApiVersionCommand() override = default;
};
/// Create snapshot manually
struct CreateSnapshotCommand : public IFourLetterCommand
{
explicit CreateSnapshotCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "csnp"; }
String run() override;
~CreateSnapshotCommand() override = default;
};
/** Raft log information:
* first_log_idx 1
* first_log_term 1
* last_log_idx 101
* last_log_term 1
* last_committed_log_idx 100
* leader_committed_log_idx 101
* target_committed_log_idx 101
* last_snapshot_idx 50
*/
struct LogInfoCommand : public IFourLetterCommand
{
explicit LogInfoCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "lgif"; }
String run() override;
~LogInfoCommand() override = default;
};
}

View File

@ -47,4 +47,32 @@ struct Keeper4LWInfo
}
};
/// Keeper log information for 4lw commands
struct KeeperLogInfo
{
/// My first log index in log store.
uint64_t first_log_idx;
/// My first log term.
uint64_t first_log_term;
/// My last log index in log store.
uint64_t last_log_idx;
/// My last log term.
uint64_t last_log_term;
/// My last committed log index in state machine.
uint64_t last_committed_log_idx;
/// Leader's committed log index from my perspective.
uint64_t leader_committed_log_idx;
/// Target log index should be committed to.
uint64_t target_committed_log_idx;
/// The largest committed log index in last snapshot.
uint64_t last_snapshot_idx;
};
}

View File

@ -203,6 +203,18 @@ public:
{
keeper_stats.reset();
}
/// Create snapshot manually, return the last committed log index in the snapshot
uint64_t createSnapshot()
{
return server->createSnapshot();
}
/// Get Raft information
KeeperLogInfo getKeeperLogInfo()
{
return server->getKeeperLogInfo();
}
};
}

View File

@ -907,4 +907,29 @@ Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const
return result;
}
uint64_t KeeperServer::createSnapshot()
{
uint64_t log_idx = raft_instance->create_snapshot();
if (log_idx != 0)
LOG_INFO(log, "Snapshot creation scheduled with last committed log index {}.", log_idx);
else
LOG_WARNING(log, "Failed to schedule snapshot creation task.");
return log_idx;
}
KeeperLogInfo KeeperServer::getKeeperLogInfo()
{
KeeperLogInfo log_info;
auto log_store = state_manager->load_log_store();
log_info.first_log_idx = log_store->start_index();
log_info.first_log_term = log_store->term_at(log_info.first_log_idx);
log_info.last_log_idx = raft_instance->get_last_log_idx();
log_info.last_log_term = raft_instance->get_last_log_term();
log_info.last_committed_log_idx = raft_instance->get_committed_log_idx();
log_info.leader_committed_log_idx = raft_instance->get_leader_committed_log_idx();
log_info.target_committed_log_idx = raft_instance->get_target_committed_log_idx();
log_info.last_snapshot_idx = raft_instance->get_last_snapshot_idx();
return log_info;
}
}

View File

@ -131,6 +131,10 @@ public:
/// Wait configuration update for action. Used by followers.
/// Return true if update was successfully received.
bool waitConfigurationUpdate(const ConfigUpdateAction & task);
uint64_t createSnapshot();
KeeperLogInfo getKeeperLogInfo();
};
}

View File

@ -411,6 +411,7 @@ inline bool isDecimal(const DataTypePtr & data_type) { return WhichDataType(data
inline bool isTuple(const DataTypePtr & data_type) { return WhichDataType(data_type).isTuple(); }
inline bool isArray(const DataTypePtr & data_type) { return WhichDataType(data_type).isArray(); }
inline bool isMap(const DataTypePtr & data_type) {return WhichDataType(data_type).isMap(); }
inline bool isInterval(const DataTypePtr & data_type) {return WhichDataType(data_type).isInterval(); }
inline bool isNothing(const DataTypePtr & data_type) { return WhichDataType(data_type).isNothing(); }
inline bool isUUID(const DataTypePtr & data_type) { return WhichDataType(data_type).isUUID(); }

View File

@ -24,7 +24,6 @@
#include <Common/quoteString.h>
#include <Common/setThreadName.h>
#include <base/sleep.h>
#include <base/bit_cast.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Parsers/CommonParsers.h>

View File

@ -11,6 +11,7 @@ enum class DataSourceType
Local,
RAM,
S3,
S3_Plain,
HDFS,
WebServer,
AzureBlobStorage,
@ -26,6 +27,8 @@ inline String toString(DataSourceType data_source_type)
return "memory";
case DataSourceType::S3:
return "s3";
case DataSourceType::S3_Plain:
return "s3_plain";
case DataSourceType::HDFS:
return "hdfs";
case DataSourceType::WebServer:

View File

@ -213,7 +213,9 @@ public:
template <class ...Args>
S3PlainObjectStorage(Args && ...args)
: S3ObjectStorage("S3PlainObjectStorage", std::forward<Args>(args)...)
{}
{
data_source_description.type = DataSourceType::S3_Plain;
}
};
}

View File

@ -22,6 +22,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
@ -642,7 +643,8 @@ class FunctionBinaryArithmetic : public IFunction
DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64, DataTypeInt128, DataTypeInt256,
DataTypeDecimal32, DataTypeDecimal64, DataTypeDecimal128, DataTypeDecimal256,
DataTypeDate, DataTypeDateTime,
DataTypeFixedString, DataTypeString>;
DataTypeFixedString, DataTypeString,
DataTypeInterval>;
using Floats = TypeList<DataTypeFloat32, DataTypeFloat64>;
@ -717,6 +719,82 @@ class FunctionBinaryArithmetic : public IFunction
return FunctionFactory::instance().get(function_name, context);
}
static FunctionOverloadResolverPtr
getFunctionForDateTupleOfIntervalsArithmetic(const DataTypePtr & type0, const DataTypePtr & type1, ContextPtr context)
{
bool first_is_date_or_datetime = isDateOrDate32(type0) || isDateTime(type0) || isDateTime64(type0);
bool second_is_date_or_datetime = isDateOrDate32(type1) || isDateTime(type1) || isDateTime64(type1);
/// Exactly one argument must be Date or DateTime
if (first_is_date_or_datetime == second_is_date_or_datetime)
return {};
if (!isTuple(type0) && !isTuple(type1))
return {};
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Tuple.
/// We construct another function and call it.
if constexpr (!is_plus && !is_minus)
return {};
if (isTuple(type0) && second_is_date_or_datetime && is_minus)
throw Exception("Wrong order of arguments for function " + String(name) + ": argument of Tuple type cannot be first",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
std::string function_name;
if (is_plus)
{
function_name = "addTupleOfIntervals";
}
else
{
function_name = "subtractTupleOfIntervals";
}
return FunctionFactory::instance().get(function_name, context);
}
static FunctionOverloadResolverPtr
getFunctionForMergeIntervalsArithmetic(const DataTypePtr & type0, const DataTypePtr & type1, ContextPtr context)
{
/// Special case when the function is plus or minus, first argument is Interval or Tuple of Intervals
/// and the second argument is the Interval of a different kind.
/// We construct another function (example: addIntervals) and call it
if constexpr (!is_plus && !is_minus)
return {};
const auto * tuple_data_type_0 = checkAndGetDataType<DataTypeTuple>(type0.get());
const auto * interval_data_type_0 = checkAndGetDataType<DataTypeInterval>(type0.get());
const auto * interval_data_type_1 = checkAndGetDataType<DataTypeInterval>(type1.get());
if ((!tuple_data_type_0 && !interval_data_type_0) || !interval_data_type_1)
return {};
if (interval_data_type_0 && interval_data_type_0->equals(*interval_data_type_1))
return {};
if (tuple_data_type_0)
{
auto & tuple_types = tuple_data_type_0->getElements();
for (auto & type : tuple_types)
if (!isInterval(type))
return {};
}
std::string function_name;
if (is_plus)
{
function_name = "addInterval";
}
else
{
function_name = "subtractInterval";
}
return FunctionFactory::instance().get(function_name, context);
}
static FunctionOverloadResolverPtr
getFunctionForTupleArithmetic(const DataTypePtr & type0, const DataTypePtr & type1, ContextPtr context)
{
@ -915,6 +993,30 @@ class FunctionBinaryArithmetic : public IFunction
return function->execute(new_arguments, result_type, input_rows_count);
}
ColumnPtr executeDateTimeTupleOfIntervalsPlusMinus(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type,
size_t input_rows_count, const FunctionOverloadResolverPtr & function_builder) const
{
ColumnsWithTypeAndName new_arguments = arguments;
/// Tuple argument must be second.
if (isTuple(arguments[0].type))
std::swap(new_arguments[0], new_arguments[1]);
auto function = function_builder->build(new_arguments);
return function->execute(new_arguments, result_type, input_rows_count);
}
ColumnPtr executeIntervalTupleOfIntervalsPlusMinus(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type,
size_t input_rows_count, const FunctionOverloadResolverPtr & function_builder) const
{
ColumnsWithTypeAndName new_arguments = arguments;
auto function = function_builder->build(new_arguments);
return function->execute(new_arguments, result_type, input_rows_count);
}
ColumnPtr executeTupleNumberOperator(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type,
size_t input_rows_count, const FunctionOverloadResolverPtr & function_builder) const
{
@ -1134,6 +1236,34 @@ public:
return function->getResultType();
}
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Tuple.
if (auto function_builder = getFunctionForDateTupleOfIntervalsArithmetic(arguments[0], arguments[1], context))
{
ColumnsWithTypeAndName new_arguments(2);
for (size_t i = 0; i < 2; ++i)
new_arguments[i].type = arguments[i];
/// Tuple argument must be second.
if (isTuple(new_arguments[0].type))
std::swap(new_arguments[0], new_arguments[1]);
auto function = function_builder->build(new_arguments);
return function->getResultType();
}
/// Special case when the function is plus or minus, one of arguments is Interval/Tuple of Intervals and another is Interval.
if (auto function_builder = getFunctionForMergeIntervalsArithmetic(arguments[0], arguments[1], context))
{
ColumnsWithTypeAndName new_arguments(2);
for (size_t i = 0; i < 2; ++i)
new_arguments[i].type = arguments[i];
auto function = function_builder->build(new_arguments);
return function->getResultType();
}
/// Special case when the function is multiply or divide, one of arguments is Tuple and another is Number.
if (auto function_builder = getFunctionForTupleAndNumberArithmetic(arguments[0], arguments[1], context))
{
@ -1185,6 +1315,21 @@ public:
type_res = std::make_shared<DataTypeString>();
return true;
}
else if constexpr (std::is_same_v<LeftDataType, DataTypeInterval> || std::is_same_v<RightDataType, DataTypeInterval>)
{
if constexpr (std::is_same_v<LeftDataType, DataTypeInterval> &&
std::is_same_v<RightDataType, DataTypeInterval>)
{
if constexpr (is_plus || is_minus)
{
if (left.getKind() == right.getKind())
{
type_res = std::make_shared<LeftDataType>(left.getKind());
return true;
}
}
}
}
else
{
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
@ -1566,6 +1711,18 @@ public:
return executeDateTimeIntervalPlusMinus(arguments, result_type, input_rows_count, function_builder);
}
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Tuple.
if (auto function_builder = getFunctionForDateTupleOfIntervalsArithmetic(arguments[0].type, arguments[1].type, context))
{
return executeDateTimeTupleOfIntervalsPlusMinus(arguments, result_type, input_rows_count, function_builder);
}
/// Special case when the function is plus or minus, one of arguments is Interval/Tuple of Intervals and another is Interval.
if (auto function_builder = getFunctionForMergeIntervalsArithmetic(arguments[0].type, arguments[1].type, context))
{
return executeIntervalTupleOfIntervalsPlusMinus(arguments, result_type, input_rows_count, function_builder);
}
/// Special case when the function is plus, minus or multiply, both arguments are tuples.
if (auto function_builder = getFunctionForTupleArithmetic(arguments[0].type, arguments[1].type, context))
{

View File

@ -3,6 +3,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/Native.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
@ -145,7 +146,8 @@ class FunctionUnaryArithmetic : public IFunction
DataTypeDecimal<Decimal64>,
DataTypeDecimal<Decimal128>,
DataTypeDecimal<Decimal256>,
DataTypeFixedString
DataTypeFixedString,
DataTypeInterval
>(type, std::forward<F>(f));
}
@ -211,6 +213,12 @@ public:
return false;
result = std::make_shared<DataType>(type.getN());
}
else if constexpr (std::is_same_v<DataTypeInterval, DataType>)
{
if constexpr (!IsUnaryOperation<Op>::negate)
return false;
result = std::make_shared<DataTypeInterval>(type.getKind());
}
else
{
using T0 = typename DataType::FieldType;

View File

@ -16,7 +16,6 @@
#include <cmath>
#include <type_traits>
#include <array>
#include <base/bit_cast.h>
#include <base/sort.h>
#include <algorithm>

View File

@ -91,23 +91,30 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() < 2 || 3 < arguments.size())
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 2 or 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be 2 or 3",
getName(), arguments.size());
if (!isStringOrFixedString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}",
arguments[0]->getName(), getName());
if (!isString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}",
arguments[1]->getName(), getName());
if (arguments.size() >= 3)
{
if (!isUnsignedInteger(arguments[2]))
throw Exception(
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}",
arguments[2]->getName(), getName());
}
return std::make_shared<DataTypeNumber<typename Impl::ResultType>>();
@ -196,9 +203,11 @@ public:
vec_res);
else
throw Exception(
"Illegal columns " + arguments[0].column->getName() + " and "
+ arguments[1].column->getName() + " of arguments of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
ErrorCodes::ILLEGAL_COLUMN,
"Illegal columns {} and {} of arguments of function {}",
arguments[0].column->getName(),
arguments[1].column->getName(),
getName());
return col_res;
}

View File

@ -25,7 +25,7 @@ namespace impl
/// Is the [I]LIKE expression reduced to finding a substring in a string?
inline bool likePatternIsSubstring(std::string_view pattern, String & res)
{
if (pattern.size() < 2 || pattern.front() != '%' || pattern.back() != '%')
if (pattern.size() < 2 || !pattern.starts_with('%') || !pattern.ends_with('%'))
return false;
res.clear();
@ -101,9 +101,7 @@ struct MatchImpl
static constexpr bool case_insensitive = (case_ == MatchTraits::Case::Insensitive);
static constexpr bool negate = (result_ == MatchTraits::Result::Negate);
using Searcher = std::conditional_t<case_insensitive,
VolnitskyCaseInsensitiveUTF8,
VolnitskyUTF8>;
using Searcher = std::conditional_t<case_insensitive, VolnitskyCaseInsensitiveUTF8, VolnitskyUTF8>;
static void vectorConstant(
const ColumnString::Chars & haystack_data,
@ -115,13 +113,12 @@ struct MatchImpl
const size_t haystack_size = haystack_offsets.size();
assert(haystack_size == res.size());
assert(start_pos_ == nullptr);
if (haystack_offsets.empty())
return;
/// A simple case where the [I]LIKE expression reduces to finding a substring in a string
/// Special case where the [I]LIKE expression reduces to finding a substring in a string
String strstr_pattern;
if (is_like && impl::likePatternIsSubstring(needle, strstr_pattern))
{
@ -158,9 +155,10 @@ struct MatchImpl
/// Tail, in which there can be no substring.
if (i < res.size())
memset(&res[i], negate, (res.size() - i) * sizeof(res[0]));
return;
}
else
{
const auto & regexp = Regexps::Regexp(Regexps::createRegexp<is_like, /*no_capture*/ true, case_insensitive>(needle));
String required_substring;
@ -172,10 +170,7 @@ struct MatchImpl
if (required_substring.empty())
{
if (!regexp.getRE2()) /// An empty regexp. Always matches.
{
if (haystack_size)
memset(res.data(), !negate, haystack_size * sizeof(res[0]));
}
else
{
size_t prev_offset = 0;
@ -221,7 +216,6 @@ struct MatchImpl
if (pos + required_substring.size() < begin + haystack_offsets[i])
{
/// And if it does not, if necessary, we check the regexp.
if (is_trivial)
res[i] = !negate;
else
@ -258,7 +252,6 @@ struct MatchImpl
memset(&res[i], negate, (res.size() - i) * sizeof(res[0]));
}
}
}
/// Very carefully crafted copy-paste.
static void vectorFixedConstant(
@ -274,7 +267,7 @@ struct MatchImpl
if (haystack.empty())
return;
/// A simple case where the LIKE expression reduces to finding a substring in a string
/// Special case where the [I]LIKE expression reduces to finding a substring in a string
String strstr_pattern;
if (is_like && impl::likePatternIsSubstring(needle, strstr_pattern))
{
@ -316,9 +309,10 @@ struct MatchImpl
/// Tail, in which there can be no substring.
if (i < res.size())
memset(&res[i], negate, (res.size() - i) * sizeof(res[0]));
return;
}
else
{
const auto & regexp = Regexps::Regexp(Regexps::createRegexp<is_like, /*no_capture*/ true, case_insensitive>(needle));
String required_substring;
@ -330,10 +324,7 @@ struct MatchImpl
if (required_substring.empty())
{
if (!regexp.getRE2()) /// An empty regexp. Always matches.
{
if (haystack_size)
memset(res.data(), !negate, haystack_size * sizeof(res[0]));
}
else
{
size_t offset = 0;
@ -383,7 +374,6 @@ struct MatchImpl
if (pos + required_substring.size() <= next_pos)
{
/// And if it does not, if necessary, we check the regexp.
if (is_trivial)
res[i] = !negate;
else
@ -420,7 +410,6 @@ struct MatchImpl
memset(&res[i], negate, (res.size() - i) * sizeof(res[0]));
}
}
}
static void vectorVector(
const ColumnString::Chars & haystack_data,
@ -434,7 +423,6 @@ struct MatchImpl
assert(haystack_size == needle_offset.size());
assert(haystack_size == res.size());
assert(start_pos_ == nullptr);
if (haystack_offsets.empty())
@ -481,9 +469,7 @@ struct MatchImpl
if (required_substr.empty())
{
if (!regexp->getRE2()) /// An empty regexp. Always matches.
{
res[i] = !negate;
}
else
{
const bool match = regexp->getRE2()->Match(
@ -502,15 +488,11 @@ struct MatchImpl
const auto * match = searcher.search(cur_haystack_data, cur_haystack_length);
if (match == cur_haystack_data + cur_haystack_length)
{
res[i] = negate; // no match
}
else
{
if (is_trivial)
{
res[i] = !negate; // no wildcards in pattern
}
else
{
const size_t start_pos = (required_substring_is_prefix) ? (match - cur_haystack_data) : 0;
@ -546,7 +528,6 @@ struct MatchImpl
assert(haystack_size == needle_offset.size());
assert(haystack_size == res.size());
assert(start_pos_ == nullptr);
if (haystack.empty())
@ -593,9 +574,7 @@ struct MatchImpl
if (required_substr.empty())
{
if (!regexp->getRE2()) /// An empty regexp. Always matches.
{
res[i] = !negate;
}
else
{
const bool match = regexp->getRE2()->Match(
@ -614,15 +593,11 @@ struct MatchImpl
const auto * match = searcher.search(cur_haystack_data, cur_haystack_length);
if (match == cur_haystack_data + cur_haystack_length)
{
res[i] = negate; // no match
}
else
{
if (is_trivial)
{
res[i] = !negate; // no wildcards in pattern
}
else
{
const size_t start_pos = (required_substring_is_prefix) ? (match - cur_haystack_data) : 0;

src/Functions/ascii.cpp Normal file
View File

@ -0,0 +1,86 @@
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringOrArrayToT.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NOT_IMPLEMENTED;
}
struct AsciiName
{
static constexpr auto name = "ascii";
};
struct AsciiImpl
{
static constexpr auto is_fixed_to_constant = false;
using ReturnType = Int32;
static void vector(const ColumnString::Chars & data, const ColumnString::Offsets & offsets, PaddedPODArray<ReturnType> & res)
{
size_t size = offsets.size();
ColumnString::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
res[i] = doAscii(data, prev_offset, offsets[i] - prev_offset - 1);
prev_offset = offsets[i];
}
}
[[noreturn]] static void vectorFixedToConstant(const ColumnString::Chars & /*data*/, size_t /*n*/, Int32 & /*res*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "vectorFixedToConstant not implemented for function {}", AsciiName::name);
}
static void vectorFixedToVector(const ColumnString::Chars & data, size_t n, PaddedPODArray<ReturnType> & res)
{
size_t size = data.size() / n;
for (size_t i = 0; i < size; ++i)
{
res[i] = doAscii(data, i * n, n);
}
}
[[noreturn]] static void array(const ColumnString::Offsets & /*offsets*/, PaddedPODArray<ReturnType> & /*res*/)
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Cannot apply function {} to Array argument", AsciiName::name);
}
[[noreturn]] static void uuid(const ColumnUUID::Container & /*offsets*/, size_t /*n*/, PaddedPODArray<ReturnType> & /*res*/)
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Cannot apply function {} to UUID argument", AsciiName::name);
}
private:
static Int32 doAscii(const ColumnString::Chars & buf, size_t offset, size_t size)
{
return size ? static_cast<ReturnType>(buf[offset]) : 0;
}
};
using FunctionAscii = FunctionStringOrArrayToT<AsciiImpl, AsciiName, AsciiImpl::ReturnType>;
REGISTER_FUNCTION(Ascii)
{
factory.registerFunction<FunctionAscii>(
{
R"(
Returns the ASCII code point of the first character of s. The result type is Int32.
If s is empty, the result is 0. If the first character is not an ASCII character or is not part of the Latin-1 Supplement range of UTF-16, the result is undefined.
)",
Documentation::Examples{{"ascii", "SELECT ascii('234')"}},
Documentation::Categories{"String"}
}, FunctionFactory::CaseInsensitive);
}
}

View File

@ -1,6 +1,5 @@
#include <Functions/FunctionNumericPredicate.h>
#include <Functions/FunctionFactory.h>
#include <base/bit_cast.h>
#include <type_traits>
@ -20,11 +19,11 @@ struct IsFiniteImpl
static bool execute(const T t)
{
if constexpr (std::is_same_v<T, float>)
return (bit_cast<uint32_t>(t)
return (std::bit_cast<uint32_t>(t)
& 0b01111111100000000000000000000000)
!= 0b01111111100000000000000000000000;
else if constexpr (std::is_same_v<T, double>)
return (bit_cast<uint64_t>(t)
return (std::bit_cast<uint64_t>(t)
& 0b0111111111110000000000000000000000000000000000000000000000000000)
!= 0b0111111111110000000000000000000000000000000000000000000000000000;
else

View File

@ -1,6 +1,5 @@
#include <Functions/FunctionNumericPredicate.h>
#include <Functions/FunctionFactory.h>
#include <base/bit_cast.h>
#include <type_traits>
@ -16,11 +15,11 @@ struct IsInfiniteImpl
static bool execute(const T t)
{
if constexpr (std::is_same_v<T, float>)
return (bit_cast<uint32_t>(t)
return (std::bit_cast<uint32_t>(t)
& 0b01111111111111111111111111111111)
== 0b01111111100000000000000000000000;
else if constexpr (std::is_same_v<T, double>)
return (bit_cast<uint64_t>(t)
return (std::bit_cast<uint64_t>(t)
& 0b0111111111111111111111111111111111111111111111111111111111111111)
== 0b0111111111110000000000000000000000000000000000000000000000000000;
else

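Both predicates rely on the IEEE-754 layout: a value is non-finite exactly when all of its exponent bits are set. A minimal sketch of the float case with std::bit_cast (illustrative only, not part of the patch):

#include <bit>
#include <cstdint>

bool isFiniteFloat(float x)
{
    // Exponent bits 30..23 all set means Inf or NaN; anything else is finite.
    return (std::bit_cast<uint32_t>(x) & 0x7f800000u) != 0x7f800000u;
}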
View File

@ -12,7 +12,8 @@ struct NameNotLike
static constexpr auto name = "notLike";
};
using FunctionNotLike = FunctionsStringSearch<MatchImpl<NameNotLike, MatchTraits::Syntax::Like, MatchTraits::Case::Sensitive, MatchTraits::Result::Negate>>;
using NotLikeImpl = MatchImpl<NameNotLike, MatchTraits::Syntax::Like, MatchTraits::Case::Sensitive, MatchTraits::Result::Negate>;
using FunctionNotLike = FunctionsStringSearch<NotLikeImpl>;
}

View File

@ -5,7 +5,6 @@
#include <Functions/GatherUtils/Algorithms.h>
#include <Functions/GatherUtils/Sinks.h>
#include <Functions/GatherUtils/Sources.h>
#include <base/bit_cast.h>
namespace DB
{
@ -59,10 +58,10 @@ namespace
{
if (num_chars <= step)
{
writeSlice(StringSource::Slice{bit_cast<const UInt8 *>(pad_string.data()), numCharsToNumBytes(num_chars)}, res_sink);
writeSlice(StringSource::Slice{std::bit_cast<const UInt8 *>(pad_string.data()), numCharsToNumBytes(num_chars)}, res_sink);
break;
}
writeSlice(StringSource::Slice{bit_cast<const UInt8 *>(pad_string.data()), numCharsToNumBytes(step)}, res_sink);
writeSlice(StringSource::Slice{std::bit_cast<const UInt8 *>(pad_string.data()), numCharsToNumBytes(step)}, res_sink);
num_chars -= step;
}
}

View File

@ -1,4 +1,6 @@
#include <Functions/FunctionConstantBase.h>
#include <base/getFQDNOrHostName.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
@ -115,6 +117,13 @@ namespace
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionGetOSKernelVersion>(context); }
};
class FunctionDisplayName : public FunctionConstantBase<FunctionDisplayName, String, DataTypeString>
{
public:
static constexpr auto name = "displayName";
explicit FunctionDisplayName(ContextPtr context) : FunctionConstantBase(context->getConfigRef().getString("display_name", getFQDNOrHostName()), context->isDistributed()) {}
static FunctionPtr create(ContextPtr context) {return std::make_shared<FunctionDisplayName>(context); }
};
}
#if defined(__ELF__) && !defined(OS_FREEBSD)
@ -173,4 +182,20 @@ REGISTER_FUNCTION(GetOSKernelVersion)
}
REGISTER_FUNCTION(DisplayName)
{
factory.registerFunction<FunctionDisplayName>(
{
R"(
Returns the value of `display_name` from config or server FQDN if not set.
[example:displayName]
)",
Documentation::Examples{{"displayName", "SELECT displayName();"}},
Documentation::Categories{"Constant", "Miscellaneous"}
},
FunctionFactory::CaseSensitive);
}
}

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNothing.h>
@ -415,6 +416,274 @@ public:
}
};
template <typename Impl>
class FunctionDateOrDateTimeOperationTupleOfIntervals : public ITupleFunction
{
public:
static constexpr auto name = Impl::name;
explicit FunctionDateOrDateTimeOperationTupleOfIntervals(ContextPtr context_) : ITupleFunction(context_) {}
static FunctionPtr create(ContextPtr context_)
{
return std::make_shared<FunctionDateOrDateTimeOperationTupleOfIntervals>(context_);
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (!isDateOrDate32(arguments[0].type) && !isDateTime(arguments[0].type) && !isDateTime64(arguments[0].type))
throw Exception{ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of function {}. Should be a date or a date with time",
arguments[0].type->getName(), getName()};
const auto * cur_tuple = checkAndGetDataType<DataTypeTuple>(arguments[1].type.get());
if (!cur_tuple)
throw Exception{ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of second argument of function {}. Should be a tuple",
arguments[0].type->getName(), getName()};
const auto & cur_types = cur_tuple->getElements();
Columns cur_elements;
if (arguments[1].column)
cur_elements = getTupleElements(*arguments[1].column);
size_t tuple_size = cur_types.size();
if (tuple_size == 0)
return arguments[0].type;
auto plus = FunctionFactory::instance().get(Impl::func_name, context);
DataTypePtr res_type = arguments[0].type;
for (size_t i = 0; i < tuple_size; ++i)
{
try
{
ColumnWithTypeAndName left{res_type, {}};
ColumnWithTypeAndName right{cur_elements.empty() ? nullptr : cur_elements[i], cur_types[i], {}};
auto plus_elem = plus->build({left, right});
res_type = plus_elem->getResultType();
}
catch (DB::Exception & e)
{
e.addMessage("While executing function {} for tuple element {}", getName(), i);
throw;
}
}
return res_type;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * cur_tuple = checkAndGetDataType<DataTypeTuple>(arguments[1].type.get());
const auto & cur_types = cur_tuple->getElements();
auto cur_elements = getTupleElements(*arguments[1].column);
size_t tuple_size = cur_elements.size();
if (tuple_size == 0)
return arguments[0].column;
auto plus = FunctionFactory::instance().get(Impl::func_name, context);
ColumnWithTypeAndName res;
for (size_t i = 0; i < tuple_size; ++i)
{
ColumnWithTypeAndName column{cur_elements[i], cur_types[i], {}};
auto elem_plus = plus->build(ColumnsWithTypeAndName{i == 0 ? arguments[0] : res, column});
auto res_type = elem_plus->getResultType();
res.column = elem_plus->execute({i == 0 ? arguments[0] : res, column}, res_type, input_rows_count);
res.type = res_type;
}
return res.column;
}
};
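executeImpl above is a left fold of the function named by Impl::func_name ("plus" or "minus") over the tuple elements. The same shape over plain integers, for intuition only (hypothetical helper):

#include <vector>

// addTupleOfIntervals(date, (a, b, c)) evaluates as ((date + a) + b) + c.
long foldAdd(long init, const std::vector<long> & xs)
{
    long acc = init;
    for (long x : xs)
        acc += x;   // stands in for the per-element "plus"/"minus" function
    return acc;
}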
struct AddTupleOfIntervalsImpl
{
static constexpr auto name = "addTupleOfIntervals";
static constexpr auto func_name = "plus";
};
struct SubtractTupleOfIntervalsImpl
{
static constexpr auto name = "subtractTupleOfIntervals";
static constexpr auto func_name = "minus";
};
using FunctionAddTupleOfIntervals = FunctionDateOrDateTimeOperationTupleOfIntervals<AddTupleOfIntervalsImpl>;
using FunctionSubtractTupleOfIntervals = FunctionDateOrDateTimeOperationTupleOfIntervals<SubtractTupleOfIntervalsImpl>;
template <bool is_minus>
struct FunctionTupleOperationInterval : public ITupleFunction
{
public:
static constexpr auto name = is_minus ? "subtractInterval" : "addInterval";
explicit FunctionTupleOperationInterval(ContextPtr context_) : ITupleFunction(context_) {}
static FunctionPtr create(ContextPtr context_)
{
return std::make_shared<FunctionTupleOperationInterval>(context_);
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isTuple(arguments[0]) && !isInterval(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of function {}, must be Tuple or Interval",
arguments[0]->getName(), getName());
if (!isInterval(arguments[1]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of second argument of function {}, must be Interval",
arguments[1]->getName(), getName());
DataTypes types;
const auto * tuple = checkAndGetDataType<DataTypeTuple>(arguments[0].get());
if (tuple)
{
const auto & cur_types = tuple->getElements();
for (const auto & type : cur_types)
if (!isInterval(type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of Tuple element of first argument of function {}, must be Interval",
type->getName(), getName());
types = cur_types;
}
else
{
types = {arguments[0]};
}
const auto * interval_last = checkAndGetDataType<DataTypeInterval>(types.back().get());
const auto * interval_new = checkAndGetDataType<DataTypeInterval>(arguments[1].get());
if (!interval_last->equals(*interval_new))
types.push_back(arguments[1]);
return std::make_shared<DataTypeTuple>(types);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
if (!isInterval(arguments[1].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of second argument of function {}, must be Interval",
arguments[1].type->getName(), getName());
Columns tuple_columns;
const auto * first_tuple = checkAndGetDataType<DataTypeTuple>(arguments[0].type.get());
const auto * first_interval = checkAndGetDataType<DataTypeInterval>(arguments[0].type.get());
const auto * second_interval = checkAndGetDataType<DataTypeInterval>(arguments[1].type.get());
bool can_be_merged;
if (first_interval)
{
can_be_merged = first_interval->equals(*second_interval);
if (can_be_merged)
tuple_columns.resize(1);
else
tuple_columns.resize(2);
tuple_columns[0] = arguments[0].column->convertToFullColumnIfConst();
}
else if (first_tuple)
{
const auto & cur_types = first_tuple->getElements();
for (const auto & type : cur_types)
if (!isInterval(type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of Tuple element of first argument of function {}, must be Interval",
type->getName(), getName());
auto cur_elements = getTupleElements(*arguments[0].column);
size_t tuple_size = cur_elements.size();
if (tuple_size == 0)
{
can_be_merged = false;
}
else
{
const auto * tuple_last_interval = checkAndGetDataType<DataTypeInterval>(cur_types.back().get());
can_be_merged = tuple_last_interval->equals(*second_interval);
}
if (can_be_merged)
tuple_columns.resize(tuple_size);
else
tuple_columns.resize(tuple_size + 1);
for (size_t i = 0; i < tuple_size; ++i)
tuple_columns[i] = cur_elements[i];
}
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of first argument of function {}, must be Tuple or Interval",
arguments[0].type->getName(), getName());
ColumnPtr & last_column = tuple_columns.back();
if (can_be_merged)
{
ColumnWithTypeAndName left{last_column, arguments[1].type, {}};
if constexpr (is_minus)
{
auto minus = FunctionFactory::instance().get("minus", context);
auto elem_minus = minus->build({left, arguments[1]});
last_column = elem_minus->execute({left, arguments[1]}, arguments[1].type, input_rows_count)
->convertToFullColumnIfConst();
}
else
{
auto plus = FunctionFactory::instance().get("plus", context);
auto elem_plus = plus->build({left, arguments[1]});
last_column = elem_plus->execute({left, arguments[1]}, arguments[1].type, input_rows_count)
->convertToFullColumnIfConst();
}
}
else
{
if constexpr (is_minus)
{
auto negate = FunctionFactory::instance().get("negate", context);
auto elem_negate = negate->build({arguments[1]});
last_column = elem_negate->execute({arguments[1]}, arguments[1].type, input_rows_count);
}
else
{
last_column = arguments[1].column;
}
}
return ColumnTuple::create(tuple_columns);
}
};
using FunctionTupleAddInterval = FunctionTupleOperationInterval<false>;
using FunctionTupleSubtractInterval = FunctionTupleOperationInterval<true>;
/// This is for convenient use in LNormalize.
template <class FuncLabel>
class FunctionLNorm : public ITupleFunction {};
@ -1282,6 +1551,65 @@ REGISTER_FUNCTION(VectorFunctions)
factory.registerFunction<FunctionTupleDivide>();
factory.registerFunction<FunctionTupleNegate>();
factory.registerFunction<FunctionAddTupleOfIntervals>(
{
R"(
Consecutively adds a tuple of intervals to a Date or a DateTime.
[example:tuple]
)",
Documentation::Examples{
{"tuple", "WITH toDate('2018-01-01') AS date SELECT addTupleOfIntervals(date, (INTERVAL 1 DAY, INTERVAL 1 YEAR))"},
},
Documentation::Categories{"Tuple", "Interval", "Date", "DateTime"}
});
factory.registerFunction<FunctionSubtractTupleOfIntervals>(
{
R"(
Consecutively subtracts a tuple of intervals from a Date or a DateTime.
[example:tuple]
)",
Documentation::Examples{
{"tuple", "WITH toDate('2018-01-01') AS date SELECT subtractTupleOfIntervals(date, (INTERVAL 1 DAY, INTERVAL 1 YEAR))"},
},
Documentation::Categories{"Tuple", "Interval", "Date", "DateTime"}
});
factory.registerFunction<FunctionTupleAddInterval>(
{
R"(
Adds an interval to another interval or tuple of intervals. The returned value is a tuple of intervals.
[example:tuple]
[example:interval1]
If the types of the first interval (or the interval in the tuple) and the second interval are the same, they will be merged into one interval.
[example:interval2]
)",
Documentation::Examples{
{"tuple", "SELECT addInterval((INTERVAL 1 DAY, INTERVAL 1 YEAR), INTERVAL 1 MONTH)"},
{"interval1", "SELECT addInterval(INTERVAL 1 DAY, INTERVAL 1 MONTH)"},
{"interval2", "SELECT addInterval(INTERVAL 1 DAY, INTERVAL 1 DAY)"},
},
Documentation::Categories{"Tuple", "Interval"}
});
factory.registerFunction<FunctionTupleSubtractInterval>(
{
R"(
Adds a negated interval to another interval or tuple of intervals. The returned value is a tuple of intervals.
[example:tuple]
[example:interval1]
If the types of the first interval (or the interval in the tuple) and the second interval are the same, they will be merged into one interval.
[example:interval2]
)",
Documentation::Examples{
{"tuple", "SELECT subtractInterval((INTERVAL 1 DAY, INTERVAL 1 YEAR), INTERVAL 1 MONTH)"},
{"interval1", "SELECT subtractInterval(INTERVAL 1 DAY, INTERVAL 1 MONTH)"},
{"interval2", "SELECT subtractInterval(INTERVAL 2 DAY, INTERVAL 1 DAY)"},
},
Documentation::Categories{"Tuple", "Interval"}
});
factory.registerFunction<FunctionTupleMultiplyByNumber>();
factory.registerFunction<FunctionTupleDivideByNumber>();

View File

@ -1,6 +1,5 @@
#pragma once
#include <base/bit_cast.h>
#include <Common/HashTable/Hash.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnArray.h>

View File

@ -263,6 +263,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
user_process_list.user_memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
{
total_network_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_all_users);
}
if (!user_process_list.user_throttler)
{
if (settings.max_network_bandwidth_for_user)
@ -270,11 +275,6 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
else if (settings.max_network_bandwidth_for_all_users)
user_process_list.user_throttler = total_network_throttler;
}
if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
{
total_network_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_all_users);
}
}
return res;

View File

@ -86,6 +86,7 @@ NamesAndTypesList SessionLogElement::getNamesAndTypes()
AUTH_TYPE_NAME_AND_VALUE(AuthType::DOUBLE_SHA1_PASSWORD),
AUTH_TYPE_NAME_AND_VALUE(AuthType::LDAP),
AUTH_TYPE_NAME_AND_VALUE(AuthType::KERBEROS),
AUTH_TYPE_NAME_AND_VALUE(AuthType::SSL_CERTIFICATE),
});
#undef AUTH_TYPE_NAME_AND_VALUE
static_assert(static_cast<int>(AuthenticationType::MAX) == 7);

View File

@ -1734,16 +1734,18 @@ public:
if (state == 0)
{
state = 1;
auto begin = pos;
auto init_expected = expected;
ASTPtr string_literal;
String literal;
/// A String literal may follow the INTERVAL keyword;
/// the literal can be a part of an expression or
/// include a number and an interval type at the same time
if (ParserStringLiteral{}.parse(pos, string_literal, expected))
{
String literal;
if (string_literal->as<ASTLiteral &>().value.tryGet(literal))
if (ParserStringLiteral{}.parse(pos, string_literal, expected)
&& string_literal->as<ASTLiteral &>().value.tryGet(literal))
{
Tokens tokens(literal.data(), literal.data() + literal.size());
IParser::Pos token_pos(tokens, 0);
@ -1751,32 +1753,35 @@ public:
ASTPtr expr;
if (!ParserNumber{}.parse(token_pos, expr, token_expected))
{
return false;
}
else
{
/// case: INTERVAL '1' HOUR
/// back to begin
if (!token_pos.isValid())
{
pos = begin;
expected = init_expected;
return true;
}
else
{
/// case: INTERVAL '1 HOUR'
if (!parseIntervalKind(token_pos, token_expected, interval_kind))
return false;
elements = {makeASTFunction(interval_kind.toNameOfFunctionToIntervalDataType(), expr)};
pushResult(makeASTFunction(interval_kind.toNameOfFunctionToIntervalDataType(), expr));
/// case: INTERVAL '1 HOUR 1 SECOND ...'
while (token_pos.isValid())
{
if (!ParserNumber{}.parse(token_pos, expr, token_expected) ||
!parseIntervalKind(token_pos, token_expected, interval_kind))
return false;
pushResult(makeASTFunction(interval_kind.toNameOfFunctionToIntervalDataType(), expr));
}
finished = true;
return true;
}
}
}
}
state = 1;
return true;
}
@ -1795,6 +1800,17 @@ public:
return true;
}
protected:
bool getResultImpl(ASTPtr & node) override
{
if (elements.size() == 1)
node = elements[0];
else
node = makeASTFunction("tuple", std::move(elements));
return true;
}
private:
IntervalKind interval_kind;
};
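The new branch effectively re-tokenizes the string literal as alternating number / unit pairs ('1 HOUR 1 SECOND ...'), while a bare number ('1') rewinds to the beginning and lets the outer grammar consume INTERVAL '1' HOUR. A rough sketch of the pair loop, assuming whitespace-separated tokens instead of the real ClickHouse tokenizer (hypothetical helper, no error reporting):

#include <sstream>
#include <string>
#include <utility>
#include <vector>

std::vector<std::pair<long, std::string>> splitIntervalLiteral(const std::string & literal)
{
    std::istringstream in(literal);
    std::vector<std::pair<long, std::string>> parts;
    long value;
    std::string unit;
    while (in >> value >> unit)   // "1 HOUR 1 SECOND" -> {1, HOUR}, {1, SECOND}
        parts.emplace_back(value, unit);
    return parts;
}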

View File

@ -109,6 +109,13 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{
std::lock_guard guard(processors_mutex);
/// Do not add new processors to existing list, since the query was already cancelled.
if (cancelled)
{
for (auto & processor : new_processors)
processor->cancel();
return false;
}
processors->insert(processors->end(), new_processors.begin(), new_processors.end());
}
@ -388,6 +395,7 @@ void ExecutingGraph::cancel()
std::lock_guard guard(processors_mutex);
for (auto & processor : *processors)
processor->cancel();
cancelled = true;
}
}

View File

@ -157,6 +157,7 @@ private:
UpgradableMutex nodes_mutex;
const bool profile_processors;
bool cancelled = false;
};
}

View File

@ -30,7 +30,7 @@ class ASTStorage;
M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
M(HandleKafkaErrorMode, kafka_handle_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \
M(HandleKafkaErrorMode, kafka_handle_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \
M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
/** TODO: */

View File

@ -29,6 +29,7 @@
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/MergeTreeIndexUtils.h>
#include <algorithm>
#include <cassert>
#include <stack>
#include <limits>
@ -55,10 +56,15 @@ String Range::toString() const
}
/// Example: for `Hello\_World% ...` string it returns `Hello_World`, and for `%test%` returns an empty string.
String extractFixedPrefixFromLikePattern(const String & like_pattern)
/// Returns the prefix of like_pattern before the first wildcard, e.g. 'Hello\_World% ...' --> 'Hello\_World'
/// We call a pattern a "perfect prefix" if:
/// - (1) the pattern has a wildcard
/// - (2) the first wildcard is '%' and is followed by nothing or only other '%'
/// e.g. 'test%' or 'test%%' has the perfect prefix 'test'; 'test%x', 'test%_' or 'test_' has no perfect prefix.
String extractFixedPrefixFromLikePattern(std::string_view like_pattern, bool requires_perfect_prefix)
{
String fixed_prefix;
fixed_prefix.reserve(like_pattern.size());
const char * pos = like_pattern.data();
const char * end = pos + like_pattern.size();
@ -67,10 +73,13 @@ String extractFixedPrefixFromLikePattern(const String & like_pattern)
switch (*pos)
{
case '%':
[[fallthrough]];
case '_':
if (requires_perfect_prefix)
{
bool is_perfect_prefix = std::all_of(pos, end, [](auto c) { return c == '%'; });
return is_perfect_prefix ? fixed_prefix : "";
}
return fixed_prefix;
case '\\':
++pos;
if (pos == end)
@ -78,12 +87,13 @@ String extractFixedPrefixFromLikePattern(const String & like_pattern)
[[fallthrough]];
default:
fixed_prefix += *pos;
break;
}
++pos;
}
/// If we reach this point, no wildcard was found in the pattern, so it is not a perfect prefix
if (requires_perfect_prefix)
return "";
return fixed_prefix;
}
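Concretely, with requires_perfect_prefix = true only patterns whose first wildcard is a trailing run of '%' keep their prefix, which is what makes the new notLike key condition below sound (expected results, derived from the function above):

// extractFixedPrefixFromLikePattern("test%",  /*requires_perfect_prefix*/ true) -> "test"
// extractFixedPrefixFromLikePattern("test%%", true) -> "test"
// extractFixedPrefixFromLikePattern("test%x", true) -> ""   ('%' is not final)
// extractFixedPrefixFromLikePattern("test_",  true) -> ""   (first wildcard is '_')
// extractFixedPrefixFromLikePattern("test",   true) -> ""   (no wildcard at all)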
@ -346,7 +356,7 @@ const KeyCondition::AtomMap KeyCondition::atom_map
if (value.getType() != Field::Types::String)
return false;
String prefix = extractFixedPrefixFromLikePattern(value.get<const String &>());
String prefix = extractFixedPrefixFromLikePattern(value.get<const String &>(), /*requires_perfect_prefix*/ false);
if (prefix.empty())
return false;
@ -360,6 +370,27 @@ const KeyCondition::AtomMap KeyCondition::atom_map
return true;
}
},
{
"notLike",
[] (RPNElement & out, const Field & value)
{
if (value.getType() != Field::Types::String)
return false;
String prefix = extractFixedPrefixFromLikePattern(value.get<const String &>(), /*requires_perfect_prefix*/ true);
if (prefix.empty())
return false;
String right_bound = firstStringThatIsGreaterThanAllStringsWithPrefix(prefix);
out.function = RPNElement::FUNCTION_NOT_IN_RANGE;
out.range = !right_bound.empty()
? Range(prefix, true, right_bound, false)
: Range::createLeftBounded(prefix, true);
return true;
}
},
{
"startsWith",
[] (RPNElement & out, const Field & value)

View File

@ -485,6 +485,6 @@ private:
bool strict;
};
String extractFixedPrefixFromLikePattern(const String & like_pattern);
String extractFixedPrefixFromLikePattern(std::string_view like_pattern, bool requires_perfect_prefix);
}

View File

@ -1019,13 +1019,14 @@ void MergeTreeData::loadDataPartsFromDisk(
if (!part_opt)
return;
LOG_TRACE(log, "Loading part {} from disk {}", part_name, part_disk_ptr->getName());
const auto & part_info = *part_opt;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, part_name);
auto part = createPart(part_name, part_info, data_part_storage);
bool broken = false;
LOG_TRACE(log, "Loading part {} ({}) from disk {}", part_name, part->getType().toString(), part_disk_ptr->getName());
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
@ -1130,8 +1131,11 @@ void MergeTreeData::loadDataPartsFromDisk(
{
for (size_t thread = 0; thread < num_threads; ++thread)
{
pool.scheduleOrThrowOnError([&, thread]
pool.scheduleOrThrowOnError([&, thread, thread_group = CurrentThread::getGroup()]
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
while (true)
{
std::pair<String, DiskPtr> thread_part;
@ -5426,6 +5430,7 @@ static void selectBestProjection(
auto projection_result_ptr = reader.estimateNumMarksToRead(
projection_parts,
candidate.prewhere_info,
candidate.required_columns,
storage_snapshot->metadata,
candidate.desc->metadata,
@ -5449,6 +5454,7 @@ static void selectBestProjection(
{
auto normal_result_ptr = reader.estimateNumMarksToRead(
normal_parts,
query_info.prewhere_info,
required_columns,
storage_snapshot->metadata,
storage_snapshot->metadata,
@ -5783,7 +5789,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
const auto & analysis_result = select.getAnalysisResult();
query_info.prepared_sets = select.getQueryAnalyzer()->getPreparedSets();
query_info.prewhere_info = analysis_result.prewhere_info;
const auto & before_where = analysis_result.before_where;
const auto & where_column_name = analysis_result.where_column_name;
@ -6060,6 +6065,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
{
auto normal_result_ptr = reader.estimateNumMarksToRead(
normal_parts,
query_info.prewhere_info,
analysis_result.required_columns,
metadata_snapshot,
metadata_snapshot,
@ -6092,6 +6098,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
{
query_info.merge_tree_select_result_ptr = reader.estimateNumMarksToRead(
parts,
query_info.prewhere_info,
analysis_result.required_columns,
metadata_snapshot,
metadata_snapshot,
@ -6173,8 +6180,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
/// Just in case, reset prewhere info calculated from projection.
query_info.prewhere_info.reset();
return *selected_candidate;
}

View File

@ -214,6 +214,14 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
/// Previous part only in boundaries of partition frame
const MergeTreeData::DataPartPtr * prev_part = nullptr;
/// collect min_age for each partition while iterating parts
struct PartitionInfo
{
time_t min_age{std::numeric_limits<time_t>::max()};
};
std::unordered_map<std::string, PartitionInfo> partitions_info;
size_t parts_selected_precondition = 0;
for (const MergeTreeData::DataPartPtr & part : data_parts)
{
@ -277,6 +285,9 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
part_info.shall_participate_in_merges = has_volumes_with_disabled_merges ? part->shallParticipateInMerges(storage_policy) : true;
auto & partition_info = partitions_info[partition_id];
partition_info.min_age = std::min(partition_info.min_age, part_info.age);
++parts_selected_precondition;
parts_ranges.back().emplace_back(part_info);
@ -333,6 +344,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
SimpleMergeSelector::Settings merge_settings;
/// Override value from table settings
merge_settings.max_parts_to_merge_at_once = data_settings->max_parts_to_merge_at_once;
if (!data_settings->min_age_to_force_merge_on_partition_only)
merge_settings.min_age_to_force_merge = data_settings->min_age_to_force_merge_seconds;
if (aggressive)
@ -347,6 +359,20 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (parts_to_merge.empty())
{
if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds)
{
auto best_partition_it = std::max_element(
partitions_info.begin(),
partitions_info.end(),
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
assert(best_partition_it != partitions_info.end());
if (static_cast<size_t>(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds)
return selectAllPartsToMergeWithinPartition(
future_part, can_merge_callback, best_partition_it->first, true, metadata_snapshot, txn, out_disable_reason);
}
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return SelectPartsDecision::CANNOT_SELECT;
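In other words: each partition is scored by the age of its youngest part, and the best-scoring partition is force-merged only if even that youngest part exceeds the threshold. A compact sketch of the selection (hypothetical helper; returns an empty string when nothing qualifies):

#include <algorithm>
#include <ctime>
#include <string>
#include <unordered_map>

// min_age per partition = age of its youngest part; the partition maximizing
// this value is the best candidate for a whole-partition merge.
std::string pickPartitionToForceMerge(
    const std::unordered_map<std::string, time_t> & min_age_by_partition, time_t threshold)
{
    auto best = std::max_element(
        min_age_by_partition.begin(), min_age_by_partition.end(),
        [](const auto & a, const auto & b) { return a.second < b.second; });
    if (best != min_age_by_partition.end() && best->second >= threshold)
        return best->first;
    return {};
}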

View File

@ -781,6 +781,11 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
ReadFromMergeTree::IndexStats & index_stats)
{
const Settings & settings = context->getSettingsRef();
/// TODO: Analyzer syntax analyzer result
if (settings.allow_experimental_analyzer)
return;
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;
DataTypes minmax_columns_types;
@ -1294,6 +1299,7 @@ static void selectColumnNames(
MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
@ -1318,7 +1324,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
query_info.prewhere_info,
prewhere_info,
added_filter_nodes,
metadata_snapshot_base,
metadata_snapshot,

View File

@ -56,6 +56,7 @@ public:
/// This method is used to select best projection for table.
MergeTreeDataSelectAnalysisResultPtr estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,

View File

@ -1,6 +1,5 @@
#include <Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h>
#include <base/bit_cast.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnFixedString.h>

View File

@ -3,7 +3,6 @@
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <base/types.h>
#include <base/bit_cast.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/MergeTree/MergeTreeIndexConditionBloomFilter.h>
#include <Columns/ColumnConst.h>

View File

@ -5,7 +5,6 @@
#include <Columns/ColumnFixedString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/HashTable/Hash.h>
#include <base/bit_cast.h>
#include <Interpreters/BloomFilterHash.h>
#include <IO/WriteHelpers.h>

View File

@ -63,6 +63,7 @@ struct Settings;
M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \
M(UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30, "Remove old broken detached parts in the background if they remained untouched for the period of time specified by this setting.", 0) \
M(UInt64, min_age_to_force_merge_seconds, 0, "If all parts in a certain range are older than this value, range will be always eligible for merging. Set to 0 to disable.", 0) \
M(Bool, min_age_to_force_merge_on_partition_only, false, "Whether min_age_to_force_merge_seconds should be applied only on the entire partition and not on a subset.", false) \
M(UInt64, merge_tree_enable_clear_old_broken_detached, false, "Enable clearing old broken detached parts operation in background.", 0) \
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
\

View File

@ -0,0 +1,300 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageDelta.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Formats/FormatFactory.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <QueryPipeline/Pipe.h>
#include <fmt/format.h>
#include <fmt/ranges.h>
#include <ranges>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_DATA;
}
void DeltaLakeMetadata::setLastModifiedTime(const String & filename, uint64_t timestamp)
{
file_update_time[filename] = timestamp;
}
void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
{
bool erase = file_update_time.erase(filename);
if (!erase)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
}
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
{
std::vector<String> keys;
keys.reserve(file_update_time.size());
for (auto && [k, _] : file_update_time)
keys.push_back(k);
return keys;
}
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
: base_configuration(configuration_), table_path(table_path_)
{
Init(context);
}
void JsonMetadataGetter::Init(ContextPtr context)
{
auto keys = getJsonLogFiles();
// read data from every json log file
for (const String & key : keys)
{
auto buf = createS3ReadBuffer(key, context);
char c;
while (!buf->eof())
{
/// May be some invalid characters before json.
while (buf->peek(c) && c != '{')
buf->ignore();
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
continue;
const JSON json(json_str);
handleJSON(json);
}
}
}
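Each _delta_log file is a sequence of JSON actions; the loop above skips any leading noise up to the first '{' and feeds one object at a time to handleJSON. Representative (illustrative, not real) log lines matching the fields handleJSON reads below:

// {"add":    {"path": "part-00000-xyz.parquet", "modificationTime": 1661860961000}}
// {"remove": {"path": "part-00000-old.parquet", "deletionTimestamp": 1661860999000}}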
std::vector<String> JsonMetadataGetter::getJsonLogFiles()
{
std::vector<String> keys;
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
/// DeltaLake format stores all metadata json files in _delta_log directory
static constexpr auto deltalake_metadata_directory = "_delta_log";
request.SetPrefix(std::filesystem::path(table_path) / deltalake_metadata_directory);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey();
// DeltaLake metadata files have json extension
if (std::filesystem::path(filename).extension() == ".json")
keys.push_back(filename);
}
/// Needed in case more results are available;
/// if so, we continue reading without re-reading keys that were already read
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
/// Set to false if all of the results were returned. Set to true if more keys
/// are available to return. If the number of results exceeds that specified by
/// MaxKeys, all of the results might not be returned
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
{
/// TODO: add parallel downloads
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.uri.bucket,
key,
base_configuration.uri.version_id,
/* max single read retries */10,
context->getReadSettings());
}
void JsonMetadataGetter::handleJSON(const JSON & json)
{
if (json.has("add"))
{
auto path = json["add"]["path"].getString();
auto timestamp = json["add"]["modificationTime"].getInt();
metadata.setLastModifiedTime(path, timestamp);
}
else if (json.has("remove"))
{
auto path = json["remove"]["path"].getString();
auto timestamp = json["remove"]["deletionTimestamp"].getInt();
metadata.remove(path, timestamp);
}
}
StorageDelta::StorageDelta(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
JsonMetadataGetter getter{base_configuration, table_path, context_};
auto keys = getter.getFiles();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
// set new url in configuration
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
}
Pipe StorageDelta::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
{
// DeltaLake stores data parts in different files;
// keys are the filenames of the parts.
// For StorageS3 to read all parts we need the format {key1,key2,key3,...keyn}
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
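The braces are the StorageS3 multi-key syntax; a tiny illustration of what the format string produces (illustrative keys):

// keys = {"part-000.parquet", "part-001.parquet"}
//   -> "{part-000.parquet,part-001.parquet}"
// StorageS3 expands the braces back into reads of the individual objects.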
void registerStorageDelta(StorageFactory & factory)
{
factory.registerStorage(
"DeltaLake",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty() || engine_args.size() < 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage DeltaLake requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
StorageS3Configuration configuration;
configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
else
{
/// DeltaLake uses Parquet by default.
configuration.format = "Parquet";
}
return std::make_shared<StorageDelta>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
#endif

src/Storages/StorageDelta.h Normal file
View File

@ -0,0 +1,107 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Storages/IStorage.h>
# include <Storages/StorageS3.h>
# include <unordered_map>
# include <base/JSON.h>
namespace Poco
{
class Logger;
}
namespace Aws::S3
{
class S3Client;
}
namespace DB
{
// Class to parse DeltaLake JSON metadata and find the files needed for a query on the table
class DeltaLakeMetadata
{
public:
DeltaLakeMetadata() = default;
void setLastModifiedTime(const String & filename, uint64_t timestamp);
void remove(const String & filename, uint64_t timestamp);
std::vector<String> ListCurrentFiles() &&;
private:
std::unordered_map<String, uint64_t> file_update_time;
};
// Class to get DeltaLake JSON log files and read JSON from them
class JsonMetadataGetter
{
public:
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
private:
void Init(ContextPtr context);
std::vector<String> getJsonLogFiles();
std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
void handleJSON(const JSON & json);
StorageS3::S3Configuration base_configuration;
String table_path;
DeltaLakeMetadata metadata;
};
class StorageDelta : public IStorage
{
public:
// 1. Parses the internal file structure of the table
// 2. Finds out the parts with the latest version
// 3. Creates a url for the underlying StorageS3 engine to handle reads
StorageDelta(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_);
String getName() const override { return "DeltaLake"; }
// Reads latest version of DeltaLake table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
private:
void Init();
// DeltaLake stores data in parts across different files.
// keys is a vector of part filenames with the latest version.
// generateQueryFromKeys constructs a query from the part filenames for the
// underlying StorageS3 engine.
static String generateQueryFromKeys(std::vector<String> && keys);
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;
String table_path;
};
}
#endif

View File

@ -0,0 +1,231 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageHudi.h>
#include <Common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <IO/S3Common.h>
#include <IO/ReadHelpers.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <QueryPipeline/Pipe.h>
#include <ranges>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR;
extern const int LOGICAL_ERROR;
}
StorageHudi::StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto keys = getKeysFromS3();
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key;
new_configuration.format = configuration_.format;
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
}
Pipe StorageHudi::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
std::vector<std::string> StorageHudi::getKeysFromS3()
{
std::vector<std::string> keys;
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
keys.push_back(filename);
LOG_DEBUG(log, "Found file: {}", filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
String StorageHudi::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
{
/// For each partition path take only the latest file.
struct FileInfo
{
String filename;
UInt64 timestamp;
};
std::unordered_map<String, FileInfo> latest_parts; /// Partition path (directory) -> latest part file info.
/// Make format lowercase.
const auto expected_extension = "." + Poco::toLower(format);
/// Filter only files with specific format.
auto keys_filter = [&](const String & key) { return std::filesystem::path(key).extension() == expected_extension; };
for (const auto & key : keys | std::views::filter(keys_filter))
{
const auto key_path = fs::path(key);
const String filename = key_path.filename();
const String partition_path = key_path.parent_path();
/// Every filename contains metadata split by "_"; the timestamp comes after the last "_".
const auto delim = key.find_last_of('_');
if (delim == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format of metadata files");
const auto timestamp = parse<UInt64>(key.substr(delim + 1));
auto it = latest_parts.find(partition_path);
if (it == latest_parts.end())
{
latest_parts.emplace(partition_path, FileInfo{filename, timestamp});
}
else if (it->second.timestamp < timestamp)
{
it->second = {filename, timestamp};
}
}
std::string list_of_keys;
for (const auto & [directory, file_info] : latest_parts)
{
if (!list_of_keys.empty())
list_of_keys += ",";
list_of_keys += std::filesystem::path(directory) / file_info.filename;
}
return "{" + list_of_keys + "}";
}
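Worked through on illustrative (hypothetical) keys, two partitions each keep only their newest part; note that latest_parts is an unordered map, so the key order in the result is unspecified:

// input keys (after the extension filter):
//   "p1/f_1_100.parquet", "p1/f_1_200.parquet", "p2/g_1_100.parquet"
// latest per partition: p1 -> f_1_200.parquet (timestamp 200), p2 -> g_1_100.parquet
// result: "{p1/f_1_200.parquet,p2/g_1_100.parquet}" (in some order)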
void registerStorageHudi(StorageFactory & factory)
{
factory.registerStorage(
"Hudi",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty() || engine_args.size() < 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Hudi requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
StorageS3Configuration configuration;
configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
else
{
// Apache Hudi uses Parquet by default
configuration.format = "Parquet";
}
auto format_settings = getFormatSettings(args.getContext());
return std::make_shared<StorageHudi>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), format_settings);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
#endif

View File

@ -0,0 +1,69 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Storages/IStorage.h>
# include <Storages/StorageS3.h>
namespace Poco
{
class Logger;
}
namespace Aws::S3
{
class S3Client;
}
namespace DB
{
class StorageHudi : public IStorage
{
public:
/// 1. Parses the internal file structure of the table.
/// 2. Finds out the parts with the latest version.
/// 3. Creates a url for the underlying StorageS3 engine to handle reads.
StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_);
String getName() const override { return "Hudi"; }
/// Reads latest version of Apache Hudi table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
private:
std::vector<std::string> getKeysFromS3();
/// Apache Hudi stores parts of data in different files.
/// Every part file has a timestamp in it.
/// Every partition (directory) in Apache Hudi has different versions of a part.
/// To find the needed parts we need to find the latest part file for every partition.
/// Part format is usually parquet, but can differ.
static String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format);
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;
String table_path;
};
}
#endif

View File

@ -212,6 +212,8 @@ public:
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageHudi;
friend class StorageDelta;
S3Configuration s3_configuration;
std::vector<String> keys;

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Storages/System/StorageSystemPartsBase.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
@ -20,7 +21,9 @@ StorageSystemDetachedParts::StorageSystemDetachedParts(const StorageID & table_i
{"table", std::make_shared<DataTypeString>()},
{"partition_id", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
{"name", std::make_shared<DataTypeString>()},
{"bytes_on_disk", std::make_shared<DataTypeUInt64>()},
{"disk", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"reason", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
{"min_block_number", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
{"max_block_number", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
@ -28,6 +31,33 @@ StorageSystemDetachedParts::StorageSystemDetachedParts(const StorageID & table_i
}});
setInMemoryMetadata(storage_metadata);
}
static void calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & from, UInt64 & total_size)
{
/// Files or directories of a detached part may not exist. Only count the size of existing files.
if (disk->isFile(from))
{
total_size += disk->getFileSize(from);
}
else
{
for (auto it = disk->iterateDirectory(from); it->isValid(); it->next())
calculateTotalSizeOnDiskImpl(disk, fs::path(from) / it->name(), total_size);
}
}
static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk, const String & from)
{
UInt64 total_size = 0;
try
{
calculateTotalSizeOnDiskImpl(disk, from, total_size);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return total_size;
}
Pipe StorageSystemDetachedParts::read(
const Names & /* column_names */,
@ -50,11 +80,14 @@ Pipe StorageSystemDetachedParts::read(
for (const auto & p : parts)
{
size_t i = 0;
String detached_part_path = fs::path(MergeTreeData::DETACHED_DIR_NAME) / p.dir_name;
new_columns[i++]->insert(info.database);
new_columns[i++]->insert(info.table);
new_columns[i++]->insert(p.valid_name ? p.partition_id : Field());
new_columns[i++]->insert(p.dir_name);
new_columns[i++]->insert(calculateTotalSizeOnDisk(p.disk, fs::path(info.data->getRelativeDataPath()) / detached_part_path));
new_columns[i++]->insert(p.disk->getName());
new_columns[i++]->insert((fs::path(info.data->getFullPathOnDisk(p.disk)) / detached_part_path).string());
new_columns[i++]->insert(p.valid_name ? p.prefix : Field());
new_columns[i++]->insert(p.valid_name ? p.min_block : Field());
new_columns[i++]->insert(p.valid_name ? p.max_block : Field());

View File

@ -116,7 +116,7 @@ void StorageSystemMergeTreeMetadataCache::fillData(MutableColumns & res_columns,
}
else
{
String target = extractFixedPrefixFromLikePattern(key);
String target = extractFixedPrefixFromLikePattern(key, /*requires_perfect_prefix*/ false);
if (target.empty())
throw Exception(
"SELECT from system.merge_tree_metadata_cache table must contain condition like key = 'key' or key LIKE 'prefix%' in WHERE clause.", ErrorCodes::BAD_ARGUMENTS);

View File

@ -1,15 +1,22 @@
#include <Storages/System/StorageSystemTableFunctions.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FUNCTION;
}
NamesAndTypesList StorageSystemTableFunctions::getNamesAndTypes()
{
return
{
{"name", std::make_shared<DataTypeString>()},
{"description", std::make_shared<DataTypeString>()}
{"description", std::make_shared<DataTypeString>()},
{"allow_readonly", std::make_shared<DataTypeUInt8>()}
};
}
@ -20,7 +27,15 @@ void StorageSystemTableFunctions::fillData(MutableColumns & res_columns, Context
for (const auto & function_name : functions_names)
{
res_columns[0]->insert(function_name);
res_columns[1]->insert(factory.getDocumentation(function_name).description);
auto properties = factory.tryGetProperties(function_name);
if (properties)
{
res_columns[1]->insert(properties->documentation.description);
res_columns[2]->insert(properties->allow_readonly);
}
else
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}", function_name);
}
}

View File

@ -32,6 +32,8 @@ void registerStorageMeiliSearch(StorageFactory& factory);
#if USE_AWS_S3
void registerStorageS3(StorageFactory & factory);
void registerStorageCOS(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageDelta(StorageFactory & factory);
#endif
#if USE_HDFS
@ -118,6 +120,8 @@ void registerStorages()
#if USE_AWS_S3
registerStorageS3(factory);
registerStorageCOS(factory);
registerStorageHudi(factory);
registerStorageDelta(factory);
#endif
#if USE_HDFS

View File

@ -4,6 +4,7 @@
#include <Storages/StorageTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Common/ProfileEvents.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace ProfileEvents
@ -25,8 +26,8 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
AccessFlags required_access = getSourceAccessType();
String function_name = getName();
if ((function_name != "null") && (function_name != "view") && (function_name != "viewIfPermitted"))
auto table_function_properties = TableFunctionFactory::instance().tryGetProperties(getName());
if (!(table_function_properties && table_function_properties->allow_readonly))
required_access |= AccessType::CREATE_TEMPORARY_TABLE;
context->checkAccess(required_access);
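The hard-coded name list ("null", "view", "viewIfPermitted") is gone: any table function whose properties set allow_readonly now skips the extra grant. The gate, reduced to a predicate (hypothetical helper, mirroring the condition above):

#include <optional>

struct TableFunctionProperties { bool allow_readonly = false; };

// True when the caller must additionally hold CREATE TEMPORARY TABLE.
bool needsCreateTemporaryTable(const std::optional<TableFunctionProperties> & props)
{
    return !(props && props->allow_readonly);
}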

View File

@ -4,6 +4,7 @@
#include <Storages/IStorage_fwd.h>
#include <Storages/ColumnsDescription.h>
#include <Access/Common/AccessType.h>
#include <Common/Documentation.h>
#include <memory>
#include <string>
@ -79,6 +80,14 @@ private:
virtual const char * getStorageTypeName() const = 0;
};
/// Properties of table function that are independent of argument types and parameters.
struct TableFunctionProperties
{
Documentation documentation;
bool allow_readonly = false;
};
using TableFunctionPtr = std::shared_ptr<ITableFunction>;

View File

@ -91,7 +91,7 @@ InterpreterExplainQuery TableFunctionExplain::getInterpreter(ContextPtr context)
void registerTableFunctionExplain(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionExplain>({R"(
factory.registerFunction<TableFunctionExplain>({.documentation = {R"(
Returns result of EXPLAIN query.
The function should not be called directly but can be invoked via `SELECT * FROM (EXPLAIN <query>)`.
@ -103,7 +103,7 @@ Example:
)",
{{"1", "SELECT explain FROM (EXPLAIN AST SELECT * FROM system.numbers) WHERE explain LIKE '%Asterisk%'"}}
});
}});
}

View File

@ -16,16 +16,15 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
void TableFunctionFactory::registerFunction(
const std::string & name, TableFunctionCreator creator, Documentation doc, CaseSensitiveness case_sensitiveness)
const std::string & name, Value value, CaseSensitiveness case_sensitiveness)
{
if (!table_functions.emplace(name, TableFunctionFactoryData{creator, doc}).second)
if (!table_functions.emplace(name, value).second)
throw Exception("TableFunctionFactory: the table function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_table_functions.emplace(Poco::toLower(name), TableFunctionFactoryData{creator, doc}).second)
&& !case_insensitive_table_functions.emplace(Poco::toLower(name), value).second)
throw Exception("TableFunctionFactory: the case insensitive table function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}
@ -59,13 +58,13 @@ TableFunctionPtr TableFunctionFactory::tryGet(
auto it = table_functions.find(name);
if (table_functions.end() != it)
{
res = it->second.first();
res = it->second.creator();
}
else
{
it = case_insensitive_table_functions.find(Poco::toLower(name));
if (case_insensitive_table_functions.end() != it)
res = it->second.first();
res = it->second.creator();
}
if (!res)
@ -86,13 +85,29 @@ bool TableFunctionFactory::isTableFunctionName(const std::string & name) const
return table_functions.contains(name);
}
Documentation TableFunctionFactory::getDocumentation(const std::string & name) const
std::optional<TableFunctionProperties> TableFunctionFactory::tryGetProperties(const String & name) const
{
auto it = table_functions.find(name);
if (it == table_functions.end())
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}", name);
return tryGetPropertiesImpl(name);
}
return it->second.second;
std::optional<TableFunctionProperties> TableFunctionFactory::tryGetPropertiesImpl(const String & name_param) const
{
String name = getAliasToOrName(name_param);
Value found;
/// Find by exact match.
if (auto it = table_functions.find(name); it != table_functions.end())
{
found = it->second;
}
if (auto jt = case_insensitive_table_functions.find(Poco::toLower(name)); jt != case_insensitive_table_functions.end())
found = jt->second;
if (found.creator)
return found.properties;
return {};
}
TableFunctionFactory & TableFunctionFactory::instance()
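Callers now get `std::nullopt` on a miss instead of an exception. Note also that the case-insensitive map is consulted unconditionally after the exact match; a name registered case-insensitively sits in both maps with the same value, so the overwrite is benign. A minimal caller sketch (the `numbers` lookup is only an example):

/// Sketch: optional-based lookup replaces the old throwing getDocumentation().
if (auto properties = TableFunctionFactory::instance().tryGetProperties("numbers"))
{
    const String & description = properties->documentation.description;
    bool usable_in_readonly = properties->allow_readonly;
    /// ... use description and usable_in_readonly ...
}
else
{
    /// Unknown function: each caller decides whether to throw, as
    /// StorageSystemTableFunctions::fillData does above, or to skip.
}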

View File

@ -3,7 +3,6 @@
#include <TableFunctions/ITableFunction.h>
#include <Common/IFactoryWithAliases.h>
#include <Common/NamePrompter.h>
#include <Common/Documentation.h>
#include <functional>
#include <memory>
@ -18,7 +17,24 @@ namespace DB
class Context;
using TableFunctionCreator = std::function<TableFunctionPtr()>;
using TableFunctionFactoryData = std::pair<TableFunctionCreator, Documentation>;
struct TableFunctionFactoryData
{
TableFunctionCreator creator;
TableFunctionProperties properties;
TableFunctionFactoryData() = default;
TableFunctionFactoryData(const TableFunctionFactoryData &) = default;
TableFunctionFactoryData & operator = (const TableFunctionFactoryData &) = default;
template <typename Creator>
requires (!std::is_same_v<Creator, TableFunctionFactoryData>)
TableFunctionFactoryData(Creator creator_, TableFunctionProperties properties_ = {}) /// NOLINT
: creator(std::forward<Creator>(creator_)), properties(std::move(properties_))
{
}
};
/** Lets you get a table function by its name.
*/
@ -31,15 +47,16 @@ public:
/// No locking, you must register all functions before usage of get.
void registerFunction(
const std::string & name,
TableFunctionCreator creator,
Documentation doc = {},
Value value,
CaseSensitiveness case_sensitiveness = CaseSensitive);
template <typename Function>
void registerFunction(Documentation doc = {}, CaseSensitiveness case_sensitiveness = CaseSensitive)
void registerFunction(TableFunctionProperties properties = {}, CaseSensitiveness case_sensitiveness = CaseSensitive)
{
auto creator = []() -> TableFunctionPtr { return std::make_shared<Function>(); };
registerFunction(Function::name, std::move(creator), std::move(doc), case_sensitiveness);
registerFunction(Function::name,
TableFunctionFactoryData{std::move(creator), {std::move(properties)}},
case_sensitiveness);
}
/// Throws an exception if not found.
@ -48,7 +65,7 @@ public:
/// Returns nullptr if not found.
TableFunctionPtr tryGet(const std::string & name, ContextPtr context) const;
Documentation getDocumentation(const std::string & name) const;
std::optional<TableFunctionProperties> tryGetProperties(const String & name) const;
bool isTableFunctionName(const std::string & name) const;
@ -61,6 +78,8 @@ private:
String getFactoryName() const override { return "TableFunctionFactory"; }
std::optional<TableFunctionProperties> tryGetPropertiesImpl(const String & name) const;
TableFunctions table_functions;
TableFunctions case_insensitive_table_functions;
};
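The constrained template constructor is what keeps old call sites compiling: a bare creator converts implicitly to `TableFunctionFactoryData` with default properties, while newer call sites pass both members. Both spellings, with hypothetical function types:

/// Old spelling: properties default to {} (allow_readonly = false).
factory.registerFunction("foo",
    [] () -> TableFunctionPtr { return std::make_shared<TableFunctionFoo>(); });

/// New spelling: creator plus explicit properties, as used for "cluster" below.
factory.registerFunction("bar",
    {[] () -> TableFunctionPtr { return std::make_shared<TableFunctionBar>(); },
     {.documentation = {}, .allow_readonly = true}});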

View File

@ -91,7 +91,7 @@ StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_functio
void registerTableFunctionGenerate(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionGenerateRandom>();
factory.registerFunction<TableFunctionGenerateRandom>({.documentation = {}, .allow_readonly = true});
}
}

View File

@ -52,6 +52,6 @@ StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, Conte
void registerTableFunctionNull(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionNull>();
factory.registerFunction<TableFunctionNull>({.documentation = {}, .allow_readonly = true});
}
}

View File

@ -51,8 +51,8 @@ StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_f
void registerTableFunctionNumbers(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionNumbers<true>>();
factory.registerFunction<TableFunctionNumbers<false>>();
factory.registerFunction<TableFunctionNumbers<true>>({.documentation = {}, .allow_readonly = true});
factory.registerFunction<TableFunctionNumbers<false>>({.documentation = {}, .allow_readonly = true});
}
template <bool multithreaded>

View File

@ -346,8 +346,8 @@ void registerTableFunctionRemote(TableFunctionFactory & factory)
{
factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); });
factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote", /* secure = */ true); });
factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); });
factory.registerFunction("clusterAllReplicas", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("clusterAllReplicas"); });
factory.registerFunction("cluster", {[] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); }, {.documentation = {}, .allow_readonly = true}});
factory.registerFunction("clusterAllReplicas", {[] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("clusterAllReplicas"); }, {.documentation = {}, .allow_readonly = true}});
}
}
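Note the asymmetry: only `cluster` and `clusterAllReplicas` are marked `allow_readonly` here, while `remote` and `remoteSecure` keep the default `false`, so they still require `CREATE TEMPORARY TABLE` rights under the check in `ITableFunction::execute`.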

View File

@ -147,7 +147,7 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, Context
void registerTableFunctionValues(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionValues>({}, TableFunctionFactory::CaseInsensitive);
factory.registerFunction<TableFunctionValues>({.documentation = {}, .allow_readonly = true}, TableFunctionFactory::CaseInsensitive);
}
}

View File

@ -55,7 +55,7 @@ StoragePtr TableFunctionView::executeImpl(
void registerTableFunctionView(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionView>();
factory.registerFunction<TableFunctionView>({.documentation = {}, .allow_readonly = true});
}
}

View File

@ -107,7 +107,7 @@ bool TableFunctionViewIfPermitted::isPermitted(const ContextPtr & context, const
void registerTableFunctionViewIfPermitted(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionViewIfPermitted>();
factory.registerFunction<TableFunctionViewIfPermitted>({.documentation = {}, .allow_readonly = true});
}
}

View File

@ -48,7 +48,7 @@ StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_fun
void registerTableFunctionZeros(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionZeros<true>>({R"(
factory.registerFunction<TableFunctionZeros<true>>({.documentation = {R"(
Generates a stream of zeros (a table with one column 'zero' of type 'UInt8') of specified size.
This table function is used in performance tests, where you want to spend as little time as possible to data generation while testing some other parts of queries.
@ -62,9 +62,9 @@ This query will test the speed of `randomPrintableASCII` function using single t
See also the `system.zeros` table.
)",
{{"1", "SELECT count() FROM zeros(100000000) WHERE NOT ignore(randomPrintableASCII(10))"}}
});
}});
factory.registerFunction<TableFunctionZeros<false>>({R"(
factory.registerFunction<TableFunctionZeros<false>>({.documentation = {R"(
Generates a stream of zeros (a table with one column 'zero' of type 'UInt8') of specified size.
This table function is used in performance tests, where you want to spend as little time as possible to data generation while testing some other parts of queries.
@ -78,7 +78,7 @@ This query will test the speed of `randomPrintableASCII` function using multiple
See also the `system.zeros` table.
)",
{{"1", "SELECT count() FROM zeros_mt(1000000000) WHERE NOT ignore(randomPrintableASCII(10))"}}
});
}});
}
template <bool multithreaded>

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
from commit_status_helper import get_commit
from env_helper import GITHUB_JOB_URL
from get_robot_token import get_best_robot_token
from github_helper import GitHub
from pr_info import PRInfo
RELEASE_READY_STATUS = "Ready for release"
def main():
pr_info = PRInfo()
gh = GitHub(get_best_robot_token(), per_page=100)
commit = get_commit(gh, pr_info.sha)
commit.create_status(
context=RELEASE_READY_STATUS,
description="the release can be created from the commit",
state="success",
target_url=GITHUB_JOB_URL(),
)
if __name__ == "__main__":
main()
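The script is deliberately small: it resolves the current commit via `PRInfo`, authenticates with the best available robot token, and posts a `success` commit status named `Ready for release` that links back to the job URL, presumably so release tooling has a single status to gate on.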

View File

@ -118,6 +118,8 @@ class Release:
except subprocess.CalledProcessError:
logging.fatal("Repo contains uncommitted changes")
raise
if self._git.branch != "master":
raise Exception("the script must be launched only from master")
self.set_release_branch()
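Together with the existing uncommitted-changes check, the new guard makes the script's preconditions explicit: a clean working tree on the `master` branch, with anything else aborting before `set_release_branch()` runs.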

Some files were not shown because too many files have changed in this diff.