Merge branch 'ClickHouse:master' into bakwc-patch-2

Commit ce87451b66 by Filipp Ozinov, 2023-08-17 18:37:00 +04:00, committed via GitHub
338 changed files with 3567 additions and 2373 deletions

View File

@ -73,8 +73,8 @@ struct uint128
uint128() = default;
uint128(uint64 low64_, uint64 high64_) : low64(low64_), high64(high64_) {}
friend bool operator ==(const uint128 & x, const uint128 & y) { return (x.low64 == y.low64) && (x.high64 == y.high64); }
friend bool operator !=(const uint128 & x, const uint128 & y) { return !(x == y); }
friend auto operator<=>(const uint128 &, const uint128 &) = default;
};
inline uint64 Uint128Low64(const uint128 & x) { return x.low64; }
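
The hand-written `operator==` and `operator!=` can be dropped because, in C++20, defaulting `operator<=>` also implicitly declares a defaulted `operator==`, so the full set of comparisons is generated member-wise. A minimal standalone sketch (not the actual cityhash header) of the same pattern:

```cpp
// Standalone sketch of the change above: defaulting operator<=> gives ==, !=, <,
// <=, >, >= for free, compared member-wise in declaration order.
#include <cassert>
#include <compare>
#include <cstdint>

struct uint128_demo
{
    uint64_t low64 = 0;
    uint64_t high64 = 0;
    friend auto operator<=>(const uint128_demo &, const uint128_demo &) = default;
};

int main()
{
    uint128_demo a{1, 0}, b{1, 0}, c{2, 0};
    assert(a == b);   // operator== is implicitly defaulted alongside the defaulted <=>
    assert(a != c);
    assert(a < c);    // member-wise in declaration order: low64 first, then high64
    return 0;
}
```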

contrib/curl (vendored submodule): 2 lines changed

@ -1 +1 @@
Subproject commit b0edf0b7dae44d9e66f270a257cf654b35d5263d
Subproject commit eb3b049df526bf125eda23218e680ce7fa9ec46c

View File

@ -8,125 +8,122 @@ endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/curl")
set (SRCS
"${LIBRARY_DIR}/lib/fopen.c"
"${LIBRARY_DIR}/lib/noproxy.c"
"${LIBRARY_DIR}/lib/idn.c"
"${LIBRARY_DIR}/lib/cfilters.c"
"${LIBRARY_DIR}/lib/cf-socket.c"
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/bufref.c"
"${LIBRARY_DIR}/lib/cf-h1-proxy.c"
"${LIBRARY_DIR}/lib/cf-haproxy.c"
"${LIBRARY_DIR}/lib/cf-https-connect.c"
"${LIBRARY_DIR}/lib/file.c"
"${LIBRARY_DIR}/lib/timeval.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/hostip.c"
"${LIBRARY_DIR}/lib/progress.c"
"${LIBRARY_DIR}/lib/formdata.c"
"${LIBRARY_DIR}/lib/cookie.c"
"${LIBRARY_DIR}/lib/http.c"
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/url.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/if2ip.c"
"${LIBRARY_DIR}/lib/speedcheck.c"
"${LIBRARY_DIR}/lib/ldap.c"
"${LIBRARY_DIR}/lib/version.c"
"${LIBRARY_DIR}/lib/getenv.c"
"${LIBRARY_DIR}/lib/escape.c"
"${LIBRARY_DIR}/lib/mprintf.c"
"${LIBRARY_DIR}/lib/telnet.c"
"${LIBRARY_DIR}/lib/netrc.c"
"${LIBRARY_DIR}/lib/getinfo.c"
"${LIBRARY_DIR}/lib/transfer.c"
"${LIBRARY_DIR}/lib/strcase.c"
"${LIBRARY_DIR}/lib/easy.c"
"${LIBRARY_DIR}/lib/curl_fnmatch.c"
"${LIBRARY_DIR}/lib/curl_log.c"
"${LIBRARY_DIR}/lib/fileinfo.c"
"${LIBRARY_DIR}/lib/krb5.c"
"${LIBRARY_DIR}/lib/memdebug.c"
"${LIBRARY_DIR}/lib/http_chunks.c"
"${LIBRARY_DIR}/lib/strtok.c"
"${LIBRARY_DIR}/lib/cf-socket.c"
"${LIBRARY_DIR}/lib/cfilters.c"
"${LIBRARY_DIR}/lib/conncache.c"
"${LIBRARY_DIR}/lib/connect.c"
"${LIBRARY_DIR}/lib/llist.c"
"${LIBRARY_DIR}/lib/hash.c"
"${LIBRARY_DIR}/lib/multi.c"
"${LIBRARY_DIR}/lib/content_encoding.c"
"${LIBRARY_DIR}/lib/share.c"
"${LIBRARY_DIR}/lib/http_digest.c"
"${LIBRARY_DIR}/lib/md4.c"
"${LIBRARY_DIR}/lib/md5.c"
"${LIBRARY_DIR}/lib/http_negotiate.c"
"${LIBRARY_DIR}/lib/inet_pton.c"
"${LIBRARY_DIR}/lib/strtoofft.c"
"${LIBRARY_DIR}/lib/strerror.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/cookie.c"
"${LIBRARY_DIR}/lib/curl_addrinfo.c"
"${LIBRARY_DIR}/lib/curl_des.c"
"${LIBRARY_DIR}/lib/curl_endian.c"
"${LIBRARY_DIR}/lib/curl_fnmatch.c"
"${LIBRARY_DIR}/lib/curl_get_line.c"
"${LIBRARY_DIR}/lib/curl_gethostname.c"
"${LIBRARY_DIR}/lib/curl_gssapi.c"
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/curl_trc.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
"${LIBRARY_DIR}/lib/dynhds.c"
"${LIBRARY_DIR}/lib/easy.c"
"${LIBRARY_DIR}/lib/escape.c"
"${LIBRARY_DIR}/lib/file.c"
"${LIBRARY_DIR}/lib/fileinfo.c"
"${LIBRARY_DIR}/lib/fopen.c"
"${LIBRARY_DIR}/lib/formdata.c"
"${LIBRARY_DIR}/lib/getenv.c"
"${LIBRARY_DIR}/lib/getinfo.c"
"${LIBRARY_DIR}/lib/gopher.c"
"${LIBRARY_DIR}/lib/hash.c"
"${LIBRARY_DIR}/lib/headers.c"
"${LIBRARY_DIR}/lib/hmac.c"
"${LIBRARY_DIR}/lib/hostasyn.c"
"${LIBRARY_DIR}/lib/hostip.c"
"${LIBRARY_DIR}/lib/hostip4.c"
"${LIBRARY_DIR}/lib/hostip6.c"
"${LIBRARY_DIR}/lib/hostsyn.c"
"${LIBRARY_DIR}/lib/hsts.c"
"${LIBRARY_DIR}/lib/http.c"
"${LIBRARY_DIR}/lib/http2.c"
"${LIBRARY_DIR}/lib/http_aws_sigv4.c"
"${LIBRARY_DIR}/lib/http_chunks.c"
"${LIBRARY_DIR}/lib/http_digest.c"
"${LIBRARY_DIR}/lib/http_negotiate.c"
"${LIBRARY_DIR}/lib/http_ntlm.c"
"${LIBRARY_DIR}/lib/http_proxy.c"
"${LIBRARY_DIR}/lib/idn.c"
"${LIBRARY_DIR}/lib/if2ip.c"
"${LIBRARY_DIR}/lib/imap.c"
"${LIBRARY_DIR}/lib/inet_ntop.c"
"${LIBRARY_DIR}/lib/inet_pton.c"
"${LIBRARY_DIR}/lib/krb5.c"
"${LIBRARY_DIR}/lib/ldap.c"
"${LIBRARY_DIR}/lib/llist.c"
"${LIBRARY_DIR}/lib/md4.c"
"${LIBRARY_DIR}/lib/md5.c"
"${LIBRARY_DIR}/lib/memdebug.c"
"${LIBRARY_DIR}/lib/mime.c"
"${LIBRARY_DIR}/lib/mprintf.c"
"${LIBRARY_DIR}/lib/mqtt.c"
"${LIBRARY_DIR}/lib/multi.c"
"${LIBRARY_DIR}/lib/netrc.c"
"${LIBRARY_DIR}/lib/nonblock.c"
"${LIBRARY_DIR}/lib/noproxy.c"
"${LIBRARY_DIR}/lib/openldap.c"
"${LIBRARY_DIR}/lib/parsedate.c"
"${LIBRARY_DIR}/lib/pingpong.c"
"${LIBRARY_DIR}/lib/pop3.c"
"${LIBRARY_DIR}/lib/progress.c"
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/select.c"
"${LIBRARY_DIR}/lib/splay.c"
"${LIBRARY_DIR}/lib/strdup.c"
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/share.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/smtp.c"
"${LIBRARY_DIR}/lib/socketpair.c"
"${LIBRARY_DIR}/lib/socks.c"
"${LIBRARY_DIR}/lib/curl_addrinfo.c"
"${LIBRARY_DIR}/lib/socks_gssapi.c"
"${LIBRARY_DIR}/lib/socks_sspi.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/nonblock.c"
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/imap.c"
"${LIBRARY_DIR}/lib/pop3.c"
"${LIBRARY_DIR}/lib/smtp.c"
"${LIBRARY_DIR}/lib/pingpong.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/warnless.c"
"${LIBRARY_DIR}/lib/hmac.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/openldap.c"
"${LIBRARY_DIR}/lib/curl_gethostname.c"
"${LIBRARY_DIR}/lib/gopher.c"
"${LIBRARY_DIR}/lib/http_proxy.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/curl_gssapi.c"
"${LIBRARY_DIR}/lib/http_ntlm.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/conncache.c"
"${LIBRARY_DIR}/lib/cf-h1-proxy.c"
"${LIBRARY_DIR}/lib/http2.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/curl_endian.c"
"${LIBRARY_DIR}/lib/curl_des.c"
"${LIBRARY_DIR}/lib/speedcheck.c"
"${LIBRARY_DIR}/lib/splay.c"
"${LIBRARY_DIR}/lib/strcase.c"
"${LIBRARY_DIR}/lib/strdup.c"
"${LIBRARY_DIR}/lib/strerror.c"
"${LIBRARY_DIR}/lib/strtok.c"
"${LIBRARY_DIR}/lib/strtoofft.c"
"${LIBRARY_DIR}/lib/system_win32.c"
"${LIBRARY_DIR}/lib/mime.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/urlapi.c"
"${LIBRARY_DIR}/lib/curl_get_line.c"
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/socketpair.c"
"${LIBRARY_DIR}/lib/bufref.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
"${LIBRARY_DIR}/lib/dynhds.c"
"${LIBRARY_DIR}/lib/hsts.c"
"${LIBRARY_DIR}/lib/http_aws_sigv4.c"
"${LIBRARY_DIR}/lib/mqtt.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/headers.c"
"${LIBRARY_DIR}/lib/telnet.c"
"${LIBRARY_DIR}/lib/timediff.c"
"${LIBRARY_DIR}/lib/vauth/vauth.c"
"${LIBRARY_DIR}/lib/timeval.c"
"${LIBRARY_DIR}/lib/transfer.c"
"${LIBRARY_DIR}/lib/url.c"
"${LIBRARY_DIR}/lib/urlapi.c"
"${LIBRARY_DIR}/lib/vauth/cleartext.c"
"${LIBRARY_DIR}/lib/vauth/cram.c"
"${LIBRARY_DIR}/lib/vauth/digest.c"
@ -138,23 +135,24 @@ set (SRCS
"${LIBRARY_DIR}/lib/vauth/oauth2.c"
"${LIBRARY_DIR}/lib/vauth/spnego_gssapi.c"
"${LIBRARY_DIR}/lib/vauth/spnego_sspi.c"
"${LIBRARY_DIR}/lib/vauth/vauth.c"
"${LIBRARY_DIR}/lib/version.c"
"${LIBRARY_DIR}/lib/vquic/vquic.c"
"${LIBRARY_DIR}/lib/vtls/openssl.c"
"${LIBRARY_DIR}/lib/vssh/libssh.c"
"${LIBRARY_DIR}/lib/vssh/libssh2.c"
"${LIBRARY_DIR}/lib/vtls/bearssl.c"
"${LIBRARY_DIR}/lib/vtls/gtls.c"
"${LIBRARY_DIR}/lib/vtls/vtls.c"
"${LIBRARY_DIR}/lib/vtls/nss.c"
"${LIBRARY_DIR}/lib/vtls/wolfssl.c"
"${LIBRARY_DIR}/lib/vtls/hostcheck.c"
"${LIBRARY_DIR}/lib/vtls/keylog.c"
"${LIBRARY_DIR}/lib/vtls/mbedtls.c"
"${LIBRARY_DIR}/lib/vtls/openssl.c"
"${LIBRARY_DIR}/lib/vtls/schannel.c"
"${LIBRARY_DIR}/lib/vtls/schannel_verify.c"
"${LIBRARY_DIR}/lib/vtls/sectransp.c"
"${LIBRARY_DIR}/lib/vtls/gskit.c"
"${LIBRARY_DIR}/lib/vtls/mbedtls.c"
"${LIBRARY_DIR}/lib/vtls/bearssl.c"
"${LIBRARY_DIR}/lib/vtls/keylog.c"
"${LIBRARY_DIR}/lib/vtls/vtls.c"
"${LIBRARY_DIR}/lib/vtls/wolfssl.c"
"${LIBRARY_DIR}/lib/vtls/x509asn1.c"
"${LIBRARY_DIR}/lib/vtls/hostcheck.c"
"${LIBRARY_DIR}/lib/vssh/libssh2.c"
"${LIBRARY_DIR}/lib/vssh/libssh.c"
"${LIBRARY_DIR}/lib/warnless.c"
)
add_library (_curl ${SRCS})

View File

@ -12,6 +12,7 @@ ENV \
# install systemd packages
RUN apt-get update && \
apt-get install -y --no-install-recommends \
sudo \
systemd \
&& \
apt-get clean && \

View File

@ -1,18 +1,7 @@
# docker build -t clickhouse/performance-comparison .
# Using ubuntu:22.04 over 20.04 as all other images, since:
# a) ubuntu 20.04 has too old parallel, and does not support --memsuspend
# b) anyway for perf tests it should not be important (backward compatiblity
# with older ubuntu had been checked lots of times in various tests)
FROM ubuntu:22.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV LANG=C.UTF-8
ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
@ -56,10 +45,9 @@ COPY * /
# node #0 should be less stable because of system interruptions. We bind
# randomly to node 1 or 0 to gather some statistics on that. We have to bind
# both servers and the tmpfs on which the database is stored. How to do it
# through Yandex Sandbox API is unclear, but by default tmpfs uses
# is unclear, but by default tmpfs uses
# 'process allocation policy', not sure which process but hopefully the one that
# writes to it, so just bind the downloader script as well. We could also try to
# remount it with proper options in Sandbox task.
# writes to it, so just bind the downloader script as well.
# https://www.kernel.org/doc/Documentation/filesystems/tmpfs.txt
# Double-escaped backslashes are a tribute to the engineering wonder of docker --
# it gives '/bin/sh: 1: [bash,: not found' otherwise.

View File

@ -90,7 +90,7 @@ function configure
set +m
wait_for_server $LEFT_SERVER_PORT $left_pid
echo Server for setup started
echo "Server for setup started"
clickhouse-client --port $LEFT_SERVER_PORT --query "create database test" ||:
clickhouse-client --port $LEFT_SERVER_PORT --query "rename table datasets.hits_v1 to test.hits" ||:
@ -156,9 +156,9 @@ function restart
wait_for_server $RIGHT_SERVER_PORT $right_pid
echo right ok
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.tables where database != 'system'"
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.tables where database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')"
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.build_options"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.tables where database != 'system'"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.tables where database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.build_options"
# Check again that both servers we started are running -- this is important
@ -352,14 +352,12 @@ function get_profiles
wait
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.query_log where type in ('QueryFinish', 'ExceptionWhileProcessing') format TSVWithNamesAndTypes" > left-query-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > left-query-thread-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.trace_log format TSVWithNamesAndTypes" > left-trace-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > left-addresses.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.metric_log format TSVWithNamesAndTypes" > left-metric-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > left-async-metric-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.query_log where type in ('QueryFinish', 'ExceptionWhileProcessing') format TSVWithNamesAndTypes" > right-query-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > right-query-thread-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.trace_log format TSVWithNamesAndTypes" > right-trace-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > right-addresses.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.metric_log format TSVWithNamesAndTypes" > right-metric-log.tsv ||: &

View File

@ -19,31 +19,6 @@
<opentelemetry_span_log remove="remove"/>
<session_log remove="remove"/>
<!-- performance tests does not uses real block devices,
instead they stores everything in memory.
And so, to avoid extra memory reference switch *_log to Memory engine. -->
<query_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</query_log>
<query_thread_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</query_thread_log>
<trace_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</trace_log>
<metric_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</metric_log>
<asynchronous_metric_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</asynchronous_metric_log>
<uncompressed_cache_size>1000000000</uncompressed_cache_size>
<asynchronous_metrics_update_period_s>10</asynchronous_metrics_update_period_s>

View File

@ -31,8 +31,6 @@ function download
# Test all of them.
declare -a urls_to_try=(
"$S3_URL/PRs/$left_pr/$left_sha/$BUILD_NAME/performance.tar.zst"
"$S3_URL/$left_pr/$left_sha/$BUILD_NAME/performance.tar.zst"
"$S3_URL/$left_pr/$left_sha/$BUILD_NAME/performance.tgz"
)
for path in "${urls_to_try[@]}"

View File

@ -130,7 +130,7 @@ then
git -C right/ch diff --name-only "$base" pr -- :!tests/performance :!docker/test/performance-comparison | tee other-changed-files.txt
fi
# Set python output encoding so that we can print queries with Russian letters.
# Set python output encoding so that we can print queries with non-ASCII letters.
export PYTHONIOENCODING=utf-8
# By default, use the main comparison script from the tested package, so that we
@ -151,11 +151,7 @@ export PATH
export REF_PR
export REF_SHA
# Try to collect some core dumps. I've seen two patterns in Sandbox:
# 1) |/home/zomb-sandbox/venv/bin/python /home/zomb-sandbox/client/sandbox/bin/coredumper.py %e %p %g %u %s %P %c
# Not sure what this script does (puts them to sandbox resources, logs some messages?),
# and it's not accessible from inside docker anyway.
# 2) something like %e.%p.core.dmp. The dump should end up in the workspace directory.
# Try to collect some core dumps.
# At least we remove the ulimit and then try to pack some common file names into output.
ulimit -c unlimited
cat /proc/sys/kernel/core_pattern

View File

@ -1,5 +1,5 @@
# docker build -t clickhouse/style-test .
FROM ubuntu:20.04
FROM ubuntu:22.04
ARG ACT_VERSION=0.2.33
ARG ACTIONLINT_VERSION=1.6.22

View File

@ -190,7 +190,7 @@ These are the schema conversion manipulations you can do with table overrides fo
* Modify [column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).
* Modify [column compression codec](/docs/en/sql-reference/statements/create/table.md/#codecs).
* Add [ALIAS columns](/docs/en/sql-reference/statements/create/table.md/#alias).
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes)
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes). Note that you need to enable the `use_skip_indexes_if_final` setting to make them work (MaterializedMySQL uses `SELECT ... FINAL` by default)
* Add [projections](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#projections). Note that projection optimizations are
disabled when using `SELECT ... FINAL` (which MaterializedMySQL does by default), so their utility is limited here.
`INDEX ... TYPE hypothesis` as [described in the v21.12 blog post]](https://clickhouse.com/blog/en/2021/clickhouse-v21.12-released/)

View File

@ -1,4 +1,4 @@
# Approximate Nearest Neighbor Search Indexes [experimental] {#table_engines-ANNIndex}
# Approximate Nearest Neighbor Search Indexes [experimental]
Nearest neighborhood search is the problem of finding the M closest points for a given point in an N-dimensional vector space. The most
straightforward approach to solve this problem is a brute force search where the distance between all points in the vector space and the
@ -17,7 +17,7 @@ In terms of SQL, the nearest neighborhood problem can be expressed as follows:
``` sql
SELECT *
FROM table
FROM table_with_ann_index
ORDER BY Distance(vectors, Point)
LIMIT N
```
@ -32,7 +32,7 @@ An alternative formulation of the nearest neighborhood search problem looks as f
``` sql
SELECT *
FROM table
FROM table_with_ann_index
WHERE Distance(vectors, Point) < MaxDistance
LIMIT N
```
@ -45,12 +45,12 @@ With brute force search, both queries are expensive (linear in the number of poi
`Point` must be computed. To speed this process up, Approximate Nearest Neighbor Search Indexes (ANN indexes) store a compact representation
of the search space (using clustering, search trees, etc.) which allows to compute an approximate answer much quicker (in sub-linear time).
# Creating and Using ANN Indexes
# Creating and Using ANN Indexes {#creating_using_ann_indexes}
Syntax to create an ANN index over an [Array](../../../sql-reference/data-types/array.md) column:
```sql
CREATE TABLE table
CREATE TABLE table_with_ann_index
(
`id` Int64,
`vectors` Array(Float32),
@ -63,7 +63,7 @@ ORDER BY id;
Syntax to create an ANN index over a [Tuple](../../../sql-reference/data-types/tuple.md) column:
```sql
CREATE TABLE table
CREATE TABLE table_with_ann_index
(
`id` Int64,
`vectors` Tuple(Float32[, Float32[, ...]]),
@ -83,7 +83,7 @@ ANN indexes support two types of queries:
``` sql
SELECT *
FROM table
FROM table_with_ann_index
[WHERE ...]
ORDER BY Distance(vectors, Point)
LIMIT N
@ -93,7 +93,7 @@ ANN indexes support two types of queries:
``` sql
SELECT *
FROM table
FROM table_with_ann_index
WHERE Distance(vectors, Point) < MaxDistance
LIMIT N
```
@ -103,7 +103,7 @@ To avoid writing out large vectors, you can use [query
parameters](/docs/en/interfaces/cli.md#queries-with-parameters-cli-queries-with-parameters), e.g.
```bash
clickhouse-client --param_vec='hello' --query="SELECT * FROM table WHERE L2Distance(vectors, {vec: Array(Float32)}) < 1.0"
clickhouse-client --param_vec='hello' --query="SELECT * FROM table_with_ann_index WHERE L2Distance(vectors, {vec: Array(Float32)}) < 1.0"
```
:::
@ -138,7 +138,7 @@ back to a smaller `GRANULARITY` values only in case of problems like excessive m
was specified for ANN indexes, the default value is 100 million.
# Available ANN Indexes
# Available ANN Indexes {#available_ann_indexes}
- [Annoy](/docs/en/engines/table-engines/mergetree-family/annindexes.md#annoy-annoy)
@ -165,7 +165,7 @@ space in random linear surfaces (lines in 2D, planes in 3D etc.).
Syntax to create an Annoy index over an [Array](../../../sql-reference/data-types/array.md) column:
```sql
CREATE TABLE table
CREATE TABLE table_with_annoy_index
(
id Int64,
vectors Array(Float32),
@ -178,7 +178,7 @@ ORDER BY id;
Syntax to create an ANN index over a [Tuple](../../../sql-reference/data-types/tuple.md) column:
```sql
CREATE TABLE table
CREATE TABLE table_with_annoy_index
(
id Int64,
vectors Tuple(Float32[, Float32[, ...]]),
@ -188,23 +188,17 @@ ENGINE = MergeTree
ORDER BY id;
```
Annoy currently supports `L2Distance` and `cosineDistance` as distance function `Distance`. If no distance function was specified during
index creation, `L2Distance` is used as default. Parameter `NumTrees` is the number of trees which the algorithm creates (default if not
specified: 100). Higher values of `NumTree` mean more accurate search results but slower index creation / query times (approximately
linearly) as well as larger index sizes.
Annoy currently supports two distance functions:
- `L2Distance`, also called Euclidean distance, is the length of a line segment between two points in Euclidean space
([Wikipedia](https://en.wikipedia.org/wiki/Euclidean_distance)).
- `cosineDistance`, also called cosine similarity, is the cosine of the angle between two (non-zero) vectors
([Wikipedia](https://en.wikipedia.org/wiki/Cosine_similarity)).
`L2Distance` is also called Euclidean distance, the Euclidean distance between two points in Euclidean space is the length of a line segment between the two points.
For example: If we have point P(p1,p2), Q(q1,q2), their distance will be d(p,q)
![L2Distance](https://en.wikipedia.org/wiki/Euclidean_distance#/media/File:Euclidean_distance_2d.svg)
For normalized data, `L2Distance` is usually a better choice, otherwise `cosineDistance` is recommended to compensate for scale. If no
distance function was specified during index creation, `L2Distance` is used as default.
`cosineDistance` also called cosine similarity is a measure of similarity between two non-zero vectors defined in an inner product space. Cosine similarity is the cosine of the angle between the vectors; that is, it is the dot product of the vectors divided by the product of their lengths.
![cosineDistance](https://www.tyrrell4innovation.ca/wp-content/uploads/2021/06/rsz_jenny_du_miword.png)
The Euclidean distance corresponds to the L2-norm of a difference between vectors. The cosine similarity is proportional to the dot product of two vectors and inversely proportional to the product of their magnitudes.
![compare](https://www.researchgate.net/publication/320914786/figure/fig2/AS:558221849841664@1510101868614/The-difference-between-Euclidean-distance-and-cosine-similarity.png)
In one sentence: cosine similarity care only about the angle between them, but do not care about the "distance" we normally think.
![L2 distance](https://www.baeldung.com/wp-content/uploads/sites/4/2020/06/4-1.png)
![cosineDistance](https://www.baeldung.com/wp-content/uploads/sites/4/2020/06/5.png)
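
For reference, the usual definitions of the two distances can be written out explicitly (a hedged sketch of the standard formulas; the exact form computed by the ClickHouse functions is defined by the functions themselves, and the cosine distance is commonly taken as one minus the cosine similarity):

```latex
% Euclidean (L2) distance between p and q in R^n
d_{L2}(p, q) = \sqrt{\sum_{i=1}^{n} (p_i - q_i)^2}

% Cosine similarity; cosine distance is commonly 1 - \cos(p, q)
\cos(p, q) = \frac{p \cdot q}{\lVert p \rVert \, \lVert q \rVert}
```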
Parameter `NumTrees` is the number of trees which the algorithm creates (default if not specified: 100). Higher values of `NumTree` mean
more accurate search results but slower index creation / query times (approximately linearly) as well as larger index sizes.
:::note
Indexes over columns of type `Array` will generally work faster than indexes on `Tuple` columns. All arrays **must** have same length. Use

View File

@ -11,7 +11,7 @@ Inserts data into a table.
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
You can specify a list of columns to insert using the `(c1, c2, c3)`. You can also use an expression with column [matcher](../../sql-reference/statements/select/index.md#asterisk) such as `*` and/or [modifiers](../../sql-reference/statements/select/index.md#select-modifiers) such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
@ -107,7 +107,7 @@ If table has [constraints](../../sql-reference/statements/create/table.md#constr
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```
Columns are mapped according to their position in the SELECT clause. However, their names in the SELECT expression and the table for INSERT may differ. If necessary, type casting is performed.
@ -126,7 +126,7 @@ To insert a default value instead of `NULL` into a column with not nullable data
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```
Use the syntax above to insert data from a file, or files, stored on the **client** side. `file_name` and `type` are string literals. Input file [format](../../interfaces/formats.md) must be set in the `FORMAT` clause.

View File

@ -11,7 +11,7 @@ sidebar_label: INSERT INTO
**Синтаксис**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
Вы можете указать список столбцов для вставки, используя синтаксис `(c1, c2, c3)`. Также можно использовать выражение cо [звездочкой](../../sql-reference/statements/select/index.md#asterisk) и/или модификаторами, такими как [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
@ -100,7 +100,7 @@ INSERT INTO t FORMAT TabSeparated
**Синтаксис**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```
Соответствие столбцов определяется их позицией в секции SELECT. При этом, их имена в выражении SELECT и в таблице для INSERT, могут отличаться. При необходимости выполняется приведение типов данных, эквивалентное соответствующему оператору CAST.
@ -120,7 +120,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
**Синтаксис**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```
Используйте этот синтаксис, чтобы вставить данные из файла, который хранится на стороне **клиента**. `file_name` и `type` задаются в виде строковых литералов. [Формат](../../interfaces/formats.md) входного файла должен быть задан в секции `FORMAT`.

View File

@ -8,7 +8,7 @@ INSERT INTO 语句主要用于向系统中添加数据.
查询的基本格式:
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
您可以在查询中指定要插入的列的列表,如:`[(c1, c2, c3)]`。您还可以使用列[匹配器](../../sql-reference/statements/select/index.md#asterisk)的表达式,例如`*`和/或[修饰符](../../sql-reference/statements/select/index.md#select-modifiers),例如 [APPLY](../../sql-reference/statements/select/index.md#apply-modifier) [EXCEPT](../../sql-reference/statements/select/index.md#apply-modifier) [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier)。
@ -71,7 +71,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] FORMAT format_name data_set
例如下面的查询所使用的输入格式就与上面INSERT … VALUES的中使用的输入格式相同
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
```
ClickHouse会清除数据前所有的空白字符与一个换行符如果有换行符的话。所以在进行查询时我们建议您将数据放入到输入输出格式名称后的新的一行中去如果数据是以空白字符开始的这将非常重要
@ -93,7 +93,7 @@ INSERT INTO t FORMAT TabSeparated
### 使用`SELECT`的结果写入 {#inserting-the-results-of-select}
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```
写入与SELECT的列的对应关系是使用位置来进行对应的尽管它们在SELECT表达式与INSERT中的名称可能是不同的。如果需要会对它们执行对应的类型转换。

View File

@ -997,7 +997,9 @@ namespace
{
/// sudo respects limits in /etc/security/limits.conf e.g. open files,
/// that's why we are using it instead of the 'clickhouse su' tool.
command = fmt::format("sudo -u '{}' {}", user, command);
/// by default, sudo resets all the ENV variables, but we should preserve
/// the values /etc/default/clickhouse in /etc/init.d/clickhouse file
command = fmt::format("sudo --preserve-env -u '{}' {}", user, command);
}
fmt::print("Will run {}\n", command);

View File

@ -2,6 +2,8 @@
#include <sys/resource.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <base/getMemoryAmount.h>
#include <base/errnoToString.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/String.h>
@ -655,43 +657,66 @@ void LocalServer::processConfig()
/// There is no need for concurrent queries, override max_concurrent_queries.
global_context->getProcessList().setMaxSize(0);
/// Size of cache for uncompressed blocks. Zero means disabled.
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
const size_t memory_amount = getMemoryAmount();
const double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5);
const size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (uncompressed_cache_size)
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
String mark_cache_policy = config().getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
size_t mark_cache_size = config().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
if (mark_cache_size)
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices).
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);
/// In Server.cpp (./clickhouse-server), we would initialize the query cache here.
/// Intentionally not doing this in clickhouse-local as it doesn't make sense.
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size
= config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// NOTE: it is important to apply any overrides before
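
The hunk above repeats the same step for every cache (uncompressed, mark, index, mmap): read the configured size, clamp it to `max_cache_size` derived from the amount of RAM and `cache_size_to_ram_max_ratio`, and log when the value is lowered. A hedged sketch of that clamping step as a free function; the helper name and logging are illustrative, not part of the commit:

```cpp
// Hypothetical helper showing the clamping pattern used for each cache above.
#include <cstddef>
#include <cstdio>

size_t clampCacheSize(const char * cache_name, size_t configured_size, size_t max_cache_size)
{
    if (configured_size > max_cache_size)
    {
        std::fprintf(stderr, "Lowered %s cache size to %zu because the system has limited RAM\n",
                     cache_name, max_cache_size);
        return max_cache_size;
    }
    return configured_size;
}

int main()
{
    const size_t memory_amount = 8ULL * 1024 * 1024 * 1024;     // pretend 8 GiB of RAM
    const double cache_size_to_ram_max_ratio = 0.5;             // default used in the diff
    const size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);

    const size_t mark_cache_size = clampCacheSize("mark", 5ULL * 1024 * 1024 * 1024, max_cache_size);
    std::printf("mark cache: %zu bytes\n", mark_cache_size);    // clamped to 4 GiB here
    return 0;
}
```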

View File

@ -365,17 +365,14 @@ static void transformFixedString(const UInt8 * src, UInt8 * dst, size_t size, UI
hash.update(seed);
hash.update(i);
const auto checksum = getSipHash128AsArray(hash);
if (size >= 16)
{
char * hash_dst = reinterpret_cast<char *>(std::min(pos, end - 16));
hash.get128(hash_dst);
auto * hash_dst = std::min(pos, end - 16);
memcpy(hash_dst, checksum.data(), checksum.size());
}
else
{
char value[16];
hash.get128(value);
memcpy(dst, value, end - dst);
}
memcpy(dst, checksum.data(), end - dst);
pos += 16;
++i;
@ -401,7 +398,7 @@ static void transformUUID(const UUID & src_uuid, UUID & dst_uuid, UInt64 seed)
hash.update(reinterpret_cast<const char *>(&src), sizeof(UUID));
/// Saving version and variant from an old UUID
hash.get128(reinterpret_cast<char *>(&dst));
dst = hash.get128();
dst.items[1] = (dst.items[1] & 0x1fffffffffffffffull) | (src.items[1] & 0xe000000000000000ull);
dst.items[0] = (dst.items[0] & 0xffffffffffff0fffull) | (src.items[0] & 0x000000000000f000ull);

View File

@ -29,6 +29,7 @@
#include <Common/ShellCommand.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/formatReadable.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getExecutablePath.h>
@ -658,7 +659,7 @@ try
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
const auto memory_amount = getMemoryAmount();
const size_t memory_amount = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
formatReadableSizeWithBinarySuffix(memory_amount),
@ -1485,16 +1486,14 @@ try
/// Set up caches.
size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
const size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = server_settings.uncompressed_cache_policy;
LOG_INFO(log, "Uncompressed cache policy name {}", uncompressed_cache_policy);
size_t uncompressed_cache_size = server_settings.uncompressed_cache_size;
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
@ -1513,39 +1512,59 @@ try
server_settings.async_insert_queue_flush_on_shutdown));
}
size_t mark_cache_size = server_settings.mark_cache_size;
String mark_cache_policy = server_settings.mark_cache_policy;
size_t mark_cache_size = server_settings.mark_cache_size;
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
if (server_settings.index_uncompressed_cache_size)
size_t index_uncompressed_cache_size = server_settings.index_uncompressed_cache_size;
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(server_settings.index_uncompressed_cache_size);
if (server_settings.index_mark_cache_size)
size_t index_mark_cache_size = server_settings.index_mark_cache_size;
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(server_settings.index_mark_cache_size);
if (server_settings.mmap_cache_size)
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(server_settings.mmap_cache_size);
/// A cache for query results.
global_context->setQueryCache(config());
size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t query_cache_query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
if (query_cache_max_size_in_bytes > max_cache_size)
{
query_cache_max_size_in_bytes = max_cache_size;
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows);
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size = config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// Set path for format schema files

View File

@ -315,10 +315,9 @@ struct Adder
{
StringRef value = column.getDataAt(row_num);
UInt128 key;
SipHash hash;
hash.update(value.data, value.size);
hash.get128(key);
const auto key = hash.get128();
data.set.template insert<const UInt128 &, use_single_level_hash_table>(key);
}
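
Several hunks in this commit make the same call-site change: the out-parameter form `hash.get128(key)` is replaced by the value-returning `hash.get128()` (with helpers such as `getSipHash128AsPair` and `getSipHash128AsArray` where a specific result type is wanted). A toy sketch of the two styles on a made-up hasher, just to show why call sites shrink to one line; `ToyHasher` and `Hash128` are not the ClickHouse SipHash classes:

```cpp
#include <cstdint>
#include <iostream>

struct Hash128 { uint64_t low64 = 0; uint64_t high64 = 0; };

struct ToyHasher
{
    uint64_t state = 0;
    void update(uint64_t x) { state = state * 1099511628211ULL + x; }

    // Old style: the caller declares the result and passes it by reference.
    void get128(Hash128 & out) const { out = {state, ~state}; }

    // New style: the hash is returned by value, so the call site is a single line.
    Hash128 get128() const { return {state, ~state}; }
};

int main()
{
    ToyHasher hash;
    hash.update(42);

    Hash128 key_old;
    hash.get128(key_old);               // before this commit

    const auto key_new = hash.get128(); // after this commit

    std::cout << (key_old.low64 == key_new.low64) << '\n';
    return 0;
}
```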

View File

@ -107,9 +107,7 @@ struct UniqVariadicHash<true, false>
++column;
}
UInt128 key;
hash.get128(key);
return key;
return hash.get128();
}
};
@ -131,9 +129,7 @@ struct UniqVariadicHash<true, true>
++column;
}
UInt128 key;
hash.get128(key);
return key;
return hash.get128();
}
};

View File

@ -20,7 +20,7 @@ struct QueryTreeNodeWithHash
{}
QueryTreeNodePtrType node = nullptr;
std::pair<UInt64, UInt64> hash;
CityHash_v1_0_2::uint128 hash;
};
template <typename T>
@ -55,6 +55,6 @@ struct std::hash<DB::QueryTreeNodeWithHash<T>>
{
size_t operator()(const DB::QueryTreeNodeWithHash<T> & node_with_hash) const
{
return node_with_hash.hash.first;
return node_with_hash.hash.low64;
}
};

View File

@ -229,10 +229,7 @@ IQueryTreeNode::Hash IQueryTreeNode::getTreeHash() const
}
}
Hash result;
hash_state.get128(result);
return result;
return getSipHash128AsPair(hash_state);
}
QueryTreeNodePtr IQueryTreeNode::clone() const

View File

@ -106,7 +106,7 @@ public:
*/
bool isEqual(const IQueryTreeNode & rhs, CompareOptions compare_options = { .compare_aliases = true }) const;
using Hash = std::pair<UInt64, UInt64>;
using Hash = CityHash_v1_0_2::uint128;
using HashState = SipHash;
/** Get tree hash identifying current tree

View File

@ -2033,7 +2033,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
auto & nearest_query_scope_query_node = nearest_query_scope->scope_node->as<QueryNode &>();
auto & mutable_context = nearest_query_scope_query_node.getMutableContext();
auto scalar_query_hash_string = std::to_string(node_with_hash.hash.first) + '_' + std::to_string(node_with_hash.hash.second);
auto scalar_query_hash_string = DB::toString(node_with_hash.hash);
if (mutable_context->hasQueryContext())
mutable_context->getQueryContext()->addScalar(scalar_query_hash_string, scalar_block);

View File

@ -105,6 +105,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int FILE_ALREADY_EXISTS;
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
}
@ -846,7 +847,9 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
visitor.visit(parsed_query);
/// Get new query after substitutions.
query = serializeAST(*parsed_query);
if (visitor.getNumberOfReplacedParameters())
query = serializeAST(*parsed_query);
chassert(!query.empty());
}
if (allow_merge_tree_settings && parsed_query->as<ASTCreateQuery>())
@ -1331,7 +1334,9 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
visitor.visit(parsed_query);
/// Get new query after substitutions.
query = serializeAST(*parsed_query);
if (visitor.getNumberOfReplacedParameters())
query = serializeAST(*parsed_query);
chassert(!query.empty());
}
/// Process the query that requires transferring data blocks to the server.
@ -1810,7 +1815,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{
const String & new_database = use_query->database;
const String & new_database = use_query->getDatabase();
/// If the client initiates the reconnection, it takes the settings from the config.
config().setString("database", new_database);
/// If the connection initiates the reconnection, it uses its variable.
@ -2408,6 +2413,13 @@ void ClientBase::runInteractive()
}
}
if (suggest && suggest->getLastError() == ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
{
// If a separate connection loading suggestions failed to open a new session,
// use the main session to receive them.
suggest->load(*connection, connection_parameters.timeouts, config().getInt("suggestion_limit"));
}
try
{
if (!processQueryText(input))

View File

@ -521,8 +521,7 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create)
if (create.storage)
create.storage->updateTreeHash(sip_hash);
IAST::Hash hash;
sip_hash.get128(hash);
const auto hash = getSipHash128AsPair(sip_hash);
/// Save only tables with unique definition.
if (created_tables_hashes.insert(hash).second)

View File

@ -22,9 +22,11 @@ namespace DB
{
namespace ErrorCodes
{
extern const int OK;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int DEADLOCK_AVOIDED;
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
Suggest::Suggest()
@ -121,21 +123,24 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
}
catch (const Exception & e)
{
last_error = e.code();
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
continue;
/// Client can successfully connect to the server and
/// get ErrorCodes::USER_SESSION_LIMIT_EXCEEDED for suggestion connection.
/// We should not use std::cerr here, because this method works concurrently with the main thread.
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
else if (e.code() != ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
{
/// We should not use std::cerr here, because this method works concurrently with the main thread.
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
///
/// USER_SESSION_LIMIT_EXCEEDED is ignored here. The client will try to receive
/// suggestions using the main connection later.
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
}
}
catch (...)
{
last_error = getCurrentExceptionCode();
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
@ -148,6 +153,21 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
});
}
void Suggest::load(IServerConnection & connection,
const ConnectionTimeouts & timeouts,
Int32 suggestion_limit)
{
try
{
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true));
}
catch (...)
{
std::cerr << "Suggestions loading exception: " << getCurrentExceptionMessage(false, true) << std::endl;
last_error = getCurrentExceptionCode();
}
}
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(
@ -176,6 +196,7 @@ void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & t
return;
case Protocol::Server::EndOfStream:
last_error = ErrorCodes::OK;
return;
default:

View File

@ -7,6 +7,7 @@
#include <Client/LocalConnection.h>
#include <Client/LineReader.h>
#include <IO/ConnectionTimeouts.h>
#include <atomic>
#include <thread>
@ -28,9 +29,15 @@ public:
template <typename ConnectionType>
void load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
void load(IServerConnection & connection,
const ConnectionTimeouts & timeouts,
Int32 suggestion_limit);
/// Older server versions cannot execute the query loading suggestions.
static constexpr int MIN_SERVER_REVISION = DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED;
int getLastError() const { return last_error.load(); }
private:
void fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query);
@ -38,6 +45,8 @@ private:
/// Words are fetched asynchronously.
std::thread loading_thread;
std::atomic<int> last_error { -1 };
};
}

View File

@ -524,7 +524,7 @@ void ColumnAggregateFunction::insertDefault()
pushBackAndCreateState(data, arena, func.get());
}
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out, version);

View File

@ -162,7 +162,7 @@ public:
void insertDefault() override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * src_arena) override;

View File

@ -205,7 +205,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
}
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);

View File

@ -77,7 +77,7 @@ public:
StringRef getDataAt(size_t n) const override;
bool isDefaultAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -88,7 +88,7 @@ public:
void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
void insertDefault() override { throwMustBeDecompressed(); }
void popBack(size_t) override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeDecompressed(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }

View File

@ -151,7 +151,7 @@ public:
s -= n;
}
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, const UInt8 *) const override
{
return data->serializeValueIntoArena(0, arena, begin);
}

View File

@ -59,9 +59,26 @@ bool ColumnDecimal<T>::hasEqualValues() const
}
template <is_decimal T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
}
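
The new `null_bit` argument lets a nullable fixed-size value be serialized as one contiguous record: a leading flag byte, followed by the payload only when the value is not NULL (and no flag byte at all when the column is not nullable). A standalone sketch of that layout using a plain `std::vector` instead of the Arena; names and types here are illustrative only:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Appends one record to `buf` and returns how many bytes were written.
size_t serializeNullableFixed(std::vector<char> & buf, const uint8_t * null_bit, uint64_t value)
{
    const size_t null_bit_size = null_bit ? sizeof(uint8_t) : 0;
    const size_t payload_size = (null_bit && *null_bit) ? 0 : sizeof(value);

    const size_t old_size = buf.size();
    buf.resize(old_size + null_bit_size + payload_size);
    char * pos = buf.data() + old_size;

    if (null_bit)
    {
        std::memcpy(pos, null_bit, null_bit_size);   // flag byte comes first
        pos += null_bit_size;
    }
    if (payload_size)
        std::memcpy(pos, &value, payload_size);      // payload only for non-NULL values

    return null_bit_size + payload_size;
}

int main()
{
    std::vector<char> arena;
    uint8_t flag_not_null = 0, flag_null = 1;

    std::cout << serializeNullableFixed(arena, &flag_not_null, 123) << '\n'; // 9: flag + value
    std::cout << serializeNullableFixed(arena, &flag_null, 123) << '\n';     // 1: flag only
    std::cout << serializeNullableFixed(arena, nullptr, 123) << '\n';        // 8: value only
    return 0;
}
```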

View File

@ -80,7 +80,7 @@ public:
Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -86,11 +86,28 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
memset(chars.data() + old_size + length, 0, n - length);
}
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(n, begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &chars[n * index], n);
return StringRef(pos, n);
return res;
}
const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)

View File

@ -115,7 +115,7 @@ public:
chars.resize_assume_reserved(chars.size() - n * elems);
}
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -96,7 +96,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert into {}", getName());
}
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName());
}

View File

@ -255,7 +255,7 @@ void ColumnLowCardinality::insertData(const char * pos, size_t length)
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
}
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
}

View File

@ -87,7 +87,7 @@ public:
void popBack(size_t n) override { idx.popBack(n); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -111,7 +111,7 @@ void ColumnMap::popBack(size_t n)
nested->popBack(n);
}
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return nested->serializeValueIntoArena(n, arena, begin);
}

View File

@ -58,7 +58,7 @@ public:
void insert(const Field & x) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -4,6 +4,10 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
@ -34,6 +38,7 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
{
/// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
nested_column = getNestedColumn().convertToFullColumnIfConst();
nested_type = nested_column->getDataType();
if (!getNestedColumn().canBeInsideNullable())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{} cannot be inside Nullable column", getNestedColumn().getName());
@ -134,21 +139,77 @@ void ColumnNullable::insertData(const char * pos, size_t length)
}
}
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
char * pos;
auto * pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use the pointer from nested_ref.data and step it back over the null-map byte.
return StringRef(nested_ref.data - s, nested_ref.size + s);
switch (nested_type)
{
case TypeIndex::UInt8:
return static_cast<const ColumnUInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt16:
return static_cast<const ColumnUInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt32:
return static_cast<const ColumnUInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt64:
return static_cast<const ColumnUInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt128:
return static_cast<const ColumnUInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt256:
return static_cast<const ColumnUInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int8:
return static_cast<const ColumnInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int16:
return static_cast<const ColumnInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int32:
return static_cast<const ColumnInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int64:
return static_cast<const ColumnInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int128:
return static_cast<const ColumnInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int256:
return static_cast<const ColumnInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float32:
return static_cast<const ColumnFloat32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float64:
return static_cast<const ColumnFloat64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date:
return static_cast<const ColumnDate *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date32:
return static_cast<const ColumnDate32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime:
return static_cast<const ColumnDateTime *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime64:
return static_cast<const ColumnDateTime64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::String:
return static_cast<const ColumnString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::FixedString:
return static_cast<const ColumnFixedString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal32:
return static_cast<const ColumnDecimal<Decimal32> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal64:
return static_cast<const ColumnDecimal<Decimal64> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal128:
return static_cast<const ColumnDecimal<Decimal128> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal256:
return static_cast<const ColumnDecimal<Decimal256> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UUID:
return static_cast<const ColumnUUID *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv4:
return static_cast<const ColumnIPv4 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv6:
return static_cast<const ColumnIPv6 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
default:
pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use the pointer from nested_ref.data and step it back over the null-map byte.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
}
const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)
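
The fast path above writes the null flag and the nested value into a single contiguous arena allocation, whereas the fallback (and the old code) needed two allocContinue calls per row. A minimal reading sketch, assuming a fixed-size nested type and the layout produced above (one UInt8 null flag, followed by the raw value only when the flag is zero); readNullableFixed is a hypothetical helper, not part of the commit:

    template <typename T>
    const char * readNullableFixed(const char * pos, T & value, bool & is_null)
    {
        UInt8 null_flag;
        memcpy(&null_flag, pos, sizeof(null_flag));   /// the first byte is the null map entry
        pos += sizeof(null_flag);
        is_null = (null_flag != 0);
        if (is_null)
            return pos;                               /// nothing else is written for NULL rows
        memcpy(&value, pos, sizeof(T));               /// the payload immediately follows the flag
        return pos + sizeof(T);
    }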

View File

@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "Core/TypeId.h"
#include "config.h"
@ -62,7 +63,7 @@ public:
StringRef getDataAt(size_t) const override;
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@ -212,6 +213,8 @@ public:
private:
WrappedPtr nested_column;
WrappedPtr null_map;
// Cached type of the nested column, used to optimize serializeValueIntoArena.
TypeIndex nested_type;
template <bool negative>
void applyNullMapImpl(const NullMap & map);

View File

@ -244,7 +244,7 @@ public:
StringRef getDataAt(size_t) const override { throwMustBeConcrete(); }
bool isDefaultAt(size_t) const override { throwMustBeConcrete(); }
void insertData(const char *, size_t) override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeConcrete(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }

View File

@ -150,7 +150,7 @@ void ColumnSparse::insertData(const char * pos, size_t length)
insertSingleValue([&](IColumn & column) { column.insertData(pos, length); });
}
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
}

View File

@ -78,7 +78,7 @@ public:
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char *) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@ -213,17 +213,30 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
}
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
res.size = sizeof(string_size) + string_size;
char * pos = arena.allocContinue(res.size, begin);
char * pos;
if (null_bit)
{
res.size = *null_bit ? null_bit_size : null_bit_size + sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &string_size, sizeof(string_size));
memcpy(pos + sizeof(string_size), &chars[offset], string_size);
res.data = pos;
return res;
}

View File

@ -11,6 +11,7 @@
#include <Common/memcmpSmall.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <Common/Arena.h>
class Collator;
@ -168,7 +169,7 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -171,7 +171,7 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
StringRef res(begin, 0);
for (const auto & column : columns)

View File

@ -61,7 +61,7 @@ public:
void insertFrom(const IColumn & src_, size_t n) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -79,7 +79,7 @@ public:
Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash_func) const override
{
@ -373,7 +373,7 @@ size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t lengt
}
template <typename ColumnType>
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
if (is_nullable)
{
@ -670,8 +670,9 @@ UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & co
for (size_t i = 0; i < column_size; ++i)
column.updateHashWithValue(i, sip_hash);
hash = sip_hash.get128();
std::lock_guard lock(mutex);
sip_hash.get128(hash);
cur_hash = hash;
num_added_rows.store(column_size);
}

View File

@ -49,11 +49,28 @@ namespace ErrorCodes
}
template <typename T>
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = *null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
unalignedStore<T>(pos, data[n]);
return StringRef(pos, sizeof(T));
return res;
}
template <typename T>

View File

@ -174,7 +174,7 @@ public:
data.resize_assume_reserved(data.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -218,7 +218,7 @@ public:
* For example, to obtain unambiguous representation of Array of strings, strings data should be interleaved with their sizes.
* Parameter begin should be used with Arena::allocContinue.
*/
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const = 0;
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit = nullptr) const = 0;
/// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method.
/// Returns pointer to the position after the read data.
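
A hypothetical usage sketch of the contract described above, assuming an existing IColumn and Arena; the new null_bit argument defaults to nullptr, so existing call sites keep the old behaviour unchanged:

    Arena arena;
    const char * begin = nullptr;
    for (size_t row = 0; row < column.size(); ++row)
        column.serializeValueIntoArena(row, arena, begin);   /// null_bit omitted -> nullptr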

View File

@ -57,7 +57,7 @@ public:
++s;
}
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override
{
/// Has to put one useless byte into the Arena, because serializing into zero bytes is ambiguous.
char * res = arena.allocContinue(1, begin);

View File

@ -117,7 +117,7 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos);
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos, nullptr);
const char * new_pos;
column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos);
ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i;
@ -140,8 +140,8 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos_lc = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc);
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string, nullptr);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc, nullptr);
ASSERT_EQ(ref_string, ref_lc) << "Serialized data is different from pattern at position " << i;
}
}

View File

@ -51,10 +51,11 @@ public:
{
auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); };
static constexpr std::string_view default_cache_policy = "SLRU";
if (cache_policy_name.empty())
{
static constexpr auto default_cache_policy = "SLRU";
cache_policy_name = default_cache_policy;
}
if (cache_policy_name == "LRU")
{

View File

@ -13,6 +13,8 @@
* (~ 700 MB/sec, 15 million strings per second)
*/
#include "TransformEndianness.hpp"
#include <bit>
#include <string>
#include <type_traits>
@ -22,14 +24,12 @@
#include <base/unaligned.h>
#include <Common/Exception.h>
#include <city.h>
namespace DB
{
namespace ErrorCodes
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
#define SIPROUND \
do \
@ -161,71 +161,50 @@ public:
}
}
template <typename T>
template <typename Transform = void, typename T>
ALWAYS_INLINE void update(const T & x)
{
if constexpr (std::endian::native == std::endian::big)
{
T rev_x = x;
char *start = reinterpret_cast<char *>(&rev_x);
char *end = start + sizeof(T);
std::reverse(start, end);
update(reinterpret_cast<const char *>(&rev_x), sizeof(rev_x)); /// NOLINT
auto transformed_x = x;
if constexpr (!std::is_same_v<Transform, void>)
transformed_x = Transform()(x);
else
DB::transformEndianness<std::endian::little>(transformed_x);
update(reinterpret_cast<const char *>(&transformed_x), sizeof(transformed_x)); /// NOLINT
}
else
update(reinterpret_cast<const char *>(&x), sizeof(x)); /// NOLINT
}
ALWAYS_INLINE void update(const std::string & x)
{
update(x.data(), x.length());
}
ALWAYS_INLINE void update(const std::string & x) { update(x.data(), x.length()); }
ALWAYS_INLINE void update(const std::string_view x) { update(x.data(), x.size()); }
ALWAYS_INLINE void update(const char * s) { update(std::string_view(s)); }
ALWAYS_INLINE void update(const std::string_view x)
{
update(x.data(), x.size());
}
/// Get the result in some form. This can only be done once!
ALWAYS_INLINE void get128(char * out)
{
finalize();
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
unalignedStore<UInt64>(out + 8, v0 ^ v1);
unalignedStore<UInt64>(out, v2 ^ v3);
#else
unalignedStore<UInt64>(out, v0 ^ v1);
unalignedStore<UInt64>(out + 8, v2 ^ v3);
#endif
}
template <typename T>
ALWAYS_INLINE void get128(T & lo, T & hi)
{
static_assert(sizeof(T) == 8);
finalize();
lo = v0 ^ v1;
hi = v2 ^ v3;
}
template <typename T>
ALWAYS_INLINE void get128(T & dst)
{
static_assert(sizeof(T) == 16);
get128(reinterpret_cast<char *>(&dst));
}
UInt64 get64()
ALWAYS_INLINE UInt64 get64()
{
finalize();
return v0 ^ v1 ^ v2 ^ v3;
}
UInt128 get128()
template <typename T>
requires (sizeof(T) == 8)
ALWAYS_INLINE void get128(T & lo, T & hi)
{
finalize();
lo = v0 ^ v1;
hi = v2 ^ v3;
}
ALWAYS_INLINE UInt128 get128()
{
UInt128 res;
get128(res);
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
get128(res.items[1], res.items[0]);
#else
get128(res.items[0], res.items[1]);
#endif
return res;
}
@ -247,9 +226,7 @@ public:
{
lo = std::byteswap(lo);
hi = std::byteswap(hi);
auto tmp = hi;
hi = lo;
lo = tmp;
std::swap(lo, hi);
}
UInt128 res = hi;
@ -265,11 +242,18 @@ public:
#include <cstddef>
inline void sipHash128(const char * data, const size_t size, char * out)
inline std::array<char, 16> getSipHash128AsArray(SipHash & sip_hash)
{
SipHash hash;
hash.update(data, size);
hash.get128(out);
std::array<char, 16> arr;
*reinterpret_cast<UInt128*>(arr.data()) = sip_hash.get128();
return arr;
}
inline CityHash_v1_0_2::uint128 getSipHash128AsPair(SipHash & sip_hash)
{
CityHash_v1_0_2::uint128 result;
sip_hash.get128(result.low64, result.high64);
return result;
}
inline UInt128 sipHash128Keyed(UInt64 key0, UInt64 key1, const char * data, const size_t size)
@ -309,7 +293,7 @@ inline UInt64 sipHash64(const char * data, const size_t size)
}
template <typename T>
UInt64 sipHash64(const T & x)
inline UInt64 sipHash64(const T & x)
{
SipHash hash;
hash.update(x);
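
A small usage sketch for the two new helpers (illustrative values; get128() finalizes the state, so each SipHash instance is consumed once):

    SipHash h1;
    h1.update(std::string_view("example"));
    std::array<char, 16> bytes = getSipHash128AsArray(h1);    /// 16 raw hash bytes

    SipHash h2;
    h2.update(std::string_view("example"));
    CityHash_v1_0_2::uint128 parts = getSipHash128AsPair(h2); /// {low64, high64}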

View File

@ -2,24 +2,27 @@
#include <base/Decimal_fwd.h>
#include <base/extended_types.h>
#include <base/strong_typedef.h>
#include <city.h>
#include <utility>
namespace DB
{
template <std::endian endian, typename T>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T>
requires std::is_integral_v<T>
inline void transformEndianness(T & value)
{
if constexpr (endian != std::endian::native)
if constexpr (ToEndian != FromEndian)
value = std::byteswap(value);
}
template <std::endian endian, typename T>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T>
requires is_big_int_v<T>
inline void transformEndianness(T & x)
{
if constexpr (std::endian::native != endian)
if constexpr (ToEndian != FromEndian)
{
auto & items = x.items;
std::transform(std::begin(items), std::end(items), std::begin(items), [](auto & item) { return std::byteswap(item); });
@ -27,42 +30,49 @@ inline void transformEndianness(T & x)
}
}
template <std::endian endian, typename T>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T>
requires is_decimal<T>
inline void transformEndianness(T & x)
{
transformEndianness<endian>(x.value);
transformEndianness<ToEndian, FromEndian>(x.value);
}
template <std::endian endian, typename T>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T>
requires std::is_floating_point_v<T>
inline void transformEndianness(T & value)
{
if constexpr (std::endian::native != endian)
if constexpr (ToEndian != FromEndian)
{
auto * start = reinterpret_cast<std::byte *>(&value);
std::reverse(start, start + sizeof(T));
}
}
template <std::endian endian, typename T>
requires std::is_scoped_enum_v<T>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T>
requires std::is_enum_v<T> || std::is_scoped_enum_v<T>
inline void transformEndianness(T & x)
{
using UnderlyingType = std::underlying_type_t<T>;
transformEndianness<endian>(reinterpret_cast<UnderlyingType &>(x));
transformEndianness<ToEndian, FromEndian>(reinterpret_cast<UnderlyingType &>(x));
}
template <std::endian endian, typename A, typename B>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename A, typename B>
inline void transformEndianness(std::pair<A, B> & pair)
{
transformEndianness<endian>(pair.first);
transformEndianness<endian>(pair.second);
transformEndianness<ToEndian, FromEndian>(pair.first);
transformEndianness<ToEndian, FromEndian>(pair.second);
}
template <std::endian endian, typename T, typename Tag>
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T, typename Tag>
inline void transformEndianness(StrongTypedef<T, Tag> & x)
{
transformEndianness<endian>(x.toUnderType());
transformEndianness<ToEndian, FromEndian>(x.toUnderType());
}
template <std::endian ToEndian, std::endian FromEndian = std::endian::native>
inline void transformEndianness(CityHash_v1_0_2::uint128 & x)
{
transformEndianness<ToEndian, FromEndian>(x.low64);
transformEndianness<ToEndian, FromEndian>(x.high64);
}
}
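
A sketch of what the added FromEndian parameter enables (the value is illustrative): callers can now state which endianness a value is currently in instead of always assuming the native one.

    UInt64 on_disk = 0x0102030405060708ULL;   /// stored little-endian by convention
    DB::transformEndianness<std::endian::native, std::endian::little>(on_disk);
    /// a no-op on little-endian hosts, a byte swap on big-endian hosts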

View File

@ -152,7 +152,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
throw KeeperException(code, "/");
if (code == Coordination::Error::ZNONODE)
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + args.chroot + " before start.", Coordination::Error::ZNONODE);
throw KeeperException(Coordination::Error::ZNONODE, "ZooKeeper root doesn't exist. You should create root node {} before start.", args.chroot);
}
}
@ -491,7 +491,7 @@ std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat,
if (tryGet(path, res, stat, watch, &code))
return res;
else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
throw KeeperException(code, "Can't get data for node '{}': node doesn't exist", path);
}
std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
@ -501,7 +501,7 @@ std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * s
if (tryGetWatch(path, res, stat, watch_callback, &code))
return res;
else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
throw KeeperException(code, "Can't get data for node '{}': node doesn't exist", path);
}
bool ZooKeeper::tryGet(

View File

@ -213,7 +213,7 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
};
}
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key);
}
}

View File

@ -94,7 +94,8 @@ int main(int, char **)
{
SipHash hash;
hash.update(strings[i].data(), strings[i].size());
hash.get128(&hashes[i * 16]);
const auto hashed_value = getSipHash128AsArray(hash);
memcpy(&hashes[i * 16], hashed_value.data(), hashed_value.size());
}
watch.stop();

View File

@ -37,8 +37,7 @@ SipHash getHashOfLoadedBinary()
std::string getHashOfLoadedBinaryHex()
{
SipHash hash = getHashOfLoadedBinary();
UInt128 checksum;
hash.get128(checksum);
const auto checksum = hash.get128();
return getHexUIntUppercase(checksum);
}

View File

@ -39,7 +39,7 @@ DB::UInt64 randomSeed()
#if defined(__linux__)
struct utsname sysinfo;
if (uname(&sysinfo) == 0)
hash.update(sysinfo);
hash.update<std::identity>(sysinfo);
#endif
return hash.get64();
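
A note on the std::identity argument above (a sketch, not part of the commit): with the templated update<Transform>() from SipHash.h, std::identity hashes the struct's raw bytes as-is and bypasses the default endianness transform, which is neither meaningful nor supported for an opaque struct such as utsname.

    struct utsname info{};
    if (uname(&info) == 0)
    {
        SipHash h;
        h.update<std::identity>(info);   /// feeds sizeof(info) raw bytes, no byte swapping
    }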

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/defines.h>
#include <base/unit.h>
#define DBMS_DEFAULT_PORT 9000
#define DBMS_DEFAULT_SECURE_PORT 9440
@ -64,6 +65,21 @@
/// Max depth of hierarchical dictionary
#define DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH 1000
/// Default maximum (total and entry) sizes and policies of various caches
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_SIZE = 1_GiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRIES = 1024uz;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES = 1_MiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS = 30'000'000uz;
/// Query profiler cannot work with sanitizers.
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
/// And they do unwinding frequently (on every malloc/free, thread/mutex operations, etc).
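
For readers scanning the new cache-size constants above: the size suffixes come from base/unit.h (now included) and 'uz' is the C++23 size_t literal. A quick sanity sketch of what they expand to, assuming the usual power-of-two definitions:

    static_assert(5368_MiB == 5368ull * 1024 * 1024);   /// mark cache default, in bytes
    static_assert(1_GiB == 1024ull * 1024 * 1024);      /// query cache default size
    static_assert(1024uz == size_t{1024});              /// maximum number of query cache entries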

View File

@ -2,6 +2,7 @@
#include <Core/BaseSettings.h>
#include <Core/Defines.h>
namespace Poco::Util
@ -56,13 +57,13 @@ namespace DB
M(UInt64, max_concurrent_select_queries, 0, "Limit on total number of concurrent select queries. Zero means unlimited.", 0) \
\
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size to RAM max ratio. Allows lowering the cache size on low-memory systems.", 0) \
M(String, uncompressed_cache_policy, "SLRU", "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, 0, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(UInt64, mark_cache_size, 5368709120, "Size of cache for marks (index of MergeTree family of tables).", 0) \
M(String, mark_cache_policy, "SLRU", "Mark cache policy name.", 0) \
M(UInt64, index_uncompressed_cache_size, 0, "Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.", 0) \
M(UInt64, index_mark_cache_size, 0, "Size of cache for index marks. Zero means disabled.", 0) \
M(UInt64, mmap_cache_size, 1000, "A cache for mmapped files.", 0) /* The choice of default is arbitrary. */ \
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
M(String, mark_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Mark cache policy name.", 0) \
M(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.", 0) \
M(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for index marks. Zero means disabled.", 0) \
M(UInt64, mmap_cache_size, DEFAULT_MMAP_CACHE_MAX_SIZE, "A cache for mmapped files.", 0) \
\
M(Bool, disable_internal_dns_cache, false, "Disable internal DNS caching at all.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \

View File

@ -133,8 +133,7 @@ void compileSortDescriptionIfNeeded(SortDescription & description, const DataTyp
SipHash sort_description_dump_hash;
sort_description_dump_hash.update(description_dump);
UInt128 sort_description_hash_key;
sort_description_dump_hash.get128(sort_description_hash_key);
const auto sort_description_hash_key = sort_description_dump_hash.get128();
{
std::lock_guard lock(mutex);

View File

@ -65,9 +65,7 @@ UInt128 PathInData::getPartsHash(const Parts::const_iterator & begin, const Part
hash.update(part_it->anonymous_array_level);
}
UInt128 res;
hash.get128(res);
return res;
return hash.get128();
}
void PathInData::buildPath(const Parts & other_parts)

View File

@ -10,6 +10,8 @@
#include <Formats/ProtobufReader.h>
#include <Core/Field.h>
#include <ranges>
namespace DB
{
@ -135,13 +137,25 @@ template <typename T>
void SerializationNumber<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const typename ColumnVector<T>::Container & x = typeid_cast<const ColumnVector<T> &>(column).getData();
size_t size = x.size();
if (limit == 0 || offset + limit > size)
if (const size_t size = x.size(); limit == 0 || offset + limit > size)
limit = size - offset;
if (limit)
if (limit == 0)
return;
if constexpr (std::endian::native == std::endian::big && sizeof(T) >= 2)
{
static constexpr auto to_little_endian = [](auto i)
{
transformEndianness<std::endian::little>(i);
return i;
};
std::ranges::for_each(
x | std::views::drop(offset) | std::views::take(limit) | std::views::transform(to_little_endian),
[&ostr](const auto & i) { ostr.write(reinterpret_cast<const char *>(&i), sizeof(typename ColumnVector<T>::ValueType)); });
}
else
ostr.write(reinterpret_cast<const char *>(&x[offset]), sizeof(typename ColumnVector<T>::ValueType) * limit);
}
@ -149,10 +163,13 @@ template <typename T>
void SerializationNumber<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double /*avg_value_size_hint*/) const
{
typename ColumnVector<T>::Container & x = typeid_cast<ColumnVector<T> &>(column).getData();
size_t initial_size = x.size();
const size_t initial_size = x.size();
x.resize(initial_size + limit);
size_t size = istr.readBig(reinterpret_cast<char*>(&x[initial_size]), sizeof(typename ColumnVector<T>::ValueType) * limit);
const size_t size = istr.readBig(reinterpret_cast<char*>(&x[initial_size]), sizeof(typename ColumnVector<T>::ValueType) * limit);
x.resize(initial_size + size / sizeof(typename ColumnVector<T>::ValueType));
if constexpr (std::endian::native == std::endian::big && sizeof(T) >= 2)
std::ranges::for_each(x | std::views::drop(initial_size), [](auto & i) { transformEndianness<std::endian::big, std::endian::little>(i); });
}
template class SerializationNumber<UInt8>;
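
In short, the bulk format stays little-endian on disk regardless of the host. A sketch of the round trip on a big-endian machine, using the same transformEndianness calls as above (on little-endian hosts both constexpr branches are skipped entirely):

    UInt32 v = 0x01020304;
    transformEndianness<std::endian::little>(v);                     /// write path: native (BE) -> LE
    transformEndianness<std::endian::big, std::endian::little>(v);   /// read path: LE -> native (BE)
    /// v == 0x01020304 again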

View File

@ -20,8 +20,8 @@ using FunctionCreator = std::function<FunctionOverloadResolverPtr(ContextPtr)>;
using FunctionFactoryData = std::pair<FunctionCreator, FunctionDocumentation>;
/** Creates function by name.
* Function could use for initialization (take ownership of shared_ptr, for example)
* some dictionaries from Context.
* The provided Context is guaranteed to outlive the created function. Functions may use it for
* things like settings, current database, permission checks, etc.
*/
class FunctionFactory : private boost::noncopyable, public IFactoryWithAliases<FunctionFactoryData>
{

View File

@ -62,13 +62,14 @@ namespace ErrorCodes
*/
class FunctionDictHelper
class FunctionDictHelper : WithContext
{
public:
explicit FunctionDictHelper(ContextPtr context_) : current_context(context_) {}
explicit FunctionDictHelper(ContextPtr context_) : WithContext(context_) {}
std::shared_ptr<const IDictionary> getDictionary(const String & dictionary_name)
{
auto current_context = getContext();
auto dict = current_context->getExternalDictionariesLoader().getDictionary(dictionary_name, current_context);
if (!access_checked)
@ -131,12 +132,10 @@ public:
DictionaryStructure getDictionaryStructure(const String & dictionary_name) const
{
return current_context->getExternalDictionariesLoader().getDictionaryStructure(dictionary_name, current_context);
return getContext()->getExternalDictionariesLoader().getDictionaryStructure(dictionary_name, getContext());
}
private:
ContextPtr current_context;
/// Access cannot be in the "not granted" state here: in that case checkAccess() would have thrown and access_checked would not have been updated.
std::atomic<bool> access_checked = false;

View File

@ -1374,8 +1374,8 @@ public:
if constexpr (std::is_same_v<ToType, UInt128>) /// backward-compatible
{
if (std::endian::native == std::endian::big)
std::ranges::for_each(col_to->getData(), transformEndianness<std::endian::little, ToType>);
if constexpr (std::endian::native == std::endian::big)
std::ranges::for_each(col_to->getData(), transformEndianness<std::endian::little, std::endian::native, ToType>);
auto col_to_fixed_string = ColumnFixedString::create(sizeof(UInt128));
const auto & data = col_to->getData();

View File

@ -336,7 +336,7 @@ private:
template <typename Name, template<typename> typename Impl>
class ExecutableFunctionJSON : public IExecutableFunction, WithContext
class ExecutableFunctionJSON : public IExecutableFunction
{
public:

View File

@ -635,9 +635,7 @@ UInt128 sipHash128(Polygon && polygon)
for (auto & inner : inners)
hash_ring(inner);
UInt128 res;
hash.get128(res);
return res;
return hash.get128();
}
}

View File

@ -268,10 +268,9 @@ void FunctionArrayDistinct::executeHashed(
if (nullable_col && (*src_null_map)[j])
continue;
UInt128 hash;
SipHash hash_function;
src_data.updateHashWithValue(j, hash_function);
hash_function.get128(hash);
const auto hash = hash_function.get128();
if (!set.find(hash))
{

View File

@ -133,18 +133,14 @@ private:
/// Hash a set of keys into a UInt128 value.
static inline UInt128 ALWAYS_INLINE hash128depths(const std::vector<size_t> & indices, const ColumnRawPtrs & key_columns)
{
UInt128 key;
SipHash hash;
for (size_t j = 0, keys_size = key_columns.size(); j < keys_size; ++j)
{
// Debug: const auto & field = (*key_columns[j])[indices[j]]; DUMP(j, indices[j], field);
key_columns[j]->updateHashWithValue(indices[j], hash);
}
hash.get128(key);
return key;
return hash.get128();
}

View File

@ -776,8 +776,12 @@ namespace
UInt64 key = 0;
auto * dst = reinterpret_cast<char *>(&key);
const auto ref = cache.from_column->getDataAt(i);
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
if constexpr (std::endian::native == std::endian::big)
dst += sizeof(key) - ref.size;
#pragma clang diagnostic pop
memcpy(dst, ref.data, ref.size);
table[key] = i;

View File

@ -33,15 +33,12 @@ public:
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file, size_t offset, ssize_t length = -1)
{
UInt128 key;
SipHash hash;
hash.update(path_to_file.data(), path_to_file.size() + 1);
hash.update(offset);
hash.update(length);
hash.get128(key);
return key;
return hash.get128();
}
template <typename LoadFunc>

View File

@ -188,7 +188,7 @@ Client::Client(
}
}
LOG_TRACE(log, "API mode: {}", toString(api_mode));
LOG_TRACE(log, "API mode of the S3 client: {}", api_mode);
detect_region = provider_type == ProviderType::AWS && explicit_region == Aws::Region::AWS_GLOBAL;

View File

@ -51,14 +51,11 @@ public:
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file, size_t offset)
{
UInt128 key;
SipHash hash;
hash.update(path_to_file.data(), path_to_file.size() + 1);
hash.update(offset);
hash.get128(key);
return key;
return hash.get128();
}
template <typename LoadFunc>

View File

@ -1170,6 +1170,15 @@ inline String toString(const T & x)
return buf.str();
}
inline String toString(const CityHash_v1_0_2::uint128 & hash)
{
WriteBufferFromOwnString buf;
writeText(hash.low64, buf);
writeChar('_', buf);
writeText(hash.high64, buf);
return buf.str();
}
template <typename T>
inline String toStringWithFinalSeparator(const std::vector<T> & x, const String & final_sep)
{
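
A small sketch of the resulting format (illustrative values); the Context changes further below rely on this overload to build cache keys from IAST tree hashes:

    CityHash_v1_0_2::uint128 h{123, 456};
    String s = toString(h);   /// "123_456" - low64, underscore, high64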

View File

@ -253,15 +253,11 @@ static inline T ALWAYS_INLINE packFixed(
static inline UInt128 ALWAYS_INLINE hash128( /// NOLINT
size_t i, size_t keys_size, const ColumnRawPtrs & key_columns)
{
UInt128 key;
SipHash hash;
for (size_t j = 0; j < keys_size; ++j)
key_columns[j]->updateHashWithValue(i, hash);
hash.get128(key);
return key;
return hash.get128();
}
/** Serialize keys into a contiguous chunk of memory.

View File

@ -694,8 +694,7 @@ void Aggregator::compileAggregateFunctionsIfNeeded()
SipHash aggregate_functions_description_hash;
aggregate_functions_description_hash.update(functions_description);
UInt128 aggregate_functions_description_hash_key;
aggregate_functions_description_hash.get128(aggregate_functions_description_hash_key);
const auto aggregate_functions_description_hash_key = aggregate_functions_description_hash.get128();
{
std::lock_guard<std::mutex> lock(mutex);

View File

@ -105,9 +105,7 @@ UInt128 AsynchronousInsertQueue::InsertQuery::calculateHash() const
applyVisitor(FieldVisitorHash(siphash), setting.getValue());
}
UInt128 res;
siphash.get128(res);
return res;
return siphash.get128();
}
bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const

View File

@ -493,7 +493,6 @@ void QueryCache::reset()
cache.reset();
std::lock_guard lock(mutex);
times_executed.clear();
cache_size_in_bytes = 0;
}
size_t QueryCache::weight() const
@ -511,7 +510,7 @@ size_t QueryCache::recordQueryRun(const Key & key)
std::lock_guard lock(mutex);
size_t times = ++times_executed[key];
// Regularly drop times_executed to avoid DOS-by-unlimited-growth.
static constexpr size_t TIMES_EXECUTED_MAX_SIZE = 10'000;
static constexpr auto TIMES_EXECUTED_MAX_SIZE = 10'000uz;
if (times_executed.size() > TIMES_EXECUTED_MAX_SIZE)
times_executed.clear();
return times;
@ -522,23 +521,19 @@ std::vector<QueryCache::Cache::KeyMapped> QueryCache::dump() const
return cache.dump();
}
QueryCache::QueryCache()
QueryCache::QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
: cache(std::make_unique<TTLCachePolicy<Key, Entry, KeyHasher, QueryCacheEntryWeight, IsStale>>(std::make_unique<PerUserTTLCachePolicyUserQuota>()))
{
updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes_, max_entry_size_in_rows_);
}
void QueryCache::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
void QueryCache::updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
{
std::lock_guard lock(mutex);
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", 1_GiB);
cache.setMaxSize(max_size_in_bytes);
size_t max_entries = config.getUInt64("query_cache.max_entries", 1024);
cache.setMaxCount(max_entries);
max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", 1_MiB);
max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", 30'000'000);
max_entry_size_in_bytes = max_entry_size_in_bytes_;
max_entry_size_in_rows = max_entry_size_in_rows_;
}
}

View File

@ -4,7 +4,6 @@
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/Sources/SourceFromChunks.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Processors/Chunk.h>
#include <QueryPipeline/Pipe.h>
@ -110,9 +109,6 @@ private:
/// query --> query result
using Cache = CacheBase<Key, Entry, KeyHasher, QueryCacheEntryWeight>;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
public:
/// Buffers multiple partial query result chunks (buffer()) and eventually stores them as cache entry (finalizeWrite()).
///
@ -177,9 +173,9 @@ public:
friend class QueryCache; /// for createReader()
};
QueryCache();
QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_);
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
void updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_);
Reader createReader(const Key & key);
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size, size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota);
@ -199,14 +195,15 @@ private:
Cache cache; /// has its own locking --> not protected by mutex
mutable std::mutex mutex;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
TimesExecuted times_executed TSA_GUARDED_BY(mutex);
/// Cache configuration
size_t max_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
size_t max_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0;
size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// Updated in each cache insert/delete
friend class StorageSystemQueryCache;
};

View File

@ -60,9 +60,6 @@ public:
/// (When there is a local replica with big delay).
bool lazy = false;
time_t local_delay = 0;
/// Set only if parallel reading from replicas is used.
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator;
};
using Shards = std::vector<Shard>;

View File

@ -28,7 +28,6 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
@ -281,7 +280,6 @@ void executeQueryWithParallelReplicas(
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
auto remote_plan = std::make_unique<QueryPlan>();
auto plans = std::vector<QueryPlanPtr>();
/// This is a little bit weird, but we construct an "empty" coordinator without
/// any specified reading/coordination method (like Default, InOrder, InReverseOrder)
@ -309,20 +307,7 @@ void executeQueryWithParallelReplicas(
&Poco::Logger::get("ReadFromParallelRemoteReplicasStep"),
query_info.storage_limits);
remote_plan->addStep(std::move(read_from_remote));
remote_plan->addInterpreterContext(context);
plans.emplace_back(std::move(remote_plan));
if (std::all_of(plans.begin(), plans.end(), [](const QueryPlanPtr & plan) { return !plan; }))
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from shard. This is a bug");
DataStreams input_streams;
input_streams.reserve(plans.size());
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
query_plan.addStep(std::move(read_from_remote));
}
}

View File

@ -118,7 +118,7 @@ private:
{
size_t operator() (const IAST::Hash & hash) const
{
return hash.first;
return hash.low64;
}
};

View File

@ -245,27 +245,27 @@ struct ContextSharedPart : boost::noncopyable
std::optional<BackupsWorker> backups_worker;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
std::unique_ptr<AccessControl> access_control;
mutable ResourceManagerPtr resource_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
SessionTracker session_tracker;
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
mutable std::unique_ptr<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
@ -1561,7 +1561,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
}
}
auto hash = table_expression->getTreeHash();
String key = toString(hash.first) + '_' + toString(hash.second);
auto key = toString(hash);
StoragePtr & res = table_function_results[key];
if (!res)
{
@ -1712,7 +1712,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
auto new_hash = table_expression->getTreeHash();
if (hash != new_hash)
{
key = toString(new_hash.first) + '_' + toString(new_hash.second);
key = toString(new_hash);
table_function_results[key] = res;
}
}
@ -1721,8 +1721,8 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const TableFunctionPtr & table_function_ptr)
{
auto hash = table_expression->getTreeHash();
String key = toString(hash.first) + '_' + toString(hash.second);
const auto hash = table_expression->getTreeHash();
const auto key = toString(hash);
StoragePtr & res = table_function_results[key];
if (!res)
@ -2269,7 +2269,7 @@ UncompressedCachePtr Context::getUncompressedCache() const
}
void Context::dropUncompressedCache() const
void Context::clearUncompressedCache() const
{
auto lock = getLock();
if (shared->uncompressed_cache)
@ -2293,7 +2293,7 @@ MarkCachePtr Context::getMarkCache() const
return shared->mark_cache;
}
void Context::dropMarkCache() const
void Context::clearMarkCache() const
{
auto lock = getLock();
if (shared->mark_cache)
@ -2315,32 +2315,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool;
}
static size_t getPrefetchThreadpoolSizeFromConfig(const Poco::Util::AbstractConfiguration & config)
{
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return getPrefetchThreadpoolSizeFromConfig(config);
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
@ -2351,7 +2325,6 @@ void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
@ -2359,7 +2332,7 @@ UncompressedCachePtr Context::getIndexUncompressedCache() const
}
void Context::dropIndexUncompressedCache() const
void Context::clearIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
@ -2383,44 +2356,13 @@ MarkCachePtr Context::getIndexMarkCache() const
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
void Context::clearIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
void Context::setQueryCache(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query cache has been already created.");
shared->query_cache = std::make_shared<QueryCache>();
shared->query_cache->updateConfiguration(config);
}
void Context::updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->updateConfiguration(config);
}
QueryCachePtr Context::getQueryCache() const
{
auto lock = getLock();
return shared->query_cache;
}
void Context::dropQueryCache() const
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->reset();
}
void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
auto lock = getLock();
@ -2437,15 +2379,50 @@ MMappedFileCachePtr Context::getMMappedFileCache() const
return shared->mmap_cache;
}
void Context::dropMMappedFileCache() const
void Context::clearMMappedFileCache() const
{
auto lock = getLock();
if (shared->mmap_cache)
shared->mmap_cache->reset();
}
void Context::setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows)
{
auto lock = getLock();
void Context::dropCaches() const
if (shared->query_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query cache has been already created.");
shared->query_cache = std::make_shared<QueryCache>(max_size_in_bytes, max_entries, max_entry_size_in_bytes, max_entry_size_in_rows);
}
void Context::updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
{
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t max_entries = config.getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
shared->query_cache->updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes, max_entry_size_in_rows);
}
}
QueryCachePtr Context::getQueryCache() const
{
auto lock = getLock();
return shared->query_cache;
}
void Context::clearQueryCache() const
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->reset();
}
void Context::clearCaches() const
{
auto lock = getLock();
@ -2461,11 +2438,31 @@ void Context::dropCaches() const
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
if (shared->query_cache)
shared->query_cache->reset();
if (shared->mmap_cache)
shared->mmap_cache->reset();
/// Intentionally not dropping the query cache which is transactionally inconsistent by design.
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const

View File

@ -915,44 +915,39 @@ public:
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// --- Caches ------------------------------------------------------------------------------------------
/// Create a cache of uncompressed blocks of specified size. This can be done only once.
void setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getUncompressedCache() const;
void dropUncompressedCache() const;
void clearUncompressedCache() const;
/// Create a cache of marks of specified size. This can be done only once.
void setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;
void clearMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader:
/// its tasks wait for marks to be loaded
/// and then schedule a prefetch by putting a read task into threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
void dropIndexUncompressedCache() const;
void clearIndexUncompressedCache() const;
/// Create a cache of index marks of specified size. This can be done only once.
void setIndexMarkCache(size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void dropIndexMarkCache() const;
void clearIndexMarkCache() const;
/// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
void setMMappedFileCache(size_t cache_size_in_num_entries);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;
void dropMMappedFileCache() const;
void clearMMappedFileCache() const;
/// Create a cache of query results for statements which run repeatedly.
void setQueryCache(const Poco::Util::AbstractConfiguration & config);
void setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows);
void updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<QueryCache> getQueryCache() const;
void dropQueryCache() const;
void clearQueryCache() const;
/** Clear the caches of the uncompressed blocks and marks.
* This is usually done when renaming tables, changing the type of columns, deleting a table.
@ -960,7 +955,16 @@ public:
* (when deleting a table - it is necessary, since in its place another can appear)
* const - because the change in the cache is not considered significant.
*/
void dropCaches() const;
void clearCaches() const;
/// -----------------------------------------------------------------------------------------------------
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader
/// in the way that its tasks are - wait for marks to be loaded
/// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;

View File

@ -98,7 +98,7 @@ static auto getQueryInterpreter(const ASTSubquery & subquery, ExecuteScalarSubqu
void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data)
{
auto hash = subquery.getTreeHash();
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
const auto scalar_query_hash_str = toString(hash);
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter = nullptr;
bool hit = false;
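The hunk above replaces the old two-part "<first>_<second>" key with a single string for the whole tree hash. A minimal sketch of one way a 128-bit hash given as two 64-bit halves can be rendered as one token; the real toString overload in the codebase may format it differently, so treat this only as an illustration of the idea.

    #include <cstdint>
    #include <string>

    // Render a 128-bit value, given as (low64, high64), as a single decimal string.
    std::string hashToString(uint64_t low64, uint64_t high64)
    {
        unsigned __int128 value = (static_cast<unsigned __int128>(high64) << 64) | low64;
        std::string result;
        do
        {
            result.insert(result.begin(), static_cast<char>('0' + static_cast<unsigned>(value % 10)));
            value /= 10;
        } while (value != 0);
        return result;
    }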

View File

@ -115,7 +115,7 @@ public:
if (alias.empty())
{
auto hash = subquery_or_table_name->getTreeHash();
external_table_name = fmt::format("_data_{}_{}", hash.first, hash.second);
external_table_name = fmt::format("_data_{}", toString(hash));
}
else
external_table_name = alias;

View File

@ -9,6 +9,7 @@
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Common/atomicRename.h>
#include <Common/logger_useful.h>
#include <base/hex.h>
#include <Core/Defines.h>
@ -71,7 +72,6 @@
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/logger_useful.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
@ -1329,10 +1329,32 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
data_path = database->getTableDataPath(create);
auto full_data_path = fs::path{getContext()->getPath()} / data_path;
if (!create.attach && !data_path.empty() && fs::exists(fs::path{getContext()->getPath()} / data_path))
throw Exception(storage_already_exists_error_code,
"Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
if (!create.attach && !data_path.empty() && fs::exists(full_data_path))
{
if (getContext()->getZooKeeperMetadataTransaction() &&
!getContext()->getZooKeeperMetadataTransaction()->isInitialQuery() &&
!DatabaseCatalog::instance().hasUUIDMapping(create.uuid) &&
Context::getGlobalContextInstance()->isServerCompletelyStarted() &&
Context::getGlobalContextInstance()->getConfigRef().getBool("allow_moving_table_directory_to_trash", false))
{
/// This is a secondary query from a Replicated database. It cannot be retried with another UUID, so we must execute it as is.
/// We don't have a table with this UUID (and all metadata is loaded),
/// so the existing directory probably contains some leftovers from previous unsuccessful attempts to create the table.
fs::path trash_path = fs::path{getContext()->getPath()} / "trash" / data_path / getHexUIntLowercase(thread_local_rng());
LOG_WARNING(&Poco::Logger::get("InterpreterCreateQuery"), "Directory for {} data {} already exists. Will move it to {}",
Poco::toLower(storage_name), String(data_path), trash_path);
fs::create_directories(trash_path.parent_path());
renameNoReplace(full_data_path, trash_path);
}
else
{
throw Exception(storage_already_exists_error_code,
"Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
}
}
bool from_path = create.attach_from_path.has_value();
String actual_data_path = data_path;
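A standalone sketch of the move-to-trash step introduced above, using std::filesystem directly instead of the codebase's renameNoReplace and thread_local_rng; the directory layout and the random-suffix scheme are simplified assumptions.

    #include <filesystem>
    #include <random>
    #include <string>

    namespace fs = std::filesystem;

    // Move a leftover table data directory into <root>/trash/<data_path>/<random suffix>
    // instead of failing the CREATE. fs::rename is a stand-in for renameNoReplace here;
    // it lacks the "fail if target exists" guarantee, but the random suffix makes clashes unlikely.
    void moveDataDirectoryToTrash(const fs::path & root, const std::string & data_path)
    {
        std::mt19937_64 rng{std::random_device{}()};
        fs::path trash_path = root / "trash" / data_path / std::to_string(rng());
        fs::create_directories(trash_path.parent_path());
        fs::rename(root / data_path, trash_path);
    }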

View File

@ -247,10 +247,10 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
DatabaseCatalog::instance().removeDependencies(table_id, check_ref_deps, check_loading_deps, is_drop_or_detach_database);
database->dropTable(context_, table_id.table_name, query.sync);
/// We have to drop mmapio cache when dropping table from Ordinary database
/// We have to clear mmapio cache when dropping table from Ordinary database
/// to avoid reading old data if new table with the same name is created
if (database->getUUID() == UUIDHelpers::Nil)
context_->dropMMappedFileCache();
context_->clearMMappedFileCache();
}
db = database;

View File

@ -541,13 +541,13 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), getContext(), SelectQueryOptions());
interpreter.buildQueryPlan(plan);
context = interpreter.getContext();
// collect the selected marks, rows, parts during build query pipeline.
plan.buildQueryPipeline(
// Collect the selected marks, rows, parts during build query pipeline.
// Hold on to the returned QueryPipelineBuilderPtr because `plan` may have pointers into
// it (through QueryPlanResourceHolder).
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
if (settings.optimize)
plan.optimize(QueryPlanOptimizationSettings::fromContext(context));
plan.explainEstimate(res_columns);
insert_buf = false;
break;
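The new comment above is about object lifetime: the plan keeps pointers into resources owned by the returned builder (through QueryPlanResourceHolder), so the builder must outlive the later use of the plan. A condensed sketch of the safe ordering, reusing the settings calls from the hunk; the surrounding names are taken from the diff, not invented.

    // Keep the builder alive: it owns resources the plan still references.
    auto builder = plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(context),
        BuildQueryPipelineSettings::fromContext(context));

    plan.explainEstimate(res_columns); // safe while `builder` is still in scope
    // `builder` is destroyed only after the plan is done with it.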

View File

@ -184,7 +184,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(query_tree_)
, planner(query_tree_, select_query_options_)
, planner(query_tree_, select_query_options)
{
}

Some files were not shown because too many files have changed in this diff