Merge branch 'ClickHouse:master' into zvonand-fix-57819

Andrey Zvonov 2023-12-14 20:14:54 +01:00 committed by GitHub
commit 0174b94469
156 changed files with 2640 additions and 913 deletions


@ -555,6 +555,27 @@ jobs:
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
##############################################################################################
########################### ClickBench #######################################################
##############################################################################################
ClickBenchAMD64:
needs: [BuilderDebRelease]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickBench (amd64)
runner_type: func-tester
run_command: |
cd "$REPO_COPY/tests/ci"
python3 clickbench.py "$CHECK_NAME"
ClickBenchAarch64:
needs: [BuilderDebAarch64]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickBench (aarch64)
runner_type: func-tester-aarch64
run_command: |
cd "$REPO_COPY/tests/ci"
python3 clickbench.py "$CHECK_NAME"
##############################################################################################
######################################### STRESS TESTS #######################################
##############################################################################################
StressTestAsan:


@ -701,6 +701,27 @@ jobs:
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
##############################################################################################
########################### ClickBench #######################################################
##############################################################################################
ClickBenchAMD64:
needs: [BuilderDebRelease]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickBench (amd64)
runner_type: func-tester
run_command: |
cd "$REPO_COPY/tests/ci"
python3 clickbench.py "$CHECK_NAME"
ClickBenchAarch64:
needs: [BuilderDebAarch64]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickBench (aarch64)
runner_type: func-tester-aarch64
run_command: |
cd "$REPO_COPY/tests/ci"
python3 clickbench.py "$CHECK_NAME"
##############################################################################################
######################################### STRESS TESTS #######################################
##############################################################################################
StressTestAsan:


@ -125,6 +125,7 @@
"docker/test/server-jepsen",
"docker/test/sqllogic",
"docker/test/sqltest",
"docker/test/clickbench",
"docker/test/stateless"
]
},
@ -145,6 +146,10 @@
"name": "clickhouse/server-jepsen-test",
"dependent": []
},
"docker/test/clickbench": {
"name": "clickhouse/clickbench",
"dependent": []
},
"docker/test/install/deb": {
"name": "clickhouse/install-deb-test",
"dependent": []


@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.11.1.2711"
ARG VERSION="23.11.2.11"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.


@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.11.1.2711"
ARG VERSION="23.11.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.


@ -30,7 +30,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.11.1.2711"
ARG VERSION="23.11.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image


@ -12,6 +12,7 @@ RUN apt-get update \
ripgrep \
zstd \
locales \
sudo \
--yes --no-install-recommends
# Sanitizer options for services (clickhouse-server)


@ -21,7 +21,7 @@ EXTRA_ORDER_BY_COLUMNS=${EXTRA_ORDER_BY_COLUMNS:-"check_name, "}
# trace_log needs more columns for symbolization
EXTRA_COLUMNS_TRACE_LOG="${EXTRA_COLUMNS} symbols Array(LowCardinality(String)), lines Array(LowCardinality(String)), "
EXTRA_COLUMNS_EXPRESSION_TRACE_LOG="${EXTRA_COLUMNS_EXPRESSION}, arrayMap(x -> toLowCardinality(demangle(addressToSymbol(x))), trace) AS symbols, arrayMap(x -> toLowCardinality(addressToLine(x)), trace) AS lines"
EXTRA_COLUMNS_EXPRESSION_TRACE_LOG="${EXTRA_COLUMNS_EXPRESSION}, arrayMap(x -> demangle(addressToSymbol(x)), trace)::Array(LowCardinality(String)) AS symbols, arrayMap(x -> addressToLine(x), trace)::Array(LowCardinality(String)) AS lines"
function __set_connection_args


@ -0,0 +1,10 @@
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY *.sh /
COPY *.sql /
CMD ["/bin/bash", "/run.sh"]


@ -0,0 +1,112 @@
ATTACH TABLE hits UUID 'c449dfbf-ba06-4d13-abec-8396559eb955'
(
WatchID BIGINT NOT NULL,
JavaEnable SMALLINT NOT NULL,
Title TEXT NOT NULL,
GoodEvent SMALLINT NOT NULL,
EventTime TIMESTAMP NOT NULL,
EventDate Date NOT NULL,
CounterID INTEGER NOT NULL,
ClientIP INTEGER NOT NULL,
RegionID INTEGER NOT NULL,
UserID BIGINT NOT NULL,
CounterClass SMALLINT NOT NULL,
OS SMALLINT NOT NULL,
UserAgent SMALLINT NOT NULL,
URL TEXT NOT NULL,
Referer TEXT NOT NULL,
IsRefresh SMALLINT NOT NULL,
RefererCategoryID SMALLINT NOT NULL,
RefererRegionID INTEGER NOT NULL,
URLCategoryID SMALLINT NOT NULL,
URLRegionID INTEGER NOT NULL,
ResolutionWidth SMALLINT NOT NULL,
ResolutionHeight SMALLINT NOT NULL,
ResolutionDepth SMALLINT NOT NULL,
FlashMajor SMALLINT NOT NULL,
FlashMinor SMALLINT NOT NULL,
FlashMinor2 TEXT NOT NULL,
NetMajor SMALLINT NOT NULL,
NetMinor SMALLINT NOT NULL,
UserAgentMajor SMALLINT NOT NULL,
UserAgentMinor VARCHAR(255) NOT NULL,
CookieEnable SMALLINT NOT NULL,
JavascriptEnable SMALLINT NOT NULL,
IsMobile SMALLINT NOT NULL,
MobilePhone SMALLINT NOT NULL,
MobilePhoneModel TEXT NOT NULL,
Params TEXT NOT NULL,
IPNetworkID INTEGER NOT NULL,
TraficSourceID SMALLINT NOT NULL,
SearchEngineID SMALLINT NOT NULL,
SearchPhrase TEXT NOT NULL,
AdvEngineID SMALLINT NOT NULL,
IsArtifical SMALLINT NOT NULL,
WindowClientWidth SMALLINT NOT NULL,
WindowClientHeight SMALLINT NOT NULL,
ClientTimeZone SMALLINT NOT NULL,
ClientEventTime TIMESTAMP NOT NULL,
SilverlightVersion1 SMALLINT NOT NULL,
SilverlightVersion2 SMALLINT NOT NULL,
SilverlightVersion3 INTEGER NOT NULL,
SilverlightVersion4 SMALLINT NOT NULL,
PageCharset TEXT NOT NULL,
CodeVersion INTEGER NOT NULL,
IsLink SMALLINT NOT NULL,
IsDownload SMALLINT NOT NULL,
IsNotBounce SMALLINT NOT NULL,
FUniqID BIGINT NOT NULL,
OriginalURL TEXT NOT NULL,
HID INTEGER NOT NULL,
IsOldCounter SMALLINT NOT NULL,
IsEvent SMALLINT NOT NULL,
IsParameter SMALLINT NOT NULL,
DontCountHits SMALLINT NOT NULL,
WithHash SMALLINT NOT NULL,
HitColor CHAR NOT NULL,
LocalEventTime TIMESTAMP NOT NULL,
Age SMALLINT NOT NULL,
Sex SMALLINT NOT NULL,
Income SMALLINT NOT NULL,
Interests SMALLINT NOT NULL,
Robotness SMALLINT NOT NULL,
RemoteIP INTEGER NOT NULL,
WindowName INTEGER NOT NULL,
OpenerName INTEGER NOT NULL,
HistoryLength SMALLINT NOT NULL,
BrowserLanguage TEXT NOT NULL,
BrowserCountry TEXT NOT NULL,
SocialNetwork TEXT NOT NULL,
SocialAction TEXT NOT NULL,
HTTPError SMALLINT NOT NULL,
SendTiming INTEGER NOT NULL,
DNSTiming INTEGER NOT NULL,
ConnectTiming INTEGER NOT NULL,
ResponseStartTiming INTEGER NOT NULL,
ResponseEndTiming INTEGER NOT NULL,
FetchTiming INTEGER NOT NULL,
SocialSourceNetworkID SMALLINT NOT NULL,
SocialSourcePage TEXT NOT NULL,
ParamPrice BIGINT NOT NULL,
ParamOrderID TEXT NOT NULL,
ParamCurrency TEXT NOT NULL,
ParamCurrencyID SMALLINT NOT NULL,
OpenstatServiceName TEXT NOT NULL,
OpenstatCampaignID TEXT NOT NULL,
OpenstatAdID TEXT NOT NULL,
OpenstatSourceID TEXT NOT NULL,
UTMSource TEXT NOT NULL,
UTMMedium TEXT NOT NULL,
UTMCampaign TEXT NOT NULL,
UTMContent TEXT NOT NULL,
UTMTerm TEXT NOT NULL,
FromTag TEXT NOT NULL,
HasGCLID SMALLINT NOT NULL,
RefererHash BIGINT NOT NULL,
URLHash BIGINT NOT NULL,
CLID INTEGER NOT NULL,
PRIMARY KEY (CounterID, EventDate, UserID, EventTime, WatchID)
)
ENGINE = MergeTree
SETTINGS disk = disk(type = cache, path = '/dev/shm/clickhouse/', max_size = '16G',
disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/'));


@ -0,0 +1,43 @@
SELECT COUNT(*) FROM hits;
SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0;
SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits;
SELECT AVG(UserID) FROM hits;
SELECT COUNT(DISTINCT UserID) FROM hits;
SELECT COUNT(DISTINCT SearchPhrase) FROM hits;
SELECT MIN(EventDate), MAX(EventDate) FROM hits;
SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;
SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10;
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10;
SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID FROM hits WHERE UserID = 435090932899640449;
SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%';
SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;
SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits;
SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10;
SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000;

docker/test/clickbench/run.sh Executable file

@ -0,0 +1,79 @@
#!/bin/bash
SCRIPT_PID=$$
(sleep 1200 && kill -9 $SCRIPT_PID) &
# shellcheck disable=SC1091
source /setup_export_logs.sh
# fail on errors, verbose and export all env variables
set -e -x -a
dpkg -i package_folder/clickhouse-common-static_*.deb
dpkg -i package_folder/clickhouse-server_*.deb
dpkg -i package_folder/clickhouse-client_*.deb
# A directory for cache
mkdir /dev/shm/clickhouse
chown clickhouse:clickhouse /dev/shm/clickhouse
# Allow introspection functions, needed for sending the logs
echo "
profiles:
default:
allow_introspection_functions: 1
" > /etc/clickhouse-server/users.d/allow_introspection_functions.yaml
# Enable text_log
echo "
text_log:
" > /etc/clickhouse-server/config.d/text_log.yaml
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml
clickhouse start
# Wait for the server to start, but not for too long.
for _ in {1..100}
do
clickhouse-client --query "SELECT 1" && break
sleep 1
done
setup_logs_replication
# Load the data
clickhouse-client --time < /create.sql
# Run the queries
set +x
TRIES=3
QUERY_NUM=1
while read -r query; do
echo -n "["
for i in $(seq 1 $TRIES); do
RES=$(clickhouse-client --query_id "q${QUERY_NUM}-${i}" --time --format Null --query "$query" --progress 0 2>&1 ||:)
echo -n "${RES}"
[[ "$i" != "$TRIES" ]] && echo -n ", "
echo "${QUERY_NUM},${i},${RES}" >> /test_output/test_results.tsv
done
echo "],"
QUERY_NUM=$((QUERY_NUM + 1))
done < /queries.sql
set -x
clickhouse-client --query "SELECT total_bytes FROM system.tables WHERE name = 'hits' AND database = 'default'"
clickhouse-client -q "system flush logs" ||:
stop_logs_replication
clickhouse stop
mv /var/log/clickhouse-server/* /test_output/
echo -e "success\tClickBench finished" > /test_output/check_status.tsv


@ -151,7 +151,7 @@ function run_tests()
set +e
if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
clickhouse-test --client="clickhouse-client --use_hedged_requests=0 --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 \
clickhouse-test --client="clickhouse-client --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 \
--max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'" \
-j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --no-parallel-replicas --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt


@ -30,7 +30,7 @@ def build_url(base_url, dataset):
return os.path.join(base_url, dataset, "partitions", AVAILABLE_DATASETS[dataset])
def dowload_with_progress(url, path):
def download_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path)
for i in range(RETRIES_COUNT):
try:
@ -110,7 +110,7 @@ if __name__ == "__main__":
temp_archive_path = _get_temp_file_name()
try:
download_url_for_dataset = build_url(args.url_prefix, dataset)
dowload_with_progress(download_url_for_dataset, temp_archive_path)
download_with_progress(download_url_for_dataset, temp_archive_path)
unpack_to_clickhouse_directory(temp_archive_path, args.clickhouse_data_path)
except Exception as ex:
logging.info("Some exception occurred %s", str(ex))


@ -0,0 +1,22 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.11.2.11-stable (6e5411358c8) FIXME as compared to v23.11.1.2711-stable (05bc8ef1e02)
#### Improvement
* Backported in [#57661](https://github.com/ClickHouse/ClickHouse/issues/57661): Handle SIGABRT case when getting PostgreSQL table structure with empty array. [#57618](https://github.com/ClickHouse/ClickHouse/pull/57618) ([Mike Kot (Михаил Кот)](https://github.com/myrrc)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Ignore ON CLUSTER clause in grant/revoke queries for management of replicated access entities. [#57538](https://github.com/ClickHouse/ClickHouse/pull/57538) ([MikhailBurdukov](https://github.com/MikhailBurdukov)).
* Fix SIGSEGV for aggregation of sparse columns with any() RESPECT NULL [#57710](https://github.com/ClickHouse/ClickHouse/pull/57710) ([Azat Khuzhin](https://github.com/azat)).
* Fix bug window functions: revert [#39631](https://github.com/ClickHouse/ClickHouse/issues/39631) [#57766](https://github.com/ClickHouse/ClickHouse/pull/57766) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Pin alpine version of integration tests helper container [#57669](https://github.com/ClickHouse/ClickHouse/pull/57669) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).


@ -53,7 +53,6 @@ clickhouse-benchmark [keys] < queries_file;
- `--confidence=N` — Level of confidence for T-test. Possible values: 0 (80%), 1 (90%), 2 (95%), 3 (98%), 4 (99%), 5 (99.5%). Default value: 5. In the [comparison mode](#clickhouse-benchmark-comparison-mode) `clickhouse-benchmark` performs the [Independent two-sample Student's t-test](https://en.wikipedia.org/wiki/Student%27s_t-test#Independent_two-sample_t-test) to determine whether the two distributions aren't different with the selected level of confidence.
- `--cumulative` — Printing cumulative data instead of data per interval.
- `--database=DATABASE_NAME` — ClickHouse database name. Default value: `default`.
- `--json=FILEPATH` — `JSON` output. When the key is set, `clickhouse-benchmark` outputs a report to the specified JSON-file.
- `--user=USERNAME` — ClickHouse user name. Default value: `default`.
- `--password=PSWD` — ClickHouse user password. Default value: empty string.
- `--stacktrace` — Stack traces output. When the key is set, `clickhouse-benchmark` outputs stack traces of exceptions.


@ -1081,10 +1081,6 @@ Result:
└─────────────────────────────────────────────────────────────┘
```
**See also**
- [arrayFold](#arrayfold)
## arrayReduceInRanges
Applies an aggregate function to array elements in given ranges and returns an array containing the result corresponding to each range. The function will return the same result as multiple `arrayReduce(agg_func, arraySlice(arr1, index, length), ...)`.
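As a rough illustration of that behavior (a sketch; the aggregate function and values are arbitrary), summing the same array over several `(index, length)` ranges:

``` sql
SELECT arrayReduceInRanges(
    'sum',
    [(1, 5), (2, 3), (3, 4), (4, 4)],
    [1000000, 200000, 30000, 4000, 500, 60, 7]
) AS res;
-- res = [1234500, 234000, 34560, 4567]
```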
@ -1127,56 +1123,6 @@ Result:
└─────────────────────────────┘
```
## arrayFold
Applies a lambda function to one or more equally-sized arrays and collects the result in an accumulator.
**Syntax**
``` sql
arrayFold(lambda_function, arr1, arr2, ..., accumulator)
```
**Example**
Query:
``` sql
SELECT arrayFold( acc,x -> acc + x*2, [1, 2, 3, 4], toInt64(3)) AS res;
```
Result:
``` text
┌─res─┐
│ 23 │
└─────┘
```
**Example with the Fibonacci sequence**
```sql
SELECT arrayFold( acc,x -> (acc.2, acc.2 + acc.1), range(number), (1::Int64, 0::Int64)).1 AS fibonacci
FROM numbers(1,10);
┌─fibonacci─┐
│ 0 │
│ 1 │
│ 1 │
│ 2 │
│ 3 │
│ 5 │
│ 8 │
│ 13 │
│ 21 │
│ 34 │
└───────────┘
```
**See also**
- [arrayReduce](#arrayreduce)
## arrayReverse(arr)
Returns an array of the same size as the original array containing the elements in reverse order.


@ -543,26 +543,52 @@ Like `concatWithSeparator` but assumes that `concatWithSeparator(sep, expr1, exp
A function is called injective if it returns for different arguments different results. In other words: different arguments never produce identical result.
## substring(s, offset, length)
## substring
Returns a substring with `length` many bytes, starting at the byte at index `offset`. Character indexing starts from 1.
Returns the substring of a string `s` which starts at the specified byte index `offset`. Byte counting starts from 1. If `offset` is 0, an empty string is returned. If `offset` is negative, the substring starts `offset` characters from the end of the string, rather than from the beginning. An optional argument `length` specifies the maximum number of bytes the returned substring may have.
**Syntax**
```sql
substring(s, offset, length)
substring(s, offset[, length])
```
Alias:
- `substr`
- `mid`
**Arguments**
- `s` — The string to calculate a substring from. [String](../../sql-reference/data-types/string.md), [FixedString](../../sql-reference/data-types/fixedstring.md) or [Enum](../../sql-reference/data-types/enum.md)
- `offset` — The starting position of the substring in `s` . [(U)Int*](../../sql-reference/data-types/int-uint.md).
- `length` — The maximum length of the substring. [(U)Int*](../../sql-reference/data-types/int-uint.md). Optional.
**Returned value**
A substring of `s` with `length` many bytes, starting at index `offset`.
Type: `String`.
**Example**
``` sql
SELECT 'database' AS db, substr(db, 5), substr(db, 5, 1)
```
Result:
```result
┌─db───────┬─substring('database', 5)─┬─substring('database', 5, 1)─┐
│ database │ base │ b │
└──────────┴──────────────────────────┴─────────────────────────────┘
```
## substringUTF8
Like `substring` but for Unicode code points. Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
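To make the byte-versus-code-point distinction concrete, a small sketch (the string literal is only an illustration):

``` sql
-- 'Ü' takes two bytes in UTF-8, so the byte-based and code-point-based
-- variants return different prefixes of the same string.
SELECT substring('Über', 1, 3) AS by_bytes,        -- 'Üb'  (3 bytes)
       substringUTF8('Über', 1, 3) AS by_points;   -- 'Übe' (3 code points)
```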
## substringIndex(s, delim, count)
## substringIndex
Returns the substring of `s` before `count` occurrences of the delimiter `delim`, as in Spark or MySQL.
@ -593,7 +619,7 @@ Result:
└──────────────────────────────────────────────┘
```
## substringIndexUTF8(s, delim, count)
## substringIndexUTF8
Like `substringIndex` but for Unicode code points. Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
@ -1225,7 +1251,7 @@ This function also replaces numeric character references with Unicode characters
**Syntax**
``` sql
decodeHTMComponent(x)
decodeHTMLComponent(x)
```
**Arguments**
@ -1242,7 +1268,7 @@ Type: [String](../../sql-reference/data-types/string.md).
``` sql
SELECT decodeHTMLComponent(''CH');
SELECT decodeHMLComponent('I&heartsuit;ClickHouse');
SELECT decodeHTMLComponent('I&heartsuit;ClickHouse');
```
Result:


@ -12,7 +12,7 @@ Compressed files are supported. Compression type is detected by the extension of
**Syntax**
```sql
SELECT <expr_list> INTO OUTFILE file_name [AND STDOUT] [APPEND] [COMPRESSION type [LEVEL level]]
SELECT <expr_list> INTO OUTFILE file_name [AND STDOUT] [APPEND | TRUNCATE] [COMPRESSION type [LEVEL level]]
```
`file_name` and `type` are string literals. Supported compression types are: `'none'`, `'gzip'`, `'deflate'`, `'br'`, `'xz'`, `'zstd'`, `'lz4'`, `'bz2'`.
@ -26,6 +26,7 @@ SELECT <expr_list> INTO OUTFILE file_name [AND STDOUT] [APPEND] [COMPRESSION typ
- The default [output format](../../../interfaces/formats.md) is `TabSeparated` (like in the command-line client batch mode). Use [FORMAT](format.md) clause to change it.
- If `AND STDOUT` is mentioned in the query then the output that is written to the file is also displayed on standard output. If used with compression, the plaintext is displayed on standard output.
- If `APPEND` is mentioned in the query then the output is appended to an existing file. If compression is used, append cannot be used.
- When writing to a file that already exists, `APPEND` or `TRUNCATE` must be used.
**Example**
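A minimal sketch of the new `TRUNCATE` behavior (the file name is illustrative): the second query overwrites the existing file instead of failing, whereas `APPEND` would add rows to it.

``` sql
SELECT * FROM numbers(10) INTO OUTFILE 'numbers.tsv';            -- creates the file
SELECT * FROM numbers(10) INTO OUTFILE 'numbers.tsv' TRUNCATE;   -- overwrites the existing file
```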


@ -19,6 +19,7 @@ fuzzJSON({ named_collection [option=value [,..]] | json_str[, random_seed] })
- `json_str` (String) - The source string representing structured data in JSON format.
- `random_seed` (UInt64) - Manual random seed for producing stable results.
- `reuse_output` (boolean) - Reuse the output from a fuzzing process as input for the next fuzzer.
- `malform_output` (boolean) - Generate a string that cannot be parsed as a JSON object.
- `max_output_length` (UInt64) - Maximum allowable length of the generated or perturbed JSON string.
- `probability` (Float64) - The probability to fuzz a JSON field (a key-value pair). Must be within [0, 1] range.
- `max_nesting_level` (UInt64) - The maximum allowed depth of nested structures within the JSON data.
@ -84,3 +85,13 @@ SELECT * FROM fuzzJSON('{"id":1}', 1234) LIMIT 3;
{"BRjE":16137826149911306846}
{"XjKE":15076727133550123563}
```
``` sql
SELECT * FROM fuzzJSON(json_nc, json_str='{"name" : "FuzzJSON"}', random_seed=1337, malform_output=true) LIMIT 3;
```
``` text
U"name":"FuzzJSON*"SpByjZKtr2VAyHCO"falseh
{"name"keFuzzJSON, "g6vVO7TCIk":jTt^
{"DBhz":YFuzzJSON5}
```


@ -68,6 +68,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/waitServersToFinish.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/ServerType.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTPRequestHandlerFactoryMain.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/KeeperReadinessHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTP/HTTPServer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTP/ReadHeaders.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTP/HTTPServerConnection.cpp


@ -33,6 +33,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <Server/TCPServer.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/KeeperReadinessHandler.h>
#include "Core/Defines.h"
#include "config.h"
@ -494,6 +495,29 @@ try
std::make_unique<HTTPServer>(
std::move(my_http_context), createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
});
/// HTTP control endpoints
port_name = "keeper_server.http_control.port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port) mutable
{
auto my_http_context = httpContext();
Poco::Timespan my_keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr my_http_params = new Poco::Net::HTTPServerParams;
my_http_params->setTimeout(my_http_context->getReceiveTimeout());
my_http_params->setKeepAliveTimeout(my_keep_alive_timeout);
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
socket.setSendTimeout(my_http_context->getSendTimeout());
servers->emplace_back(
listen_host,
port_name,
"HTTP Control: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context), createKeeperHTTPControlMainHandlerFactory(config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPControlHandler-factory"), server_pool, socket, http_params)
);
});
}
for (auto & server : *servers)


@ -92,6 +92,7 @@
#include <Server/ProxyV1HandlerFactory.h>
#include <Server/TLSHandlerFactory.h>
#include <Server/ProtocolServerAdapter.h>
#include <Server/KeeperReadinessHandler.h>
#include <Server/HTTP/HTTPServer.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Core/ServerSettings.h>
@ -1554,6 +1555,34 @@ try
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
#endif
});
/// HTTP control endpoints
port_name = "keeper_server.http_control.port";
createServer(config(), listen_host, port_name, listen_try, /* start_server: */ false,
servers_to_start_before_tables,
[&](UInt16 port) -> ProtocolServerAdapter
{
auto http_context = httpContext();
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_context->getReceiveTimeout());
http_params->setKeepAliveTimeout(keep_alive_timeout);
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config(), socket, listen_host, port);
socket.setReceiveTimeout(http_context->getReceiveTimeout());
socket.setSendTimeout(http_context->getSendTimeout());
return ProtocolServerAdapter(
listen_host,
port_name,
"HTTP Control: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(http_context),
createKeeperHTTPControlMainHandlerFactory(
config_getter(),
global_context->getKeeperDispatcher(),
"KeeperHTTPControlHandler-factory"), server_pool, socket, http_params));
});
}
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination.");


@ -183,6 +183,7 @@ enum class AccessType
M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
M(SYSTEM_REPLICA_READINESS, "SYSTEM REPLICA READY, SYSTEM REPLICA UNREADY", GLOBAL, SYSTEM) \
M(SYSTEM_RESTART_REPLICA, "RESTART REPLICA", TABLE, SYSTEM) \
M(SYSTEM_RESTORE_REPLICA, "RESTORE REPLICA", TABLE, SYSTEM) \
M(SYSTEM_WAIT_LOADING_PARTS, "WAIT LOADING PARTS", TABLE, SYSTEM) \


@ -2153,32 +2153,19 @@ void QueryAnalyzer::replaceNodesWithPositionalArguments(QueryTreeNodePtr & node_
node_to_replace = &sort_node->getExpression();
auto * constant_node = (*node_to_replace)->as<ConstantNode>();
if (!constant_node
|| (constant_node->getValue().getType() != Field::Types::UInt64 && constant_node->getValue().getType() != Field::Types::Int64))
if (!constant_node || constant_node->getValue().getType() != Field::Types::UInt64)
continue;
UInt64 pos;
if (constant_node->getValue().getType() == Field::Types::UInt64)
{
pos = constant_node->getValue().get<UInt64>();
}
else // Int64
{
auto value = constant_node->getValue().get<Int64>();
pos = value > 0 ? value : projection_nodes.size() + value + 1;
}
if (!pos || pos > projection_nodes.size())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
UInt64 positional_argument_number = constant_node->getValue().get<UInt64>();
if (positional_argument_number == 0 || positional_argument_number > projection_nodes.size())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Positional argument number {} is out of bounds. Expected in range [1, {}]. In scope {}",
pos,
positional_argument_number,
projection_nodes.size(),
scope.scope_node->formatASTForErrorMessage());
*node_to_replace = projection_nodes[--pos];
--positional_argument_number;
*node_to_replace = projection_nodes[positional_argument_number];
}
}


@ -184,12 +184,12 @@ BackupCoordinationRemote::BackupCoordinationRemote(
if (my_is_internal)
{
String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNODEEXISTS)
zk->handleEphemeralNodeExistenceNoFailureInjection(alive_node_path, "");
else if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException::fromPath(code, alive_node_path);
/// Delete the ephemeral node from the previous connection so we don't have to wait for keeper to do it automatically.
zk->tryRemove(alive_node_path);
zk->createAncestors(alive_node_path);
zk->create(alive_node_path, "", zkutil::CreateMode::Ephemeral);
}
})
{


@ -60,12 +60,6 @@ void BackupCoordinationStageSync::set(const String & current_host, const String
}
else
{
/// Make an ephemeral node so the initiator can track if the current host is still working.
String alive_node_path = zookeeper_path + "/alive|" + current_host;
auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
throw zkutil::KeeperException::fromPath(code, alive_node_path);
zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message);
}
@ -106,39 +100,36 @@ Strings BackupCoordinationStageSync::waitFor(const Strings & all_hosts, const St
namespace
{
struct UnreadyHostState
struct UnreadyHost
{
String host;
bool started = false;
bool alive = false;
};
}
struct BackupCoordinationStageSync::State
{
Strings results;
std::map<String, UnreadyHostState> unready_hosts;
std::optional<Strings> results;
std::optional<std::pair<String, Exception>> error;
std::optional<String> host_terminated;
std::optional<String> disconnected_host;
std::optional<UnreadyHost> unready_host;
};
BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState(
const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const
WithRetries::RetriesControlHolder & retries_control_holder,
const Strings & zk_nodes,
const Strings & all_hosts,
const String & stage_to_wait) const
{
auto zookeeper = retries_control_holder.faulty_zookeeper;
auto & retries_ctl = retries_control_holder.retries_ctl;
std::unordered_set<std::string_view> zk_nodes_set{zk_nodes.begin(), zk_nodes.end()};
State state;
if (zk_nodes_set.contains("error"))
{
String errors;
{
auto holder = with_retries.createRetriesControlHolder("readCurrentState");
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
errors = zookeeper->get(zookeeper_path + "/error");
});
}
String errors = zookeeper->get(zookeeper_path + "/error");
ReadBufferFromOwnString buf{errors};
String host;
readStringBinary(host, buf);
@ -146,64 +137,50 @@ BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState
return state;
}
std::optional<UnreadyHost> unready_host;
for (const auto & host : all_hosts)
{
if (!zk_nodes_set.contains("current|" + host + "|" + stage_to_wait))
{
UnreadyHostState unready_host_state;
const String started_node_name = "started|" + host;
const String alive_node_name = "alive|" + host;
const String alive_node_path = zookeeper_path + "/" + alive_node_name;
unready_host_state.started = zk_nodes_set.contains(started_node_name);
/// Because we do retries everywhere we can't fully rely on ephemeral nodes anymore.
/// Though we recreate "alive" node when reconnecting it might be not enough and race condition is possible.
/// And everything we can do here - just retry.
/// In worst case when we won't manage to see the alive node for a long time we will just abort the backup.
unready_host_state.alive = zk_nodes_set.contains(alive_node_name);
if (!unready_host_state.alive)
bool started = zk_nodes_set.contains(started_node_name);
bool alive = zk_nodes_set.contains(alive_node_name);
if (!alive)
{
LOG_TRACE(log, "Seems like host ({}) is dead. Will retry the check to confirm", host);
auto holder = with_retries.createRetriesControlHolder("readCurrentState::checkAliveNode");
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
if (zookeeper->existsNoFailureInjection(alive_node_path))
{
unready_host_state.alive = true;
return;
}
// Retry with backoff. We also check whether it is last retry or no, because we won't to rethrow an exception.
if (!holder.retries_ctl.isLastRetry())
holder.retries_ctl.setKeeperError(Coordination::Error::ZNONODE, "There is no alive node for host {}. Will retry", host);
});
/// If the "alive" node doesn't exist then we don't have connection to the corresponding host.
/// This node is ephemeral so probably it will be recreated soon. We use zookeeper retries to wait.
/// In worst case when we won't manage to see the alive node for a long time we will just abort the backup.
String message;
if (started)
message = fmt::format("Lost connection to host {}", host);
else
message = fmt::format("No connection to host {} yet", host);
if (!retries_ctl.isLastRetry())
message += ", will retry";
retries_ctl.setUserError(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, message);
state.disconnected_host = host;
return state;
}
LOG_TRACE(log, "Host ({}) appeared to be {}", host, unready_host_state.alive ? "alive" : "dead");
state.unready_hosts.emplace(host, unready_host_state);
if (!unready_host_state.alive && unready_host_state.started && !state.host_terminated)
state.host_terminated = host;
if (!unready_host)
unready_host.emplace(UnreadyHost{.host = host, .started = started});
}
}
if (state.host_terminated || !state.unready_hosts.empty())
return state;
auto holder = with_retries.createRetriesControlHolder("waitImpl::collectStagesToWait");
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
if (unready_host)
{
with_retries.renewZooKeeper(zookeeper);
Strings results;
state.unready_host = std::move(unready_host);
return state;
}
for (const auto & host : all_hosts)
results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait));
state.results = std::move(results);
});
Strings results;
for (const auto & host : all_hosts)
results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait));
state.results = std::move(results);
return state;
}
@ -229,7 +206,7 @@ Strings BackupCoordinationStageSync::waitImpl(
auto watch = std::make_shared<Poco::Event>();
Strings zk_nodes;
{
auto holder = with_retries.createRetriesControlHolder("waitImpl::getChildren");
auto holder = with_retries.createRetriesControlHolder("waitImpl");
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
{
@ -237,17 +214,23 @@ Strings BackupCoordinationStageSync::waitImpl(
watch->reset();
/// Get zk nodes and subscribe on their changes.
zk_nodes = zookeeper->getChildren(zookeeper_path, nullptr, watch);
/// Read the current state of zk nodes.
state = readCurrentState(holder, zk_nodes, all_hosts, stage_to_wait);
});
}
/// Read and analyze the current state of zk nodes.
state = readCurrentState(zk_nodes, all_hosts, stage_to_wait);
if (state.error || state.host_terminated || state.unready_hosts.empty())
break; /// Error happened or everything is ready.
/// Analyze the current state of zk nodes.
chassert(state.results || state.error || state.disconnected_host || state.unready_host);
/// Log that we will wait
const auto & unready_host = state.unready_hosts.begin()->first;
LOG_INFO(log, "Waiting on ZooKeeper watch for any node to be changed (currently waiting for host {})", unready_host);
if (state.results || state.error || state.disconnected_host)
break; /// Everything is ready or error happened.
/// Log what we will wait.
const auto & unready_host = *state.unready_host;
LOG_INFO(log, "Waiting on ZooKeeper watch for any node to be changed (currently waiting for host {}{})",
unready_host.host,
(!unready_host.started ? " which didn't start the operation yet" : ""));
/// Wait until `watch_callback` is called by ZooKeeper meaning that zk nodes have changed.
{
@ -270,23 +253,23 @@ Strings BackupCoordinationStageSync::waitImpl(
state.error->second.rethrow();
/// Another host terminated without errors.
if (state.host_terminated)
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Host {} suddenly stopped working", *state.host_terminated);
if (state.disconnected_host)
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "No connection to host {}", *state.disconnected_host);
/// Something's unready, timeout is probably not enough.
if (!state.unready_hosts.empty())
if (state.unready_host)
{
const auto & [unready_host, unready_host_state] = *state.unready_hosts.begin();
const auto & unready_host = *state.unready_host;
throw Exception(
ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"Waited for host {} too long (> {}){}",
unready_host,
unready_host.host,
to_string(*timeout),
unready_host_state.started ? "" : ": Operation didn't start");
unready_host.started ? "" : ": Operation didn't start");
}
LOG_TRACE(log, "Everything is Ok. All hosts achieved stage {}", stage_to_wait);
return state.results;
return std::move(*state.results);
}
}


@ -29,7 +29,7 @@ private:
void createRootNodes();
struct State;
State readCurrentState(const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const;
State readCurrentState(WithRetries::RetriesControlHolder & retries_control_holder, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const;
Strings waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional<std::chrono::milliseconds> timeout) const;


@ -43,12 +43,12 @@ RestoreCoordinationRemote::RestoreCoordinationRemote(
if (my_is_internal)
{
String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNODEEXISTS)
zk->handleEphemeralNodeExistenceNoFailureInjection(alive_node_path, "");
else if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException::fromPath(code, alive_node_path);
/// Delete the ephemeral node from the previous connection so we don't have to wait for keeper to do it automatically.
zk->tryRemove(alive_node_path);
zk->createAncestors(alive_node_path);
zk->create(alive_node_path, "", zkutil::CreateMode::Ephemeral);
}
})
{


@ -519,8 +519,9 @@ void ConfigProcessor::doIncludesRecursive(
if (attr_nodes["from_zk"]) /// we have zookeeper subst
{
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_zk substitution");
/// only allow substitution for nodes with no value and without "replace"
if (node->hasChildNodes() && !replace)
throw Poco::Exception("Element <" + node->nodeName() + "> has value and does not have 'replace' attribute, can't process from_zk substitution");
contributing_zk_paths.insert(attr_nodes["from_zk"]->getNodeValue());
@ -544,8 +545,9 @@ void ConfigProcessor::doIncludesRecursive(
if (attr_nodes["from_env"]) /// we have env subst
{
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_env substitution");
/// only allow substitution for nodes with no value and without "replace"
if (node->hasChildNodes() && !replace)
throw Poco::Exception("Element <" + node->nodeName() + "> has value and does not have 'replace' attribute, can't process from_env substitution");
XMLDocumentPtr env_document;
auto get_env_node = [&](const std::string & name) -> const Node *


@ -260,6 +260,7 @@
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M)
#endif
namespace CurrentMetrics
{
#define M(NAME, DOCUMENTATION) extern const Metric NAME = Metric(__COUNTER__);


@ -99,6 +99,7 @@ struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
{
TestKeeperGetRequest() = default;
explicit TestKeeperGetRequest(const GetRequest & base) : GetRequest(base) {}
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
@ -118,6 +119,8 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
struct TestKeeperListRequest : ListRequest, TestKeeperRequest
{
TestKeeperListRequest() = default;
explicit TestKeeperListRequest(const ListRequest & base) : ListRequest(base) {}
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
@ -176,6 +179,14 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
{
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
}
else if (const auto * concrete_request_get = dynamic_cast<const GetRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperGetRequest>(*concrete_request_get));
}
else if (const auto * concrete_request_list = dynamic_cast<const ListRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperListRequest>(*concrete_request_list));
}
else
throw Exception::fromMessage(Error::ZBADARGUMENTS, "Illegal command as part of multi ZooKeeper request");
}


@ -497,6 +497,17 @@ bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, cons
return existsWatch(path, stat, callbackForEvent(watch));
}
bool ZooKeeper::anyExists(const std::vector<std::string> & paths)
{
auto exists_multi_response = exists(paths);
for (size_t i = 0; i < exists_multi_response.size(); ++i)
{
if (exists_multi_response[i].error == Coordination::Error::ZOK)
return true;
}
return false;
}
bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
Coordination::Error code = existsImpl(path, stat, watch_callback);


@ -286,6 +286,8 @@ public:
return exists(paths.begin(), paths.end());
}
bool anyExists(const std::vector<std::string> & paths);
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
@ -422,8 +424,9 @@ public:
/// Performs several operations in a transaction.
/// Throws on every error.
Coordination::Responses multi(const Coordination::Requests & requests);
/// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw.
/// Throws only if some operation has returned an "unexpected" error - an error that would cause
/// the corresponding try- method to throw.
/// On exception, `responses` may or may not be populated.
Coordination::Error tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses);
/// Throws nothing (even session expired errors)
Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);
@ -567,8 +570,11 @@ public:
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
uint64_t getSessionTimeoutMS() const { return args.session_timeout_ms; }
void setServerCompletelyStarted();
Int8 getConnectedHostIdx() const;


@ -184,7 +184,7 @@ std::vector<std::pair<String, uint16_t>> parseRemoteDescriptionForExternalDataba
}
else
{
result.emplace_back(std::make_pair(address.substr(0, colon), DB::parseFromString<UInt16>(address.substr(colon + 1))));
result.emplace_back(std::make_pair(address.substr(0, colon), parseFromString<UInt16>(address.substr(colon + 1))));
}
}


@ -1,8 +1,12 @@
#pragma once
#include <base/types.h>
#include <vector>
namespace DB
{
/* Parse a string that generates shards and replicas. Separator - one of two characters '|' or ','
* depending on whether shards or replicas are generated.
* For example:


@ -208,6 +208,9 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
return;
}
/// To avoid reference to binding
const auto & snapshot_path_ref = snapshot_path;
SCOPE_EXIT(
{
LOG_INFO(log, "Removing lock file");
@ -223,7 +226,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path_ref);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});


@ -35,6 +35,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_METHOD;
}
struct ContextSharedPart : boost::noncopyable
@ -376,4 +377,9 @@ void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::Abstr
shared->keeper_dispatcher->updateConfiguration(getConfigRef(), getMacros());
}
std::shared_ptr<zkutil::ZooKeeper> Context::getZooKeeper() const
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot connect to ZooKeeper from Keeper");
}
}


@ -21,6 +21,11 @@
#include <memory>
#include "config.h"
namespace zkutil
{
class ZooKeeper;
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
namespace DB
{
@ -153,6 +158,8 @@ public:
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
zkutil::ZooKeeperPtr getZooKeeper() const;
};
}


@ -133,6 +133,8 @@ enum class DefaultTableEngine
ReplacingMergeTree,
ReplicatedMergeTree,
ReplicatedReplacingMergeTree,
SharedMergeTree,
SharedReplacingMergeTree,
Memory,
};


@ -440,6 +440,8 @@ template <typename T> inline bool isFloat(const T & data_type) { return WhichDat
template <typename T> inline bool isNativeNumber(const T & data_type) { return WhichDataType(data_type).isNativeNumber(); }
template <typename T> inline bool isNumber(const T & data_type) { return WhichDataType(data_type).isNumber(); }
template <typename T> inline bool isEnum8(const T & data_type) { return WhichDataType(data_type).isEnum8(); }
template <typename T> inline bool isEnum16(const T & data_type) { return WhichDataType(data_type).isEnum16(); }
template <typename T> inline bool isEnum(const T & data_type) { return WhichDataType(data_type).isEnum(); }
template <typename T> inline bool isDate(const T & data_type) { return WhichDataType(data_type).isDate(); }


@ -175,7 +175,7 @@ static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnSt
offsets.push_back(offset);
if (unlikely(offset > data.size()))
data.resize(roundUpToPowerOfTwoOrZero(std::max(offset, data.size() * 2)));
data.resize_exact(roundUpToPowerOfTwoOrZero(std::max(offset, data.size() * 2)));
if (size)
{


@ -159,6 +159,15 @@ static DataTypePtr convertPostgreSQLDataType(String & type, Fn<void()> auto && r
return res;
}
/// Check if PostgreSQL relation is empty.
/// postgres_table must be already quoted + schema-qualified.
template <typename T>
bool isTableEmpty(T & tx, const String & postgres_table)
{
auto query = fmt::format("SELECT NOT EXISTS (SELECT * FROM {} LIMIT 1);", postgres_table);
pqxx::result result{tx.exec(query)};
return result[0][0].as<bool>();
}
template<typename T>
PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
@ -219,12 +228,37 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
{
const auto & name_and_type = columns[i];
/// All rows must contain the same number of dimensions, so limit 1 is ok. If number of dimensions in all rows is not the same -
/// If the relation is empty, then array_ndims returns NULL.
/// ClickHouse cannot support this use case.
if (isTableEmpty(tx, postgres_table))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "PostgreSQL relation containing arrays cannot be empty: {}", postgres_table);
/// All rows must contain the same number of dimensions.
/// 1 is ok. If number of dimensions in all rows is not the same -
/// such arrays are not able to be used as ClickHouse Array at all.
pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table))};
// array_ndims() may return null for empty array, but we expect 0:
// https://github.com/postgres/postgres/blob/d16a0c1e2e3874cd5adfa9ee968008b6c4b1ae01/src/backend/utils/adt/arrayfuncs.c#L1658
auto dimensions = result[0][0].as<std::optional<int>>().value_or(0);
///
/// For empty arrays, array_ndims([]) will return NULL.
auto postgres_column = doubleQuoteString(name_and_type.name);
pqxx::result result{tx.exec(
fmt::format("SELECT {} IS NULL, array_ndims({}) FROM {} LIMIT 1;", postgres_column, postgres_column, postgres_table))};
/// Nullable(Array) is not supported.
auto is_null_array = result[0][0].as<bool>();
if (is_null_array)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "PostgreSQL array cannot be NULL: {}.{}", postgres_table, postgres_column);
/// Cannot infer dimension of empty arrays.
auto is_empty_array = result[0][1].is_null();
if (is_empty_array)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL cannot infer dimensions of an empty array: {}.{}",
postgres_table,
postgres_column);
}
int dimensions = result[0][1].as<int>();
/// It is always 1d array if it is in recheck.
DataTypePtr type = assert_cast<const DataTypeArray *>(name_and_type.type.get())->getNestedType();
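// A standalone sketch of the NULL-vs-empty distinction enforced above, assuming only the
// standard library; std::optional stands in for the pqxx field accessors and the names are
// illustrative. A missing (NULL) value and a present-but-empty array are reported as two
// different errors instead of silently defaulting the dimension count to 0, as the replaced
// value_or(0) call did.
#include <optional>
#include <stdexcept>
#include <vector>

int inferDimensions(const std::optional<std::vector<int>> & array_cell)
{
    if (!array_cell)
        throw std::invalid_argument("array cannot be NULL");                 /// Nullable(Array) is unsupported
    if (array_cell->empty())
        throw std::invalid_argument("cannot infer dimensions of an empty array");
    return 1;                                                                 /// a non-empty sample row fixes the dimension count
}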

View File

@ -69,12 +69,6 @@ DictionaryPtr DictionaryFactory::create(
layout_type);
}
DictionaryPtr DictionaryFactory::create(const std::string & name, const ASTCreateQuery & ast, ContextPtr global_context) const
{
auto configuration = getDictionaryConfigurationFromAST(ast, global_context);
return DictionaryFactory::create(name, *configuration, "dictionary", global_context, true);
}
bool DictionaryFactory::isComplex(const std::string & layout_type) const
{
auto it = registered_layouts.find(layout_type);

View File

@ -39,11 +39,6 @@ public:
ContextPtr global_context,
bool created_from_ddl) const;
/// Create dictionary from DDL-query
DictionaryPtr create(const std::string & name,
const ASTCreateQuery & ast,
ContextPtr global_context) const;
using LayoutCreateFunction = std::function<DictionaryPtr(
const std::string & name,
const DictionaryStructure & dict_struct,

View File

@ -6,9 +6,8 @@
#include <Common/filesystemHelpers.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Disks/DiskFactory.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -10,6 +10,13 @@
namespace DB
{
/// It's a bug in clang with three-way comparison operator
/// https://github.com/llvm/llvm-project/issues/55919
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
/** Mark is the position in the compressed file. The compressed file consists of adjacent compressed blocks.
* Mark is a tuple - the offset in the file to the start of the compressed block, the offset in the decompressed block to the start of the data.
*/
@ -18,12 +25,7 @@ struct MarkInCompressedFile
size_t offset_in_compressed_file;
size_t offset_in_decompressed_block;
bool operator==(const MarkInCompressedFile & rhs) const
{
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
}
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
auto operator<=>(const MarkInCompressedFile &) const = default;
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
@ -39,6 +41,10 @@ struct MarkInCompressedFile
}
};
#ifdef __clang__
#pragma clang diagnostic pop
#endif
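// A minimal standalone sketch of the defaulted three-way comparison adopted above: a single
// `= default` declaration gives the struct ==, !=, <, <=, > and >= with member-wise
// lexicographic semantics. The struct name here is illustrative.
#include <cassert>
#include <compare>
#include <cstddef>

struct Mark
{
    std::size_t offset_in_compressed_file = 0;
    std::size_t offset_in_decompressed_block = 0;
    auto operator<=>(const Mark &) const = default;
};

int main()
{
    assert((Mark{1, 0} < Mark{1, 5}));    /// first member equal, second decides
    assert((Mark{1, 0} == Mark{1, 0}));
    assert((Mark{2, 0} > Mark{1, 9}));
}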
/**
* In-memory representation of an array of marks.
*

View File

@ -11,6 +11,8 @@
#include <Common/typeid_cast.h>
#include <Common/UTF8Helpers.h>
#include <DataTypes/EnumValues.h>
#include "IArraySource.h"
#include "IValueSource.h"
#include "Slices.h"
@ -56,8 +58,8 @@ struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>>
}
explicit NumericArraySource(const ColumnArray & arr)
: column(typeid_cast<const ColVecType &>(arr.getData()))
, elements(typeid_cast<const ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
: column(typeid_cast<const ColVecType &>(arr.getData()))
, elements(typeid_cast<const ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
{
}
@ -154,17 +156,22 @@ struct ConstSource : public Base
size_t row_num = 0;
explicit ConstSource(const ColumnConst & col_)
: Base(static_cast<const typename Base::Column &>(col_.getDataColumn())), total_rows(col_.size())
: Base(static_cast<const typename Base::Column &>(col_.getDataColumn()))
, total_rows(col_.size())
{
}
template <typename ColumnType>
ConstSource(const ColumnType & col_, size_t total_rows_) : Base(col_), total_rows(total_rows_)
ConstSource(const ColumnType & col_, size_t total_rows_)
: Base(col_)
, total_rows(total_rows_)
{
}
template <typename ColumnType>
ConstSource(const ColumnType & col_, const NullMap & null_map_, size_t total_rows_) : Base(col_, null_map_), total_rows(total_rows_)
ConstSource(const ColumnType & col_, const NullMap & null_map_, size_t total_rows_)
: Base(col_, null_map_)
, total_rows(total_rows_)
{
}
@ -240,7 +247,8 @@ struct StringSource
ColumnString::Offset prev_offset = 0;
explicit StringSource(const ColumnString & col)
: elements(col.getChars()), offsets(col.getOffsets())
: elements(col.getChars())
, offsets(col.getOffsets())
{
}
@ -313,6 +321,96 @@ struct StringSource
}
};
/// Treats Enum values as Strings, modeled after StringSource
template <typename EnumDataType>
struct EnumSource
{
using Column = typename EnumDataType::ColumnType;
using Slice = NumericArraySlice<UInt8>;
using SinkType = StringSink;
const typename Column::Container & data;
const EnumDataType & data_type;
size_t row_num = 0;
EnumSource(const Column & col, const EnumDataType & data_type_)
: data(col.getData())
, data_type(data_type_)
{
}
void next()
{
++row_num;
}
bool isEnd() const
{
return row_num == data.size();
}
size_t rowNum() const
{
return row_num;
}
size_t getSizeForReserve() const
{
return data.size();
}
size_t getElementSize() const
{
std::string_view name = data_type.getNameForValue(data[row_num]).toView();
return name.size();
}
size_t getColumnSize() const
{
return data.size();
}
Slice getWhole() const
{
std::string_view name = data_type.getNameForValue(data[row_num]).toView();
return {reinterpret_cast<const UInt8 *>(name.data()), name.size()};
}
Slice getSliceFromLeft(size_t offset) const
{
std::string_view name = data_type.getNameForValue(data[row_num]).toView();
if (offset >= name.size())
return {reinterpret_cast<const UInt8 *>(name.data()), 0};
return {reinterpret_cast<const UInt8 *>(name.data()) + offset, name.size() - offset};
}
Slice getSliceFromLeft(size_t offset, size_t length) const
{
std::string_view name = data_type.getNameForValue(data[row_num]).toView();
if (offset >= name.size())
return {reinterpret_cast<const UInt8 *>(name.data()), 0};
return {reinterpret_cast<const UInt8 *>(name.data()) + offset, std::min(length, name.size() - offset)};
}
Slice getSliceFromRight(size_t offset) const
{
std::string_view name = data_type.getNameForValue(data[row_num]).toView();
if (offset > name.size())
return {reinterpret_cast<const UInt8 *>(name.data()), name.size()};
return {reinterpret_cast<const UInt8 *>(name.data()) + name.size() - offset, offset};
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
std::string_view name = data_type.getNameForValue(data[row_num]).toView();
if (offset > name.size())
return {reinterpret_cast<const UInt8 *>(name.data()), length + name.size() > offset ? std::min(name.size(), length + name.size() - offset) : 0};
return {reinterpret_cast<const UInt8 *>(name.data()) + name.size() - offset, std::min(length, offset)};
}
};
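// A standalone sketch of the offset/length clamping used by the slice methods above, written
// against std::string_view instead of the enum name lookup; names are illustrative.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string_view>

std::string_view sliceFromLeft(std::string_view name, std::size_t offset, std::size_t length)
{
    if (offset >= name.size())
        return name.substr(0, 0);                                   /// past the end -> empty slice
    return name.substr(offset, std::min(length, name.size() - offset));
}

int main()
{
    assert(sliceFromLeft("hello", 1, 3) == "ell");
    assert(sliceFromLeft("hello", 9, 3).empty());                   /// offset beyond the value
}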
/// Differs from StringSource by having 'offset' and 'length' in code points instead of bytes in getSlice* methods.
/** NOTE: The behaviour of substring and substringUTF8 is inconsistent when negative offset is greater than string size:
@ -419,7 +517,7 @@ struct FixedStringSource
size_t column_size = 0;
explicit FixedStringSource(const ColumnFixedString & col)
: string_size(col.getN())
: string_size(col.getN())
{
const auto & chars = col.getChars();
pos = chars.data();
@ -553,7 +651,8 @@ struct GenericArraySource : public ArraySourceImpl<GenericArraySource>
}
explicit GenericArraySource(const ColumnArray & arr)
: elements(arr.getData()), offsets(arr.getOffsets())
: elements(arr.getData())
, offsets(arr.getOffsets())
{
}
@ -813,7 +912,10 @@ struct NullableValueSource : public ValueSource
const NullMap & null_map;
template <typename Column>
explicit NullableValueSource(const Column & col, const NullMap & null_map_) : ValueSource(col), null_map(null_map_) {}
NullableValueSource(const Column & col, const NullMap & null_map_)
: ValueSource(col)
, null_map(null_map_)
{}
void accept(ValueSourceVisitor & visitor) override { visitor.visit(*this); }

View File

@ -1,236 +0,0 @@
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DONT_MATCH;
extern const int TYPE_MISMATCH;
}
/**
* arrayFold(x1,...,xn,accum -> expression, array1,...,arrayn, accum_initial) - apply the expression to each element of the array (or set of arrays).
*/
class ArrayFold : public IFunction
{
public:
static constexpr auto name = "arrayFold";
static FunctionPtr create(ContextPtr) { return std::make_shared<ArrayFold>(); }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
void getLambdaArgumentTypes(DataTypes & arguments) const override
{
if (arguments.size() < 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires as arguments a lambda function, at least one array and an accumulator", getName());
DataTypes accumulator_and_array_types(arguments.size() - 1);
accumulator_and_array_types[0] = arguments.back();
for (size_t i = 1; i < accumulator_and_array_types.size(); ++i)
{
const auto * array_type = checkAndGetDataType<DataTypeArray>(&*arguments[i]);
if (!array_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Argument {} of function {} must be of type Array, found {} instead", i + 1, getName(), arguments[i]->getName());
accumulator_and_array_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
}
const auto * lambda_function_type = checkAndGetDataType<DataTypeFunction>(arguments[0].get());
if (!lambda_function_type || lambda_function_type->getArgumentTypes().size() != accumulator_and_array_types.size())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument of function {} must be a lambda function with {} arguments, found {} instead.",
getName(), accumulator_and_array_types.size(), arguments[0]->getName());
arguments[0] = std::make_shared<DataTypeFunction>(accumulator_and_array_types);
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires as arguments a lambda function, at least one array and an accumulator", getName());
const auto * lambda_function_type = checkAndGetDataType<DataTypeFunction>(arguments[0].type.get());
if (!lambda_function_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be a function", getName());
auto accumulator_type = arguments.back().type;
auto lambda_type = lambda_function_type->getReturnType();
if (!accumulator_type->equals(*lambda_type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Return type of lambda function must be the same as the accumulator type, inferred return type of lambda: {}, inferred type of accumulator: {}",
lambda_type->getName(), accumulator_type->getName());
return accumulator_type;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto & lambda_function_with_type_and_name = arguments[0];
if (!lambda_function_with_type_and_name.column)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be a function", getName());
const auto * lambda_function = typeid_cast<const ColumnFunction *>(lambda_function_with_type_and_name.column.get());
if (!lambda_function)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be a function", getName());
ColumnPtr offsets_column;
ColumnPtr column_first_array_ptr;
const ColumnArray * column_first_array = nullptr;
ColumnsWithTypeAndName arrays;
arrays.reserve(arguments.size() - 1);
/// Validate input types and get input array columns in convenient form
for (size_t i = 1; i < arguments.size() - 1; ++i)
{
const auto & array_with_type_and_name = arguments[i];
ColumnPtr column_array_ptr = array_with_type_and_name.column;
const auto * column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
if (!column_array)
{
const ColumnConst * column_const_array = checkAndGetColumnConst<ColumnArray>(column_array_ptr.get());
if (!column_const_array)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Expected array column, found {}", column_array_ptr->getName());
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
}
const DataTypePtr & array_type_ptr = array_with_type_and_name.type;
const auto * array_type = checkAndGetDataType<DataTypeArray>(array_type_ptr.get());
if (!array_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Expected array type, found {}", array_type_ptr->getName());
if (!offsets_column)
offsets_column = column_array->getOffsetsPtr();
else
{
/// The first condition is optimization: do not compare data if the pointers are equal.
if (column_array->getOffsetsPtr() != offsets_column
&& column_array->getOffsets() != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH, "Arrays passed to {} must have equal size", getName());
}
if (i == 1)
{
column_first_array_ptr = column_array_ptr;
column_first_array = column_array;
}
arrays.emplace_back(ColumnWithTypeAndName(column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_with_type_and_name.name));
}
ssize_t rows_count = input_rows_count;
ssize_t data_row_count = arrays[0].column->size();
size_t array_count = arrays.size();
if (rows_count == 0)
return arguments.back().column->convertToFullColumnIfConst()->cloneEmpty();
ColumnPtr current_column = arguments.back().column->convertToFullColumnIfConst();
MutableColumnPtr result_data = arguments.back().column->convertToFullColumnIfConst()->cloneEmpty();
size_t max_array_size = 0;
const auto & offsets = column_first_array->getOffsets();
IColumn::Selector selector(data_row_count);
size_t cur_ind = 0;
ssize_t cur_arr = 0;
/// skip to the first non empty array
if (data_row_count)
while (offsets[cur_arr] == 0)
++cur_arr;
/// selector[i] is an index that i_th data element has in an array it corresponds to
for (ssize_t i = 0; i < data_row_count; ++i)
{
selector[i] = cur_ind;
cur_ind++;
if (cur_ind > max_array_size)
max_array_size = cur_ind;
while (cur_arr < rows_count && cur_ind >= offsets[cur_arr] - offsets[cur_arr - 1])
{
++cur_arr;
cur_ind = 0;
}
}
std::vector<MutableColumns> data_arrays;
data_arrays.resize(array_count);
/// Split each data column to columns containing elements of only Nth index in array
if (max_array_size > 0)
for (size_t i = 0; i < array_count; ++i)
data_arrays[i] = arrays[i].column->scatter(max_array_size, selector);
size_t prev_size = rows_count;
IColumn::Permutation inverse_permutation(rows_count);
size_t inverse_permutation_count = 0;
/// current_column after each iteration contains value of accumulator after applying values under indexes of arrays.
/// At each iteration only rows of current_column with arrays that still has unapplied elements are kept.
/// Discarded rows which contain finished calculations are added to result_data column and as we insert them we save their original row_number in inverse_permutation vector
for (size_t ind = 0; ind < max_array_size; ++ind)
{
IColumn::Selector prev_selector(prev_size);
size_t prev_ind = 0;
for (ssize_t irow = 0; irow < rows_count; ++irow)
{
if (offsets[irow] - offsets[irow - 1] > ind)
prev_selector[prev_ind++] = 1;
else if (offsets[irow] - offsets[irow - 1] == ind)
{
inverse_permutation[inverse_permutation_count++] = irow;
prev_selector[prev_ind++] = 0;
}
}
auto prev = current_column->scatter(2, prev_selector);
result_data->insertRangeFrom(*(prev[0]), 0, prev[0]->size());
auto res_lambda = lambda_function->cloneResized(prev[1]->size());
auto * res_lambda_ptr = typeid_cast<ColumnFunction *>(res_lambda.get());
res_lambda_ptr->appendArguments(std::vector({ColumnWithTypeAndName(std::move(prev[1]), arguments.back().type, arguments.back().name)}));
for (size_t i = 0; i < array_count; i++)
res_lambda_ptr->appendArguments(std::vector({ColumnWithTypeAndName(std::move(data_arrays[i][ind]), arrays[i].type, arrays[i].name)}));
current_column = IColumn::mutate(res_lambda_ptr->reduce().column);
prev_size = current_column->size();
}
result_data->insertRangeFrom(*current_column, 0, current_column->size());
for (ssize_t irow = 0; irow < rows_count; ++irow)
if (offsets[irow] - offsets[irow - 1] == max_array_size)
inverse_permutation[inverse_permutation_count++] = irow;
/// We have result_data containing result for every row and inverse_permutation which contains indexes of rows in input it corresponds to.
/// Now we need to invert inverse_permuation and apply it to result_data to get rows in right order.
IColumn::Permutation perm(rows_count);
for (ssize_t i = 0; i < rows_count; i++)
perm[inverse_permutation[i]] = i;
return result_data->permute(perm, 0);
}
private:
String getName() const override
{
return name;
}
};
REGISTER_FUNCTION(ArrayFold)
{
factory.registerFunction<ArrayFold>(FunctionDocumentation{.description=R"(
Function arrayFold(x1,...,xn,accum -> expression, array1,...,arrayn, accum_initial) applies lambda function to a number of equally-sized arrays
and collects the result in an accumulator.
)", .examples{{"sum", "SELECT arrayFold(x,acc -> acc+x, [1,2,3,4], toInt64(1));", "11"}}, .categories{"Array"}});
}
}

View File

@ -1,15 +1,16 @@
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/GatherUtils/Algorithms.h>
#include <Functions/GatherUtils/GatherUtils.h>
#include <Functions/GatherUtils/Sources.h>
#include <Functions/GatherUtils/Sinks.h>
#include <Functions/GatherUtils/Slices.h>
#include <Functions/GatherUtils/Algorithms.h>
#include <Functions/GatherUtils/Sources.h>
#include <Functions/IFunction.h>
#include <IO/WriteHelpers.h>
@ -20,50 +21,50 @@ using namespace GatherUtils;
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ZERO_ARRAY_OR_TUPLE_INDEX;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ZERO_ARRAY_OR_TUPLE_INDEX;
}
namespace
{
/// If 'is_utf8' - measure offset and length in code points instead of bytes.
/// UTF8 variant is not available for FixedString arguments.
template <bool is_utf8>
class FunctionSubstring : public IFunction
{
public:
static constexpr auto name = is_utf8 ? "substringUTF8" : "substring";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionSubstring>();
}
String getName() const override
{
return name;
}
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSubstring>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
size_t number_of_arguments = arguments.size();
const size_t number_of_arguments = arguments.size();
if (number_of_arguments < 2 || number_of_arguments > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Number of arguments for function {} doesn't match: "
"passed {}, should be 2 or 3", getName(), number_of_arguments);
if ((is_utf8 && !isString(arguments[0])) || !isStringOrFixedString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}",
arguments[0]->getName(), getName());
if constexpr (is_utf8)
{
/// UTF8 variant is not available for FixedString and Enum arguments.
if (!isString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of first argument of function {}",
arguments[0]->getName(), getName());
}
else
{
if (!isStringOrFixedString(arguments[0]) && !isEnum(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of first argument of function {}",
arguments[0]->getName(), getName());
}
if (!isNativeNumber(arguments[1]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of second argument of function {}",
@ -77,44 +78,40 @@ public:
}
template <typename Source>
ColumnPtr executeForSource(const ColumnPtr & column_start, const ColumnPtr & column_length,
const ColumnConst * column_start_const, const ColumnConst * column_length_const,
Int64 start_value, Int64 length_value, Source && source,
size_t input_rows_count) const
ColumnPtr executeForSource(const ColumnPtr & column_offset, const ColumnPtr & column_length,
bool column_offset_const, bool column_length_const,
Int64 offset, Int64 length,
Source && source, size_t input_rows_count) const
{
auto col_res = ColumnString::create();
if (!column_length)
{
if (column_start_const)
if (column_offset_const)
{
if (start_value > 0)
sliceFromLeftConstantOffsetUnbounded(
source, StringSink(*col_res, input_rows_count), static_cast<size_t>(start_value - 1));
else if (start_value < 0)
sliceFromRightConstantOffsetUnbounded(
source, StringSink(*col_res, input_rows_count), -static_cast<size_t>(start_value));
if (offset > 0)
sliceFromLeftConstantOffsetUnbounded(source, StringSink(*col_res, input_rows_count), static_cast<size_t>(offset - 1));
else if (offset < 0)
sliceFromRightConstantOffsetUnbounded(source, StringSink(*col_res, input_rows_count), -static_cast<size_t>(offset));
else
throw Exception(ErrorCodes::ZERO_ARRAY_OR_TUPLE_INDEX, "Indices in strings are 1-based");
}
else
sliceDynamicOffsetUnbounded(source, StringSink(*col_res, input_rows_count), *column_start);
sliceDynamicOffsetUnbounded(source, StringSink(*col_res, input_rows_count), *column_offset);
}
else
{
if (column_start_const && column_length_const)
if (column_offset_const && column_length_const)
{
if (start_value > 0)
sliceFromLeftConstantOffsetBounded(
source, StringSink(*col_res, input_rows_count), static_cast<size_t>(start_value - 1), length_value);
else if (start_value < 0)
sliceFromRightConstantOffsetBounded(
source, StringSink(*col_res, input_rows_count), -static_cast<size_t>(start_value), length_value);
if (offset > 0)
sliceFromLeftConstantOffsetBounded(source, StringSink(*col_res, input_rows_count), static_cast<size_t>(offset - 1), length);
else if (offset < 0)
sliceFromRightConstantOffsetBounded(source, StringSink(*col_res, input_rows_count), -static_cast<size_t>(offset), length);
else
throw Exception(ErrorCodes::ZERO_ARRAY_OR_TUPLE_INDEX, "Indices in strings are 1-based");
}
else
sliceDynamicOffsetBounded(source, StringSink(*col_res, input_rows_count), *column_start, *column_length);
sliceDynamicOffsetBounded(source, StringSink(*col_res, input_rows_count), *column_offset, *column_length);
}
return col_res;
@ -122,58 +119,60 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
size_t number_of_arguments = arguments.size();
const size_t number_of_arguments = arguments.size();
ColumnPtr column_string = arguments[0].column;
ColumnPtr column_start = arguments[1].column;
ColumnPtr column_offset = arguments[1].column;
ColumnPtr column_length;
if (number_of_arguments == 3)
column_length = arguments[2].column;
const ColumnConst * column_start_const = checkAndGetColumn<ColumnConst>(column_start.get());
const ColumnConst * column_offset_const = checkAndGetColumn<ColumnConst>(column_offset.get());
const ColumnConst * column_length_const = nullptr;
if (number_of_arguments == 3)
column_length_const = checkAndGetColumn<ColumnConst>(column_length.get());
Int64 start_value = 0;
Int64 length_value = 0;
Int64 offset = 0;
Int64 length = 0;
if (column_start_const)
start_value = column_start_const->getInt(0);
if (column_offset_const)
offset = column_offset_const->getInt(0);
if (column_length_const)
length_value = column_length_const->getInt(0);
length = column_length_const->getInt(0);
if constexpr (is_utf8)
{
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column_string.get()))
return executeForSource(column_start, column_length, column_start_const, column_length_const, start_value,
length_value, UTF8StringSource(*col), input_rows_count);
else if (const ColumnConst * col_const = checkAndGetColumnConst<ColumnString>(column_string.get()))
return executeForSource(column_start, column_length, column_start_const, column_length_const, start_value,
length_value, ConstSource<UTF8StringSource>(*col_const), input_rows_count);
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments[0].column->getName(), getName());
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, UTF8StringSource(*col), input_rows_count);
if (const ColumnConst * col_const = checkAndGetColumnConst<ColumnString>(column_string.get()))
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, ConstSource<UTF8StringSource>(*col_const), input_rows_count);
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}", arguments[0].column->getName(), getName());
}
else
{
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column_string.get()))
return executeForSource(column_start, column_length, column_start_const, column_length_const, start_value,
length_value, StringSource(*col), input_rows_count);
else if (const ColumnFixedString * col_fixed = checkAndGetColumn<ColumnFixedString>(column_string.get()))
return executeForSource(column_start, column_length, column_start_const, column_length_const, start_value,
length_value, FixedStringSource(*col_fixed), input_rows_count);
else if (const ColumnConst * col_const = checkAndGetColumnConst<ColumnString>(column_string.get()))
return executeForSource(column_start, column_length, column_start_const, column_length_const, start_value,
length_value, ConstSource<StringSource>(*col_const), input_rows_count);
else if (const ColumnConst * col_const_fixed = checkAndGetColumnConst<ColumnFixedString>(column_string.get()))
return executeForSource(column_start, column_length, column_start_const, column_length_const, start_value,
length_value, ConstSource<FixedStringSource>(*col_const_fixed), input_rows_count);
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments[0].column->getName(), getName());
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, StringSource(*col), input_rows_count);
if (const ColumnFixedString * col_fixed = checkAndGetColumn<ColumnFixedString>(column_string.get()))
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, FixedStringSource(*col_fixed), input_rows_count);
if (const ColumnConst * col_const = checkAndGetColumnConst<ColumnString>(column_string.get()))
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, ConstSource<StringSource>(*col_const), input_rows_count);
if (const ColumnConst * col_const_fixed = checkAndGetColumnConst<ColumnFixedString>(column_string.get()))
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, ConstSource<FixedStringSource>(*col_const_fixed), input_rows_count);
if (isEnum(arguments[0].type))
{
if (const typename DataTypeEnum8::ColumnType * col_enum8 = checkAndGetColumn<typename DataTypeEnum8::ColumnType>(column_string.get()))
{
const auto * type_enum8 = assert_cast<const DataTypeEnum8 *>(arguments[0].type.get());
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, EnumSource<DataTypeEnum8>(*col_enum8, *type_enum8), input_rows_count);
}
if (const typename DataTypeEnum16::ColumnType * col_enum16 = checkAndGetColumn<typename DataTypeEnum16::ColumnType>(column_string.get()))
{
const auto * type_enum16 = assert_cast<const DataTypeEnum16 *>(arguments[0].type.get());
return executeForSource(column_offset, column_length, column_offset_const, column_length_const, offset, length, EnumSource<DataTypeEnum16>(*col_enum16, *type_enum16), input_rows_count);
}
}
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}", arguments[0].column->getName(), getName());
}
}
};
@ -183,8 +182,8 @@ public:
REGISTER_FUNCTION(Substring)
{
factory.registerFunction<FunctionSubstring<false>>({}, FunctionFactory::CaseInsensitive);
factory.registerAlias("substr", "substring", FunctionFactory::CaseInsensitive);
factory.registerAlias("mid", "substring", FunctionFactory::CaseInsensitive); /// from MySQL dialect
factory.registerAlias("substr", "substring", FunctionFactory::CaseInsensitive); // MySQL alias
factory.registerAlias("mid", "substring", FunctionFactory::CaseInsensitive); /// MySQL alias
factory.registerFunction<FunctionSubstring<true>>({}, FunctionFactory::CaseSensitive);
}
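// A rough standalone sketch of the 1-based offset convention enforced above: positive offsets
// count from the left, negative ones from the right, and 0 is rejected because string indices
// are 1-based. Clamping details are simplified and the function name is illustrative.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

std::string substringLike(const std::string & s, long long offset, std::size_t length)
{
    if (offset > 0)
        return s.substr(std::min<std::size_t>(offset - 1, s.size()), length);
    if (offset < 0)
        return s.substr(s.size() - std::min<std::size_t>(-offset, s.size()), length);
    throw std::invalid_argument("Indices in strings are 1-based");
}

int main()
{
    assert(substringLike("clickhouse", 1, 5) == "click");
    assert(substringLike("clickhouse", -5, 5) == "house");
}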

View File

@ -515,7 +515,9 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
// We do not know in advance how many bytes we are going to consume, so to avoid blocking we estimate the cost from below
constexpr ResourceCost estimated_cost = 1;
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
rlock.unlock();
if (outcome.IsSuccess())

View File

@ -13,9 +13,9 @@
namespace DB::S3
{
std::shared_ptr<Aws::Http::HttpClient>
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const
{
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(clientConfiguration));
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(client_configuration));
}
std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest(

View File

@ -15,7 +15,7 @@ class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory
public:
~PocoHTTPClientFactory() override = default;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpClient>
CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override;
CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>

View File

@ -655,6 +655,7 @@ namespace
void performCopy()
{
LOG_TEST(log, "Copy object {} to {} using native copy", src_key, dest_key);
if (!supports_multipart_copy || size <= upload_settings.max_single_operation_copy_size)
performSingleOperationCopy();
else

View File

@ -16,6 +16,7 @@
#include <Common/Throttler_fwd.h>
#include <IO/S3/URI.h>
#include <IO/S3/Credentials.h>
#include <aws/core/Aws.h>
#include <aws/s3/S3Errors.h>

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
extern const int CLUSTER_DOESNT_EXIST;
}
namespace ClusterProxy
@ -322,11 +323,44 @@ void executeQueryWithParallelReplicas(
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits,
const ClusterPtr & not_optimized_cluster)
std::shared_ptr<const StorageLimitsList> storage_limits)
{
const auto & settings = context->getSettingsRef();
/// check cluster for parallel replicas
if (settings.cluster_for_parallel_replicas.value.empty())
{
throw Exception(
ErrorCodes::CLUSTER_DOESNT_EXIST,
"Reading in parallel from replicas is enabled but cluster to execute query is not provided. Please set "
"'cluster_for_parallel_replicas' setting");
}
auto not_optimized_cluster = context->getCluster(settings.cluster_for_parallel_replicas);
auto new_context = Context::createCopy(context);
/// check hedged connections setting
if (settings.use_hedged_requests.value)
{
if (settings.use_hedged_requests.changed)
{
LOG_WARNING(
&Poco::Logger::get("executeQueryWithParallelReplicas"),
"Setting 'use_hedged_requests' explicitly with enabled 'allow_experimental_parallel_reading_from_replicas' has no effect. "
"Hedged connections are not used for parallel reading from replicas");
}
else
{
LOG_INFO(
&Poco::Logger::get("executeQueryWithParallelReplicas"),
"Disabling 'use_hedged_requests' in favor of 'allow_experimental_parallel_reading_from_replicas'. Hedged connections are "
"not used for parallel reading from replicas");
}
/// disable hedged connections -> parallel replicas uses own logic to choose replicas
new_context->setSetting("use_hedged_requests", Field{false});
}
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
UInt64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified

View File

@ -71,8 +71,7 @@ void executeQueryWithParallelReplicas(
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits,
const ClusterPtr & not_optimized_cluster);
std::shared_ptr<const StorageLimitsList> storage_limits);
}
}

View File

@ -5020,7 +5020,7 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
if (!settings_ref.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings_ref.allow_experimental_parallel_reading_from_replicas > 0 && !settings_ref.use_hedged_requests)
if (settings_ref.allow_experimental_parallel_reading_from_replicas > 0)
return READ_TASKS;
return SAMPLE_KEY;

View File

@ -45,6 +45,7 @@
#include <Access/Common/AllowedClientHosts.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseReplicated.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
@ -92,6 +93,7 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int ABORTED;
extern const int SUPPORT_IS_DISABLED;
}
@ -442,6 +444,10 @@ BlockIO InterpreterSystemQuery::execute()
result.pipeline = QueryPipeline(std::move(source));
break;
}
case Type::DROP_DISK_METADATA_CACHE:
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
}
case Type::DROP_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
@ -611,6 +617,10 @@ BlockIO InterpreterSystemQuery::execute()
case Type::SYNC_DATABASE_REPLICA:
syncReplicatedDatabase(query);
break;
case Type::REPLICA_UNREADY:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::REPLICA_READY:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::SYNC_TRANSACTION_LOG:
syncTransactionLog();
break;
@ -1119,6 +1129,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;
}
case Type::DROP_DISK_METADATA_CACHE:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::RELOAD_DICTIONARY:
case Type::RELOAD_DICTIONARIES:
case Type::RELOAD_EMBEDDED_DICTIONARIES:
@ -1245,6 +1257,9 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.getDatabase(), query.getTable());
break;
}
case Type::REPLICA_READY:
case Type::REPLICA_UNREADY:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::RESTART_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA, query.getDatabase(), query.getTable());

View File

@ -57,6 +57,7 @@ private:
void restartReplica(const StorageID & replica, ContextMutablePtr system_context);
void restartReplicas(ContextMutablePtr system_context);
void syncReplica(ASTSystemQuery & query);
void setReplicaReadiness(bool ready);
void waitLoadingParts();
void syncReplicatedDatabase(ASTSystemQuery & query);

View File

@ -244,6 +244,31 @@ private:
}
};
#ifdef PRINT_ASSEMBLY
class AssemblyPrinter
{
public:
explicit AssemblyPrinter(llvm::TargetMachine &target_machine_)
: target_machine(target_machine_)
{
}
void print(llvm::Module & module)
{
llvm::legacy::PassManager pass_manager;
target_machine.Options.MCOptions.AsmVerbose = true;
if (target_machine.addPassesToEmitFile(pass_manager, llvm::errs(), nullptr, llvm::CodeGenFileType::CGFT_AssemblyFile))
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "MachineCode cannot be printed");
pass_manager.run(module);
}
private:
llvm::TargetMachine & target_machine;
};
#endif
/** MemoryManager for module.
* Keep total allocated size during RuntimeDyld linker execution.
*/
@ -375,6 +400,11 @@ CHJIT::CompiledModule CHJIT::compileModule(std::unique_ptr<llvm::Module> module)
{
runOptimizationPassesOnModule(*module);
#ifdef PRINT_ASSEMBLY
AssemblyPrinter assembly_printer(*machine);
assembly_printer.print(*module);
#endif
auto buffer = compiler->compile(*module);
llvm::Expected<std::unique_ptr<llvm::object::ObjectFile>> object = llvm::object::ObjectFile::createObjectFile(*buffer);

View File

@ -27,29 +27,14 @@ bool replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * sel
return false;
auto which = ast_literal->value.getType();
if (which != Field::Types::UInt64 && which != Field::Types::Int64)
if (which != Field::Types::UInt64)
return false;
UInt64 pos;
if (which == Field::Types::UInt64)
{
pos = ast_literal->value.get<UInt64>();
}
else if (which == Field::Types::Int64)
{
auto value = ast_literal->value.get<Int64>();
pos = value > 0 ? value : columns.size() + value + 1;
}
else
{
return false;
}
auto pos = ast_literal->value.get<UInt64>();
if (!pos || pos > columns.size())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Positional argument out of bounds: {} (expected in range [1, {}]", pos, columns.size());
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Positional argument out of bounds: {} (expected in range [1, {}]",
pos, columns.size());
const auto & column = columns[--pos];
if (typeid_cast<const ASTIdentifier *>(column.get()) || typeid_cast<const ASTLiteral *>(column.get()))

View File

@ -179,7 +179,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::RELOAD_DICTIONARY
|| type == Type::RELOAD_MODEL
|| type == Type::RELOAD_FUNCTION
|| type == Type::RESTART_DISK)
|| type == Type::RESTART_DISK
|| type == Type::DROP_DISK_METADATA_CACHE)
{
if (table)
{

View File

@ -32,6 +32,7 @@ public:
DROP_COMPILED_EXPRESSION_CACHE,
#endif
DROP_FILESYSTEM_CACHE,
DROP_DISK_METADATA_CACHE,
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
#if USE_AWS_S3
@ -49,6 +50,8 @@ public:
SYNC_DATABASE_REPLICA,
SYNC_TRANSACTION_LOG,
SYNC_FILE_CACHE,
REPLICA_READY,
REPLICA_UNREADY,
RELOAD_DICTIONARY,
RELOAD_DICTIONARIES,
RELOAD_MODEL,

View File

@ -12,6 +12,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
[[nodiscard]] static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery> & res, IParser::Pos & pos,
Expected & expected, bool require_table, bool allow_string_literal)
{
@ -427,6 +432,10 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
return false;
break;
}
case Type::DROP_DISK_METADATA_CACHE:
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
}
case Type::DROP_SCHEMA_CACHE:
{
if (ParserKeyword{"FOR"}.ignore(pos, expected))

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SipHash.h>
#include <Common/OptimizedRegularExpression.h>
#include <AggregateFunctions/IAggregateFunction.h>
@ -122,6 +123,17 @@ struct Pattern
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
void updateHash(SipHash & hash) const
{
hash.update(rule_type);
hash.update(regexp_str);
hash.update(function->getName());
for (const auto & r : retentions)
{
hash.update(r.age);
hash.update(r.precision);
}
}
};
bool operator==(const Pattern & a, const Pattern & b);
@ -142,6 +154,21 @@ struct Params
Graphite::Patterns patterns;
Graphite::Patterns patterns_plain;
Graphite::Patterns patterns_tagged;
void updateHash(SipHash & hash) const
{
hash.update(path_column_name);
hash.update(time_column_name);
hash.update(value_column_name);
hash.update(value_column_name);
hash.update(version_column_name);
hash.update(patterns_typed);
for (const auto & p : patterns)
p.updateHash(hash);
for (const auto & p : patterns_plain)
p.updateHash(hash);
for (const auto & p : patterns_tagged)
p.updateHash(hash);
}
};
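// A standalone sketch of the order-dependent field hashing used above, with a simple
// std::hash-based combiner standing in for SipHash; struct and function names are illustrative.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

struct Retention { uint64_t age = 0; uint64_t precision = 0; };

void combine(std::size_t & seed, std::size_t value)
{
    seed ^= value + 0x9e3779b9U + (seed << 6) + (seed >> 2);
}

std::size_t hashParams(const std::string & path_column, const std::vector<Retention> & retentions)
{
    std::size_t seed = 0;
    combine(seed, std::hash<std::string>{}(path_column));
    for (const auto & r : retentions)                       /// field order is part of the digest
    {
        combine(seed, std::hash<uint64_t>{}(r.age));
        combine(seed, std::hash<uint64_t>{}(r.precision));
    }
    return seed;
}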
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;

View File

@ -741,8 +741,11 @@ void AggregatingTransform::initGenerate()
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
Pipes pipes;
for (auto & variant : prepared_data)
{
/// Converts hash tables to blocks with data (finalized or not).
pipes.emplace_back(std::make_shared<ConvertingAggregatedToChunksSource>(params, variant));
}
Pipe pipe = Pipe::unitePipes(std::move(pipes));
if (!pipe.empty())
{
@ -796,21 +799,23 @@ void AggregatingTransform::initGenerate()
}
}
const auto & tmp_data = params->aggregator.getTemporaryData();
size_t num_streams = 0;
size_t compressed_size = 0;
size_t uncompressed_size = 0;
Pipe pipe;
Pipes pipes;
/// Merge external data from all aggregators used in query.
for (const auto & aggregator : *params->aggregator_list_ptr)
{
Pipes pipes;
const auto & tmp_data = aggregator.getTemporaryData();
for (auto * tmp_stream : tmp_data.getStreams())
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
pipe = Pipe::unitePipes(std::move(pipes));
num_streams += tmp_data.getStreams().size();
compressed_size += tmp_data.getStat().compressed_size;
uncompressed_size += tmp_data.getStat().uncompressed_size;
}
size_t num_streams = tmp_data.getStreams().size();
size_t compressed_size = tmp_data.getStat().compressed_size;
size_t uncompressed_size = tmp_data.getStat().uncompressed_size;
LOG_DEBUG(
log,
"Will merge {} temporary files of size {} compressed, {} uncompressed.",
@ -818,6 +823,7 @@ void AggregatingTransform::initGenerate()
ReadableSize(compressed_size),
ReadableSize(uncompressed_size));
auto pipe = Pipe::unitePipes(std::move(pipes));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);
processors = Pipe::detachProcessors(std::move(pipe));

View File

@ -1061,8 +1061,13 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
response.setChunkedTransferEncoding(true);
HTMLForm params(default_settings, request);
with_stacktrace = params.getParsed<bool>("stacktrace", false);
close_session = params.getParsed<bool>("close_session", false);
if (params.getParsed<bool>("stacktrace", false) && server.config().getBool("enable_http_stacktrace", true))
with_stacktrace = true;
if (params.getParsed<bool>("close_session", false) && server.config().getBool("enable_http_close_session", true))
close_session = true;
if (close_session)
session_id = params.get("session_id");

View File

@ -0,0 +1,94 @@
#include <Server/KeeperReadinessHandler.h>
#if USE_NURAFT
#include <memory>
#include <IO/HTTPCommon.h>
#include <Coordination/KeeperDispatcher.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
namespace DB
{
void KeeperReadinessHandler::handleRequest(HTTPServerRequest & /*request*/, HTTPServerResponse & response)
{
try
{
auto is_leader = keeper_dispatcher->isLeader();
auto is_follower = keeper_dispatcher->isFollower() && keeper_dispatcher->hasLeader();
auto is_observer = keeper_dispatcher->isObserver() && keeper_dispatcher->hasLeader();
auto data = keeper_dispatcher->getKeeper4LWInfo();
auto status = is_leader || is_follower || is_observer;
Poco::JSON::Object json, details;
details.set("role", data.getRole());
details.set("hasLeader", keeper_dispatcher->hasLeader());
json.set("details", details);
json.set("status", status ? "ok" : "fail");
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
if (!status)
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
*response.send() << oss.str();
}
catch (...)
{
tryLogCurrentException("KeeperReadinessHandler");
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << std::endl;
}
}
catch (...)
{
LOG_ERROR((&Poco::Logger::get("KeeperReadinessHandler")), "Cannot send exception to client");
}
}
}
HTTPRequestHandlerFactoryPtr createKeeperHTTPControlMainHandlerFactory(
const Poco::Util::AbstractConfiguration & config,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const std::string & name)
{
auto factory = std::make_shared<HTTPRequestHandlerFactoryMain>(name);
using Factory = HandlingRuleHTTPHandlerFactory<KeeperReadinessHandler>;
Factory::Creator creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperReadinessHandler>
{
return std::make_unique<KeeperReadinessHandler>(keeper_dispatcher);
};
auto readiness_handler = std::make_shared<Factory>(std::move(creator));
readiness_handler->attachStrictPath(config.getString("keeper_server.http_control.readiness.endpoint", "/ready"));
readiness_handler->allowGetAndHeadRequest();
factory->addHandler(readiness_handler);
return factory;
}
}
#endif

View File

@ -0,0 +1,36 @@
#pragma once
#include <config.h>
#if USE_NURAFT
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Coordination/KeeperDispatcher.h>
namespace DB
{
class KeeperReadinessHandler : public HTTPRequestHandler, WithContext
{
private:
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
public:
explicit KeeperReadinessHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_)
: keeper_dispatcher(keeper_dispatcher_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
};
HTTPRequestHandlerFactoryPtr
createKeeperHTTPControlMainHandlerFactory(
const Poco::Util::AbstractConfiguration & config,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const std::string & name);
}
#endif

View File

@ -28,11 +28,17 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
{
HTMLForm params(getContext()->getSettingsRef(), request);
/// Even if lag is small, output detailed information about the lag.
bool verbose = params.get("verbose", "") == "1";
const auto & config = getContext()->getConfigRef();
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
/// Even if lag is small, output detailed information about the lag.
bool verbose = false;
bool enable_verbose = config.getBool("enable_verbose_replicas_status", true);
if (params.get("verbose", "") == "1" && enable_verbose)
verbose = true;
bool ok = true;
WriteBufferFromOwnString message;
@ -78,13 +84,13 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
}
const auto & config = getContext()->getConfigRef();
setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT));
if (!ok)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
verbose = true;
if (enable_verbose)
verbose = true;
}
if (verbose)

View File

@ -144,6 +144,9 @@ bool ServerType::shouldStop(const std::string & port_name) const
port_custom_name = port_name.substr(protocols_size, port_name.size() - protocols_size - ports_size + 1);
}
else if (port_name == "cloud.port")
port_type = Type::CLOUD;
else
return false;

View File

@ -26,6 +26,7 @@ public:
QUERIES_ALL,
QUERIES_DEFAULT,
QUERIES_CUSTOM,
CLOUD,
END
};

View File

@ -661,10 +661,19 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
if (kafka_consumer_weak_ptr_ptr)
{
/// NOTE: statistics should be consumed, otherwise it creates too many
/// entries in the queue, which leads to a memory leak and slow shutdown.
///
/// This is the case when you have a Kafka table but no SELECT from it or
/// no materialized view attached.
///
/// So for now it is disabled by default, until properly fixed.
#if 0
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
}
#endif
if (kafka_config.get("statistics.interval.ms") != "0")
{

View File

@ -161,26 +161,44 @@ void DefaultCoordinator::updateReadingState(InitialAllRangesAnnouncement announc
PartRefs parts_diff;
/// To get rid of duplicates
for (auto && part: announcement.description)
for (auto && part_ranges: announcement.description)
{
auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
[&part] (const Part & other) { return other.description.info.getPartNameV1() == part.info.getPartNameV1(); });
Part part{.description = std::move(part_ranges), .replicas = {announcement.replica_num}};
const MergeTreePartInfo & announced_part = part.description.info;
/// We have the same part - add the info about presence on current replica to it
if (the_same_it != all_parts_to_read.end())
auto it = std::lower_bound(cbegin(all_parts_to_read), cend(all_parts_to_read), part);
if (it != all_parts_to_read.cend())
{
the_same_it->replicas.insert(announcement.replica_num);
continue;
const MergeTreePartInfo & found_part = it->description.info;
if (found_part == announced_part)
{
/// We have the same part - add the info about presence on current replica
it->replicas.insert(announcement.replica_num);
continue;
}
else
{
/// check whether it is a covering or covered part
/// need to compare with the 2 nearest parts in the set - the ones lesser and greater than the part from the announcement
bool is_disjoint = found_part.isDisjoint(announced_part);
if (it != all_parts_to_read.cbegin() && is_disjoint)
{
const MergeTreePartInfo & lesser_part = (--it)->description.info;
is_disjoint &= lesser_part.isDisjoint(announced_part);
}
if (!is_disjoint)
continue;
}
}
else if (!all_parts_to_read.empty())
{
/// the announced part is the greatest - check if it's disjoint with the lesser part
const MergeTreePartInfo & lesser_part = all_parts_to_read.crbegin()->description.info;
if (!lesser_part.isDisjoint(announced_part))
continue;
}
auto covering_or_the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
[&part] (const Part & other) { return !other.description.info.isDisjoint(part.info); });
/// It is covering part or we have covering - skip it
if (covering_or_the_same_it != all_parts_to_read.end())
continue;
auto [insert_it, _] = all_parts_to_read.emplace(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
auto [insert_it, _] = all_parts_to_read.emplace(std::move(part));
parts_diff.push_back(insert_it);
}
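// A standalone sketch of the lower_bound + neighbour check used above, on a sorted std::set of
// half-open [begin, end) intervals: a candidate is inserted only if it is disjoint from both the
// first not-lesser element and its predecessor. Names and the interval type are illustrative.
#include <cassert>
#include <iterator>
#include <set>

struct Interval
{
    int begin = 0;
    int end = 0;
    bool operator<(const Interval & rhs) const { return begin < rhs.begin; }
    bool isDisjoint(const Interval & rhs) const { return end <= rhs.begin || rhs.end <= begin; }
};

bool tryInsert(std::set<Interval> & intervals, const Interval & candidate)
{
    auto it = intervals.lower_bound(candidate);
    if (it != intervals.end() && !it->isDisjoint(candidate))
        return false;                                       /// overlaps a not-lesser interval
    if (it != intervals.begin() && !std::prev(it)->isDisjoint(candidate))
        return false;                                       /// overlaps the preceding interval
    intervals.insert(candidate);
    return true;
}

int main()
{
    std::set<Interval> parts;
    assert(tryInsert(parts, {0, 10}));
    assert(!tryInsert(parts, {5, 15}));                     /// overlaps [0, 10)
    assert(tryInsert(parts, {10, 20}));
}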
@ -300,20 +318,20 @@ void DefaultCoordinator::selectPartsAndRanges(const PartRefs & container, size_t
while (!part->description.ranges.empty() && current_mark_size < min_number_of_marks)
{
auto & range = part->description.ranges.front();
const size_t needed = min_number_of_marks - current_mark_size;
if (range.getNumberOfMarks() > min_number_of_marks)
if (range.getNumberOfMarks() > needed)
{
auto new_range = range;
range.begin += min_number_of_marks;
new_range.end = new_range.begin + min_number_of_marks;
auto range_we_take = MarkRange{range.begin, range.begin + needed};
response.description.back().ranges.emplace_back(range_we_take);
current_mark_size += range_we_take.getNumberOfMarks();
response.description.back().ranges.emplace_back(new_range);
current_mark_size += new_range.getNumberOfMarks();
continue;
range.begin += needed;
break;
}
current_mark_size += part->description.ranges.front().getNumberOfMarks();
response.description.back().ranges.emplace_back(part->description.ranges.front());
response.description.back().ranges.emplace_back(range);
current_mark_size += range.getNumberOfMarks();
part->description.ranges.pop_front();
}
}
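// A standalone sketch of the "take only what is still needed" splitting above: consume ranges
// from the front of a deque until the requested number of marks is collected, splitting the
// last range if it is larger than the remainder. Types and names are illustrative.
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct Range { std::size_t begin = 0; std::size_t end = 0; std::size_t size() const { return end - begin; } };

std::vector<Range> takeMarks(std::deque<Range> & ranges, std::size_t min_number_of_marks)
{
    std::vector<Range> taken;
    std::size_t current = 0;
    while (!ranges.empty() && current < min_number_of_marks)
    {
        Range & range = ranges.front();
        const std::size_t needed = min_number_of_marks - current;
        if (range.size() > needed)
        {
            taken.push_back({range.begin, range.begin + needed});   /// split the front range
            current += needed;
            range.begin += needed;
            break;
        }
        taken.push_back(range);
        current += range.size();
        ranges.pop_front();
    }
    return taken;
}

int main()
{
    std::deque<Range> ranges{{0, 4}, {10, 20}};
    auto taken = takeMarks(ranges, 7);                              /// 4 marks, then 3 from the split
    assert(taken.size() == 2 && taken[1].end == 13 && ranges.front().begin == 13);
}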
@ -473,23 +491,21 @@ ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest
{
while (!global_part_it->description.ranges.empty() && current_mark_size < request.min_number_of_marks)
{
auto range = global_part_it->description.ranges.back();
auto & range = global_part_it->description.ranges.back();
const size_t needed = request.min_number_of_marks - current_mark_size;
if (range.getNumberOfMarks() > request.min_number_of_marks)
if (range.getNumberOfMarks() > needed)
{
auto new_range = range;
range.end -= request.min_number_of_marks;
new_range.begin = new_range.end - request.min_number_of_marks;
auto range_we_take = MarkRange{range.end - needed, range.end};
part.ranges.emplace_front(range_we_take);
current_mark_size += range_we_take.getNumberOfMarks();
global_part_it->description.ranges.back() = range;
part.ranges.emplace_front(new_range);
current_mark_size += new_range.getNumberOfMarks();
continue;
range.end -= needed;
break;
}
current_mark_size += global_part_it->description.ranges.back().getNumberOfMarks();
part.ranges.emplace_front(global_part_it->description.ranges.back());
part.ranges.emplace_front(range);
current_mark_size += range.getNumberOfMarks();
global_part_it->description.ranges.pop_back();
}
}
@ -497,23 +513,21 @@ ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest
{
while (!global_part_it->description.ranges.empty() && current_mark_size < request.min_number_of_marks)
{
auto range = global_part_it->description.ranges.front();
auto & range = global_part_it->description.ranges.front();
const size_t needed = request.min_number_of_marks - current_mark_size;
if (range.getNumberOfMarks() > request.min_number_of_marks)
if (range.getNumberOfMarks() > needed)
{
auto new_range = range;
range.begin += request.min_number_of_marks;
new_range.end = new_range.begin + request.min_number_of_marks;
auto range_we_take = MarkRange{range.begin, range.begin + needed};
part.ranges.emplace_back(range_we_take);
current_mark_size += range_we_take.getNumberOfMarks();
global_part_it->description.ranges.front() = range;
part.ranges.emplace_back(new_range);
current_mark_size += new_range.getNumberOfMarks();
continue;
range.begin += needed;
break;
}
current_mark_size += global_part_it->description.ranges.front().getNumberOfMarks();
part.ranges.emplace_back(global_part_it->description.ranges.front());
part.ranges.emplace_back(range);
current_mark_size += range.getNumberOfMarks();
global_part_it->description.ranges.pop_front();
}
}

View File

@ -6,6 +6,9 @@
#include <Parsers/ExpressionListParsers.h>
#include <IO/Operators.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Common/SipHash.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -49,6 +52,17 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
index_granularity = data_settings->index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
is_deleted_column = data.merging_params.is_deleted_column;
columns_to_sum = fmt::format("{}", fmt::join(data.merging_params.columns_to_sum.begin(), data.merging_params.columns_to_sum.end(), ","));
version_column = data.merging_params.version_column;
if (data.merging_params.mode == MergeTreeData::MergingParams::Graphite)
{
SipHash graphite_hash;
data.merging_params.graphite_params.updateHash(graphite_hash);
WriteBufferFromOwnString wb;
writeText(graphite_hash.get128(), wb);
graphite_params_hash = std::move(wb.str());
}
/// This code may look strange, but previously we had only one entity: PRIMARY KEY (or ORDER BY, it doesn't matter)
/// Now we have two different entities: ORDER BY and its optional prefix -- PRIMARY KEY.
@ -90,6 +104,22 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
{
/// Important note: newly added fields must always be appended to the end of the serialized metadata
/// for backward compatibility.
/// In addition, two consecutive fields must not share any prefix, otherwise deserialization may fail.
/// For example, suppose two fields `v1` and `v2` are serialized as:
/// if (!v1.empty()) out << "v1: " << v1 << "\n";
/// if (!v2.empty()) out << "v2: " << v2 << "\n";
/// If `v1` is empty and `v2` is non-empty, then `v1` is absent from the serialized metadata.
/// Later, to deserialize the metadata, `read` checks for each field sequentially with `checkString`.
/// When it starts checking for `v1` and `v2`, the metadata buffer looks like this:
/// v2: <v2 value>
/// ^
/// cursor
/// `checkString("v1: ", in)` is called first and moves the cursor to `2` instead of `v`, so the
/// subsequent call `checkString("v2: ", in)` also fails.
out << "metadata format version: 1\n"
<< "date column: " << date_column << "\n"
<< "sampling expression: " << sampling_expression << "\n"
@ -121,6 +151,19 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
if (!constraints.empty())
out << "constraints: " << constraints << "\n";
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
out << "merge parameters format version: " << merge_params_version << "\n";
if (!version_column.empty())
out << "version column: " << version_column << "\n";
if (!is_deleted_column.empty())
out << "is_deleted column: " << is_deleted_column << "\n";
if (!columns_to_sum.empty())
out << "columns to sum: " << columns_to_sum << "\n";
if (!graphite_params_hash.empty())
out << "graphite hash: " << graphite_params_hash << "\n";
}
}
String ReplicatedMergeTreeTableMetadata::toString() const
@ -170,6 +213,26 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
if (checkString("constraints: ", in))
in >> constraints >> "\n";
if (checkString("merge parameters format version: ", in))
in >> merge_params_version >> "\n";
else
merge_params_version = REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION;
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
if (checkString("version column: ", in))
in >> version_column >> "\n";
if (checkString("is_deleted column: ", in))
in >> is_deleted_column >> "\n";
if (checkString("columns to sum: ", in))
in >> columns_to_sum >> "\n";
if (checkString("graphite hash: ", in))
in >> graphite_params_hash >> "\n";
}
}
ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const String & s)
@ -210,6 +273,25 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sign column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.sign_column, sign_column);
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS && from_zk.merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
if (version_column != from_zk.version_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in version column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.version_column, version_column);
if (is_deleted_column != from_zk.is_deleted_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in is_deleted column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.is_deleted_column, is_deleted_column);
if (columns_to_sum != from_zk.columns_to_sum)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sum columns. "
"Stored in ZooKeeper: {}, local: {}", from_zk.columns_to_sum, columns_to_sum);
if (graphite_params_hash != from_zk.graphite_params_hash)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in graphite params. "
"Stored in ZooKeeper hash: {}, local hash: {}", from_zk.graphite_params_hash, graphite_params_hash);
}
/// NOTE: You can make a less strict check of match expressions so that tables do not break from small changes
/// in formatAST code.
String parsed_zk_primary_key = formattedAST(KeyDescription::parse(from_zk.primary_key, columns, context).expression_list_ast);
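The comment in the write() hunk above warns that two consecutive optional fields must not share a prefix, because `checkString` advances the cursor past the matching characters even when the overall check fails. A minimal Python sketch of that failure mode, using a hypothetical check_string helper rather than the real ReadBuffer API:

    def check_string(prefix, buf, pos):
        # Consumes characters while they match, like checkString over a ReadBuffer.
        for i, ch in enumerate(prefix):
            if pos + i >= len(buf) or buf[pos + i] != ch:
                return False, pos + i          # cursor is left past the shared prefix
        return True, pos + len(prefix)

    buf = "v2: value\n"                        # `v1` was empty, so only `v2` was written
    ok1, pos = check_string("v1: ", buf, 0)    # False, cursor now points at '2'
    ok2, pos = check_string("v2: ", buf, pos)  # also False: the 'v' was already consumed
    print(ok1, ok2)                            # False False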

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <base/types.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
@ -17,11 +18,20 @@ class ReadBuffer;
*/
struct ReplicatedMergeTreeTableMetadata
{
static constexpr int REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION = 1;
static constexpr int REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS = 2;
String date_column;
String sampling_expression;
UInt64 index_granularity;
/// Merging related params
int merging_params_mode;
int merge_params_version = REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS;
String sign_column;
String version_column;
String is_deleted_column;
String columns_to_sum;
String graphite_params_hash;
String primary_key;
MergeTreeDataFormatVersion data_format_version;
String partition_key;

View File

@ -248,10 +248,10 @@ Field generateRandomFixedValue(const StorageFuzzJSON::Configuration & config, pc
return f;
}
String fuzzJSONKey(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, const String & source)
String fuzzString(UInt64 min_length, UInt64 max_length, pcg64 & rnd, const String & source, std::function<char(pcg64 &)> charGen)
{
String result;
result.reserve(config.max_key_length);
result.reserve(max_length);
using FA = FuzzAction;
auto get_action = [&]() -> FuzzAction
@ -261,7 +261,7 @@ String fuzzJSONKey(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, c
};
size_t i = 0;
while (i < source.size() && result.size() < config.max_key_length)
while (i < source.size() && result.size() < max_length)
{
auto action = get_action();
switch (action)
@ -271,12 +271,12 @@ String fuzzJSONKey(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, c
}
break;
case FA::Edit: {
result.push_back(generateRandomKeyCharacter(rnd));
result.push_back(charGen(rnd));
++i;
}
break;
case FA::Add: {
result.push_back(generateRandomKeyCharacter(rnd));
result.push_back(charGen(rnd));
}
break;
default:
@ -284,12 +284,24 @@ String fuzzJSONKey(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, c
}
}
while (result.size() < config.min_key_length)
result.push_back(generateRandomKeyCharacter(rnd));
while (result.size() < min_length)
result.push_back(charGen(rnd));
return result;
}
String fuzzJSONKey(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, const String & key)
{
return fuzzString(config.min_key_length, config.max_key_length, rnd, key, generateRandomKeyCharacter);
}
// Randomly modify structural characters (e.g. '{', '}', '[', ']', ':', '"') to generate output that may no longer be parseable as JSON.
String fuzzJSONStructure(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, const String & s)
{
return config.should_malform_output ? fuzzString(/*min_length*/ 0, /*max_length*/ s.size(), rnd, s, generateRandomStringValueCharacter)
: s;
}
std::shared_ptr<JSONNode>
generateRandomJSONNode(const StorageFuzzJSON::Configuration & config, pcg64 & rnd, bool with_key, JSONValue::Type type)
{
@ -397,7 +409,7 @@ void fuzzJSONObject(
if (next_node->key)
{
writeDoubleQuoted(*next_node->key, out);
out << ":";
out << fuzzJSONStructure(config, rnd, ":");
}
auto & val = next_node->value;
@ -405,7 +417,11 @@ void fuzzJSONObject(
if (val.fixed)
{
if (val.fixed->getType() == Field::Types::Which::String)
writeDoubleQuoted(val.fixed->get<String>(), out);
{
out << fuzzJSONStructure(config, rnd, "\"");
writeText(val.fixed->get<String>(), out);
out << fuzzJSONStructure(config, rnd, "\"");
}
else
writeFieldText(*val.fixed, out);
}
@ -414,9 +430,9 @@ void fuzzJSONObject(
if (!val.array && !val.object)
return;
const auto & [op, cl, node_list] = val.array ? std::make_tuple('[', ']', *val.array) : std::make_tuple('{', '}', *val.object);
const auto & [op, cl, node_list] = val.array ? std::make_tuple("[", "]", *val.array) : std::make_tuple("{", "}", *val.object);
out << op;
out << fuzzJSONStructure(config, rnd, op);
bool first = true;
for (const auto & ptr : node_list)
@ -426,7 +442,7 @@ void fuzzJSONObject(
WriteBufferFromOwnString child_out;
if (!first)
child_out << ", ";
child_out << fuzzJSONStructure(config, rnd, ", ");
first = false;
fuzzJSONObject(ptr, child_out, config, rnd, depth + 1, node_count);
@ -435,7 +451,7 @@ void fuzzJSONObject(
break;
out << child_out.str();
}
out << cl;
out << fuzzJSONStructure(config, rnd, cl);
}
}
@ -554,10 +570,11 @@ Pipe StorageFuzzJSON::read(
return Pipe::unitePipes(std::move(pipes));
}
static constexpr std::array<std::string_view, 13> optional_configuration_keys
static constexpr std::array<std::string_view, 14> optional_configuration_keys
= {"json_str",
"random_seed",
"reuse_output",
"malform_output",
"probability",
"max_output_length",
"max_nesting_level",
@ -583,6 +600,9 @@ void StorageFuzzJSON::processNamedCollectionResult(Configuration & configuration
if (collection.has("reuse_output"))
configuration.should_reuse_output = static_cast<bool>(collection.get<UInt64>("reuse_output"));
if (collection.has("malform_output"))
configuration.should_malform_output = static_cast<bool>(collection.get<UInt64>("malform_output"));
if (collection.has("probability"))
{
configuration.probability = collection.get<Float64>("probability");
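The new malform_output option routes structural tokens through fuzzString so that delimiters can be perturbed. A minimal Python sketch of the general idea — randomly replacing, dropping, or keeping each character of a structural token — offered as a stand-alone illustration, not the actual fuzzString action set:

    import random

    def fuzz_structure(token, rnd, probability=0.25):
        out = []
        for ch in token:
            r = rnd.random()
            if r < probability:
                out.append(chr(rnd.randrange(33, 127)))   # replace with a random printable char
            elif r < 2 * probability:
                continue                                  # drop the character
            else:
                out.append(ch)                            # keep it
        return "".join(out)

    rnd = random.Random()
    pieces = ['{', '"key"', ':', '"value"', '}']
    print("".join(fuzz_structure(p, rnd) if p in '{}[]:,' else p for p in pieces))
    # Delimiters may be mangled, so the line often no longer parses as JSON.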

View File

@ -27,6 +27,7 @@ public:
String json_str = "{}";
UInt64 random_seed = randomSeed();
bool should_reuse_output = false;
bool should_malform_output = false;
Float64 probability = 0.25;
UInt64 max_output_length = 1024;

View File

@ -211,17 +211,12 @@ void StorageMergeTree::read(
{
if (local_context->canUseParallelReplicasOnInitiator() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
{
auto table_id = getStorageID();
const auto table_id = getStorageID();
const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
auto cluster = local_context->getCluster(cluster_for_parallel_replicas);
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
header = InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze());
else
@ -240,17 +235,22 @@ void StorageMergeTree::read(
select_stream_factory,
modified_query_ast,
local_context,
query_info.storage_limits,
cluster);
query_info.storage_limits);
}
else
{
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;
if (auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
processed_stage, nullptr, enable_parallel_reading))
column_names,
storage_snapshot,
query_info,
local_context,
max_block_size,
num_streams,
processed_stage,
nullptr,
enable_parallel_reading))
query_plan = std::move(*plan);
}
}
@ -829,8 +829,13 @@ void StorageMergeTree::loadDeduplicationLog()
auto disk = getDisks()[0];
std::string path = fs::path(relative_data_path) / "deduplication_logs";
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version, disk);
deduplication_log->load();
/// Load the deduplication log only if it already exists, or if the disk is writable so a new one can be created.
if (disk->exists(path) || !disk->isReadOnly())
{
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version, disk);
deduplication_log->load();
}
}
void StorageMergeTree::loadMutations()

View File

@ -5338,7 +5338,7 @@ void StorageReplicatedMergeTree::read(
return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
if (local_context->canUseParallelReplicasOnInitiator())
return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage);
readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
@ -5367,18 +5367,11 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
QueryProcessingStage::Enum processed_stage)
{
auto table_id = getStorageID();
auto scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
auto parallel_replicas_cluster = local_context->getCluster(cluster_for_parallel_replicas);
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);
@ -5389,6 +5382,7 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
}
else
{
const auto table_id = getStorageID();
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
@ -5407,8 +5401,7 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
select_stream_factory,
modified_query_ast,
local_context,
query_info.storage_limits,
parallel_replicas_cluster);
query_info.storage_limits);
}
void StorageReplicatedMergeTree::readLocalImpl(

View File

@ -582,9 +582,7 @@ private:
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);
QueryProcessingStage::Enum processed_stage);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

View File

@ -9,6 +9,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/logger_useful.h>
#include <Parsers/formatAST.h>
namespace DB

View File

@ -28,6 +28,7 @@ NamesAndTypesList StorageSystemMutations::getNamesAndTypes()
{ "parts_to_do_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "parts_to_do", std::make_shared<DataTypeInt64>() },
{ "is_done", std::make_shared<DataTypeUInt8>() },
{ "is_killed", std::make_shared<DataTypeUInt8>() },
{ "latest_failed_part", std::make_shared<DataTypeString>() },
{ "latest_fail_time", std::make_shared<DataTypeDateTime>() },
{ "latest_fail_reason", std::make_shared<DataTypeString>() },
@ -138,6 +139,7 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[col_num++]->insert(parts_to_do_names);
res_columns[col_num++]->insert(parts_to_do_names.size());
res_columns[col_num++]->insert(status.is_done);
res_columns[col_num++]->insert(status.is_killed);
res_columns[col_num++]->insert(status.latest_failed_part);
res_columns[col_num++]->insert(UInt64(status.latest_fail_time));
res_columns[col_num++]->insert(status.latest_fail_reason);

View File

@ -285,6 +285,8 @@ StorageSystemPartsBase::StorageSystemPartsBase(const StorageID & table_id_, Name
auto add_alias = [&](const String & alias_name, const String & column_name)
{
if (!tmp_columns.has(column_name))
return;
ColumnDescription column(alias_name, tmp_columns.get(column_name).type);
column.default_desc.kind = ColumnDefaultKind::Alias;
column.default_desc.expression = std::make_shared<ASTIdentifier>(column_name);

View File

@ -329,6 +329,8 @@ CI_CONFIG = CiConfig(
"SQLancer (debug)": TestConfig("package_debug"),
"Sqllogic test (release)": TestConfig("package_release"),
"SQLTest": TestConfig("package_release"),
"ClickBench (amd64)": TestConfig("package_release"),
"ClickBench (aarch64)": TestConfig("package_aarch64"),
"libFuzzer tests": TestConfig("fuzzers"),
},
)
@ -507,6 +509,11 @@ CHECK_DESCRIPTIONS = [
"successfully startup without any errors, crashes or sanitizer asserts",
lambda x: x.startswith("Upgrade check ("),
),
CheckDescription(
"ClickBench",
"Runs [ClickBench](https://github.com/ClickHouse/ClickBench/) with instant-attach table",
lambda x: x.startswith("ClickBench"),
),
CheckDescription(
"Falback for unknown",
"There's no description for the check yet, please add it to "

229
tests/ci/clickbench.py Normal file
View File

@ -0,0 +1,229 @@
#!/usr/bin/env python3
import argparse
import csv
import logging
import os
import subprocess
import sys
import atexit
from pathlib import Path
from typing import List, Tuple
from github import Github
from build_download_helper import download_all_deb_packages
from clickhouse_helper import (
CiLogsCredentials,
ClickHouseHelper,
prepare_tests_results_for_clickhouse,
)
from commit_status_helper import (
RerunHelper,
get_commit,
override_status,
post_commit_status,
update_mergeable_check,
)
from docker_pull_helper import DockerImage, get_image_with_version
from env_helper import TEMP_PATH, REPORTS_PATH
from get_robot_token import get_best_robot_token
from pr_info import FORCE_TESTS_LABEL, PRInfo
from s3_helper import S3Helper
from stopwatch import Stopwatch
from tee_popen import TeePopen
from upload_result_helper import upload_results
from report import TestResults
def get_image_name() -> str:
return "clickhouse/clickbench"
def get_run_command(
builds_path: Path,
result_path: Path,
server_log_path: Path,
additional_envs: List[str],
ci_logs_args: str,
image: DockerImage,
) -> str:
envs = [f"-e {e}" for e in additional_envs]
env_str = " ".join(envs)
return (
f"docker run --shm-size=16g --volume={builds_path}:/package_folder "
f"{ci_logs_args}"
f"--volume={result_path}:/test_output "
f"--volume={server_log_path}:/var/log/clickhouse-server "
f"--cap-add=SYS_PTRACE {env_str} {image}"
)
def process_results(
result_directory: Path,
server_log_path: Path,
) -> Tuple[str, str, TestResults, List[Path]]:
test_results = [] # type: TestResults
additional_files = [] # type: List[Path]
# Just upload all files from result_directory.
# If the task provides processed results, it is responsible for the content of result_directory.
if result_directory.exists():
additional_files = [p for p in result_directory.iterdir() if p.is_file()]
if server_log_path.exists():
additional_files = additional_files + [
p for p in server_log_path.iterdir() if p.is_file()
]
status = []
status_path = result_directory / "check_status.tsv"
if status_path.exists():
logging.info("Found check_status.tsv")
with open(status_path, "r", encoding="utf-8") as status_file:
status = list(csv.reader(status_file, delimiter="\t"))
if len(status) != 1 or len(status[0]) != 2:
logging.info("Files in result folder %s", os.listdir(result_directory))
return "error", "Invalid check_status.tsv", test_results, additional_files
state, description = status[0][0], status[0][1]
try:
results_path = result_directory / "test_results.tsv"
if results_path.exists():
logging.info("Found %s", results_path.name)
else:
logging.info("Files in result folder %s", os.listdir(result_directory))
return "error", "Not found test_results.tsv", test_results, additional_files
except Exception as e:
return (
"error",
f"Cannot parse test_results.tsv ({e})",
test_results,
additional_files,
)
return state, description, test_results, additional_files
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("check_name")
return parser.parse_args()
def main():
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = Path(TEMP_PATH)
temp_path.mkdir(parents=True, exist_ok=True)
reports_path = Path(REPORTS_PATH)
args = parse_args()
check_name = args.check_name
gh = Github(get_best_robot_token(), per_page=100)
pr_info = PRInfo()
commit = get_commit(gh, pr_info.sha)
atexit.register(update_mergeable_check, gh, pr_info, check_name)
rerun_helper = RerunHelper(commit, check_name)
if rerun_helper.is_already_finished_by_status():
logging.info("Check is already finished according to github status, exiting")
sys.exit(0)
image_name = get_image_name()
docker_image = get_image_with_version(reports_path, image_name)
packages_path = temp_path / "packages"
packages_path.mkdir(parents=True, exist_ok=True)
download_all_deb_packages(check_name, reports_path, packages_path)
server_log_path = temp_path / "server_log"
server_log_path.mkdir(parents=True, exist_ok=True)
result_path = temp_path / "result_path"
result_path.mkdir(parents=True, exist_ok=True)
run_log_path = result_path / "run.log"
additional_envs = [] # type: List[str]
ci_logs_credentials = CiLogsCredentials(temp_path / "export-logs-config.sh")
ci_logs_args = ci_logs_credentials.get_docker_arguments(
pr_info, stopwatch.start_time_str, check_name
)
run_command = get_run_command(
packages_path,
result_path,
server_log_path,
additional_envs,
ci_logs_args,
docker_image,
)
logging.info("Going to run ClickBench: %s", run_command)
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
try:
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
except subprocess.CalledProcessError:
logging.warning("Failed to change files owner in %s, ignoring it", temp_path)
ci_logs_credentials.clean_ci_logs_from_credentials(run_log_path)
s3_helper = S3Helper()
state, description, test_results, additional_logs = process_results(
result_path, server_log_path
)
state = override_status(state, check_name)
ch_helper = ClickHouseHelper()
report_url = upload_results(
s3_helper,
pr_info.number,
pr_info.sha,
test_results,
[run_log_path] + additional_logs,
check_name,
)
print(f"::notice:: {check_name} Report url: {report_url}")
post_commit_status(commit, state, report_url, description, check_name, pr_info)
prepared_events = prepare_tests_results_for_clickhouse(
pr_info,
test_results,
state,
stopwatch.duration_seconds,
stopwatch.start_time_str,
report_url,
check_name,
)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state != "success":
if FORCE_TESTS_LABEL in pr_info.labels:
print(f"'{FORCE_TESTS_LABEL}' enabled, will report success")
else:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -72,7 +72,7 @@ def process_results(result_directory: Path) -> Tuple[str, str, TestResults]:
status = []
status_path = result_directory / "check_status.tsv"
if status_path.exists():
logging.info("Found test_results.tsv")
logging.info("Found %s", status_path.name)
with open(status_path, "r", encoding="utf-8") as status_file:
status = list(csv.reader(status_file, delimiter="\t"))
if len(status) != 1 or len(status[0]) != 2:

View File

@ -169,7 +169,7 @@ def process_results(
status = []
status_path = result_directory / "check_status.tsv"
if status_path.exists():
logging.info("Found test_results.tsv")
logging.info("Found %s", status_path.name)
with open(status_path, "r", encoding="utf-8") as status_file:
status = list(csv.reader(status_file, delimiter="\t"))

View File

@ -118,7 +118,7 @@ def process_results(
status = []
status_path = result_directory / "check_status.tsv"
if status_path.exists():
logging.info("Found test_results.tsv")
logging.info("Found %s", status_path.name)
with open(status_path, "r", encoding="utf-8") as status_file:
status = list(csv.reader(status_file, delimiter="\t"))

View File

@ -0,0 +1,29 @@
<!-- alternative graphite config, for testing 02910_replicated_merge_parameters_must_consistent -->
<clickhouse>
<graphite_rollup_alternative>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>sum</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</pattern>
<default>
<function>any</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</default>
</graphite_rollup_alternative>
</clickhouse>

View File

@ -26,6 +26,7 @@ ln -sf $SRC_PATH/config.d/macros.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/

View File

@ -279,6 +279,13 @@ def get_leader(cluster, nodes):
raise Exception("No leader in Keeper cluster.")
def get_any_follower(cluster, nodes):
for node in nodes:
if is_follower(cluster, node):
return node
raise Exception("No followers in Keeper cluster.")
def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
_fake = KazooClient(
hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout

View File

@ -2,6 +2,7 @@
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" />
<max_threads replace="1" from_env="MAX_THREADS">1</max_threads>
</default>
</profiles>
<users>

View File

@ -1,6 +1,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
@ -36,9 +37,13 @@ node7 = cluster.add_instance(
"configs/000-config_with_env_subst.xml",
"configs/010-env_subst_override.xml",
],
env_variables={"MAX_QUERY_SIZE": "121212"},
env_variables={
# overridden with 424242
"MAX_QUERY_SIZE": "121212",
"MAX_THREADS": "2",
},
instance_env_variables=True,
) # overridden with 424242
)
@pytest.fixture(scope="module")
@ -91,6 +96,65 @@ def test_config(start_cluster):
node7.query("select value from system.settings where name = 'max_query_size'")
== "424242\n"
)
assert (
node7.query("select value from system.settings where name = 'max_threads'")
== "2\n"
)
def test_config_invalid_overrides(start_cluster):
node7.replace_config(
"/etc/clickhouse-server/users.d/000-config_with_env_subst.xml",
"""
<clickhouse>
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" />
<max_threads from_env="MAX_THREADS">100</max_threads>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
<include incl="users_1" />
<include incl="users_2" />
</users>
</clickhouse>
""",
)
with pytest.raises(
QueryRuntimeException,
match="Failed to preprocess config '/etc/clickhouse-server/users.xml': Exception: Element <max_threads> has value and does not have 'replace' attribute, can't process from_env substitution",
):
node7.query("SYSTEM RELOAD CONFIG")
node7.replace_config(
"/etc/clickhouse-server/users.d/000-config_with_env_subst.xml",
"""
<clickhouse>
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" />
<max_threads replace="1" from_env="MAX_THREADS">1</max_threads>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
<include incl="users_1" />
<include incl="users_2" />
</users>
</clickhouse>
""",
)
node7.query("SYSTEM RELOAD CONFIG")
def test_include_config(start_cluster):

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
import os
import pytest
import requests
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import helpers.keeper_utils as keeper_utils
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
node1 = cluster.add_instance(
"node1", main_configs=["configs/enable_keeper1.xml"], stay_alive=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/enable_keeper2.xml"], stay_alive=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/enable_keeper3.xml"], stay_alive=True
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_http_readiness_basic_responses(started_cluster):
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
response = requests.get(
"http://{host}:{port}/ready".format(host=leader.ip_address, port=9182)
)
assert response.status_code == 200
readiness_data = response.json()
assert readiness_data["status"] == "ok"
assert readiness_data["details"]["role"] == "leader"
follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
response = requests.get(
"http://{host}:{port}/ready".format(host=follower.ip_address, port=9182)
)
assert response.status_code == 200
readiness_data = response.json()
assert readiness_data["status"] == "ok"
assert readiness_data["details"]["role"] == "follower"
assert readiness_data["details"]["hasLeader"] == True
def test_http_readiness_partitioned_cluster(started_cluster):
with PartitionManager() as pm:
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
pm.partition_instances(leader, follower)
keeper_utils.wait_until_quorum_lost(cluster, follower)
response = requests.get(
"http://{host}:{port}/ready".format(host=follower.ip_address, port=9182)
)
print(response.json())
assert response.status_code == 503
readiness_data = response.json()
assert readiness_data["status"] == "fail"
assert readiness_data["details"]["role"] == "follower"
assert readiness_data["details"]["hasLeader"] == False

Some files were not shown because too many files have changed in this diff.