Merge remote-tracking branch 'upstream/master' into system-sync-cache

Commit 247abe0a2a by kssenii, 2023-08-19 08:43:53 +04:00
410 changed files with 6030 additions and 2813 deletions

contrib/boost vendored

@@ -1 +1 @@
Subproject commit aec12eea7fc762721ae16943d1361340c66c9c17
Subproject commit bb179652862b528d94a9032a784796c4db846c3f


@@ -172,9 +172,9 @@ endif()
# coroutine
set (SRCS_COROUTINE
"${LIBRARY_DIR}/libs/coroutine/detail/coroutine_context.cpp"
"${LIBRARY_DIR}/libs/coroutine/exceptions.cpp"
"${LIBRARY_DIR}/libs/coroutine/posix/stack_traits.cpp"
"${LIBRARY_DIR}/libs/coroutine/src/detail/coroutine_context.cpp"
"${LIBRARY_DIR}/libs/coroutine/src/exceptions.cpp"
"${LIBRARY_DIR}/libs/coroutine/src/posix/stack_traits.cpp"
)
add_library (_boost_coroutine ${SRCS_COROUTINE})
add_library (boost::coroutine ALIAS _boost_coroutine)


@@ -73,8 +73,8 @@ struct uint128
uint128() = default;
uint128(uint64 low64_, uint64 high64_) : low64(low64_), high64(high64_) {}
friend bool operator ==(const uint128 & x, const uint128 & y) { return (x.low64 == y.low64) && (x.high64 == y.high64); }
friend bool operator !=(const uint128 & x, const uint128 & y) { return !(x == y); }
friend auto operator<=>(const uint128 &, const uint128 &) = default;
};
inline uint64 Uint128Low64(const uint128 & x) { return x.low64; }

contrib/curl vendored

@@ -1 +1 @@
Subproject commit b0edf0b7dae44d9e66f270a257cf654b35d5263d
Subproject commit eb3b049df526bf125eda23218e680ce7fa9ec46c


@@ -8,125 +8,122 @@ endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/curl")
set (SRCS
"${LIBRARY_DIR}/lib/fopen.c"
"${LIBRARY_DIR}/lib/noproxy.c"
"${LIBRARY_DIR}/lib/idn.c"
"${LIBRARY_DIR}/lib/cfilters.c"
"${LIBRARY_DIR}/lib/cf-socket.c"
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/bufref.c"
"${LIBRARY_DIR}/lib/cf-h1-proxy.c"
"${LIBRARY_DIR}/lib/cf-haproxy.c"
"${LIBRARY_DIR}/lib/cf-https-connect.c"
"${LIBRARY_DIR}/lib/file.c"
"${LIBRARY_DIR}/lib/timeval.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/hostip.c"
"${LIBRARY_DIR}/lib/progress.c"
"${LIBRARY_DIR}/lib/formdata.c"
"${LIBRARY_DIR}/lib/cookie.c"
"${LIBRARY_DIR}/lib/http.c"
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/url.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/if2ip.c"
"${LIBRARY_DIR}/lib/speedcheck.c"
"${LIBRARY_DIR}/lib/ldap.c"
"${LIBRARY_DIR}/lib/version.c"
"${LIBRARY_DIR}/lib/getenv.c"
"${LIBRARY_DIR}/lib/escape.c"
"${LIBRARY_DIR}/lib/mprintf.c"
"${LIBRARY_DIR}/lib/telnet.c"
"${LIBRARY_DIR}/lib/netrc.c"
"${LIBRARY_DIR}/lib/getinfo.c"
"${LIBRARY_DIR}/lib/transfer.c"
"${LIBRARY_DIR}/lib/strcase.c"
"${LIBRARY_DIR}/lib/easy.c"
"${LIBRARY_DIR}/lib/curl_fnmatch.c"
"${LIBRARY_DIR}/lib/curl_log.c"
"${LIBRARY_DIR}/lib/fileinfo.c"
"${LIBRARY_DIR}/lib/krb5.c"
"${LIBRARY_DIR}/lib/memdebug.c"
"${LIBRARY_DIR}/lib/http_chunks.c"
"${LIBRARY_DIR}/lib/strtok.c"
"${LIBRARY_DIR}/lib/cf-socket.c"
"${LIBRARY_DIR}/lib/cfilters.c"
"${LIBRARY_DIR}/lib/conncache.c"
"${LIBRARY_DIR}/lib/connect.c"
"${LIBRARY_DIR}/lib/llist.c"
"${LIBRARY_DIR}/lib/hash.c"
"${LIBRARY_DIR}/lib/multi.c"
"${LIBRARY_DIR}/lib/content_encoding.c"
"${LIBRARY_DIR}/lib/share.c"
"${LIBRARY_DIR}/lib/http_digest.c"
"${LIBRARY_DIR}/lib/md4.c"
"${LIBRARY_DIR}/lib/md5.c"
"${LIBRARY_DIR}/lib/http_negotiate.c"
"${LIBRARY_DIR}/lib/inet_pton.c"
"${LIBRARY_DIR}/lib/strtoofft.c"
"${LIBRARY_DIR}/lib/strerror.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/cookie.c"
"${LIBRARY_DIR}/lib/curl_addrinfo.c"
"${LIBRARY_DIR}/lib/curl_des.c"
"${LIBRARY_DIR}/lib/curl_endian.c"
"${LIBRARY_DIR}/lib/curl_fnmatch.c"
"${LIBRARY_DIR}/lib/curl_get_line.c"
"${LIBRARY_DIR}/lib/curl_gethostname.c"
"${LIBRARY_DIR}/lib/curl_gssapi.c"
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/curl_trc.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
"${LIBRARY_DIR}/lib/dynhds.c"
"${LIBRARY_DIR}/lib/easy.c"
"${LIBRARY_DIR}/lib/escape.c"
"${LIBRARY_DIR}/lib/file.c"
"${LIBRARY_DIR}/lib/fileinfo.c"
"${LIBRARY_DIR}/lib/fopen.c"
"${LIBRARY_DIR}/lib/formdata.c"
"${LIBRARY_DIR}/lib/getenv.c"
"${LIBRARY_DIR}/lib/getinfo.c"
"${LIBRARY_DIR}/lib/gopher.c"
"${LIBRARY_DIR}/lib/hash.c"
"${LIBRARY_DIR}/lib/headers.c"
"${LIBRARY_DIR}/lib/hmac.c"
"${LIBRARY_DIR}/lib/hostasyn.c"
"${LIBRARY_DIR}/lib/hostip.c"
"${LIBRARY_DIR}/lib/hostip4.c"
"${LIBRARY_DIR}/lib/hostip6.c"
"${LIBRARY_DIR}/lib/hostsyn.c"
"${LIBRARY_DIR}/lib/hsts.c"
"${LIBRARY_DIR}/lib/http.c"
"${LIBRARY_DIR}/lib/http2.c"
"${LIBRARY_DIR}/lib/http_aws_sigv4.c"
"${LIBRARY_DIR}/lib/http_chunks.c"
"${LIBRARY_DIR}/lib/http_digest.c"
"${LIBRARY_DIR}/lib/http_negotiate.c"
"${LIBRARY_DIR}/lib/http_ntlm.c"
"${LIBRARY_DIR}/lib/http_proxy.c"
"${LIBRARY_DIR}/lib/idn.c"
"${LIBRARY_DIR}/lib/if2ip.c"
"${LIBRARY_DIR}/lib/imap.c"
"${LIBRARY_DIR}/lib/inet_ntop.c"
"${LIBRARY_DIR}/lib/inet_pton.c"
"${LIBRARY_DIR}/lib/krb5.c"
"${LIBRARY_DIR}/lib/ldap.c"
"${LIBRARY_DIR}/lib/llist.c"
"${LIBRARY_DIR}/lib/md4.c"
"${LIBRARY_DIR}/lib/md5.c"
"${LIBRARY_DIR}/lib/memdebug.c"
"${LIBRARY_DIR}/lib/mime.c"
"${LIBRARY_DIR}/lib/mprintf.c"
"${LIBRARY_DIR}/lib/mqtt.c"
"${LIBRARY_DIR}/lib/multi.c"
"${LIBRARY_DIR}/lib/netrc.c"
"${LIBRARY_DIR}/lib/nonblock.c"
"${LIBRARY_DIR}/lib/noproxy.c"
"${LIBRARY_DIR}/lib/openldap.c"
"${LIBRARY_DIR}/lib/parsedate.c"
"${LIBRARY_DIR}/lib/pingpong.c"
"${LIBRARY_DIR}/lib/pop3.c"
"${LIBRARY_DIR}/lib/progress.c"
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/select.c"
"${LIBRARY_DIR}/lib/splay.c"
"${LIBRARY_DIR}/lib/strdup.c"
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/share.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/smtp.c"
"${LIBRARY_DIR}/lib/socketpair.c"
"${LIBRARY_DIR}/lib/socks.c"
"${LIBRARY_DIR}/lib/curl_addrinfo.c"
"${LIBRARY_DIR}/lib/socks_gssapi.c"
"${LIBRARY_DIR}/lib/socks_sspi.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/nonblock.c"
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/imap.c"
"${LIBRARY_DIR}/lib/pop3.c"
"${LIBRARY_DIR}/lib/smtp.c"
"${LIBRARY_DIR}/lib/pingpong.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/warnless.c"
"${LIBRARY_DIR}/lib/hmac.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/openldap.c"
"${LIBRARY_DIR}/lib/curl_gethostname.c"
"${LIBRARY_DIR}/lib/gopher.c"
"${LIBRARY_DIR}/lib/http_proxy.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/curl_gssapi.c"
"${LIBRARY_DIR}/lib/http_ntlm.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/conncache.c"
"${LIBRARY_DIR}/lib/cf-h1-proxy.c"
"${LIBRARY_DIR}/lib/http2.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/curl_endian.c"
"${LIBRARY_DIR}/lib/curl_des.c"
"${LIBRARY_DIR}/lib/speedcheck.c"
"${LIBRARY_DIR}/lib/splay.c"
"${LIBRARY_DIR}/lib/strcase.c"
"${LIBRARY_DIR}/lib/strdup.c"
"${LIBRARY_DIR}/lib/strerror.c"
"${LIBRARY_DIR}/lib/strtok.c"
"${LIBRARY_DIR}/lib/strtoofft.c"
"${LIBRARY_DIR}/lib/system_win32.c"
"${LIBRARY_DIR}/lib/mime.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/urlapi.c"
"${LIBRARY_DIR}/lib/curl_get_line.c"
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/socketpair.c"
"${LIBRARY_DIR}/lib/bufref.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
"${LIBRARY_DIR}/lib/dynhds.c"
"${LIBRARY_DIR}/lib/hsts.c"
"${LIBRARY_DIR}/lib/http_aws_sigv4.c"
"${LIBRARY_DIR}/lib/mqtt.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/headers.c"
"${LIBRARY_DIR}/lib/telnet.c"
"${LIBRARY_DIR}/lib/timediff.c"
"${LIBRARY_DIR}/lib/vauth/vauth.c"
"${LIBRARY_DIR}/lib/timeval.c"
"${LIBRARY_DIR}/lib/transfer.c"
"${LIBRARY_DIR}/lib/url.c"
"${LIBRARY_DIR}/lib/urlapi.c"
"${LIBRARY_DIR}/lib/vauth/cleartext.c"
"${LIBRARY_DIR}/lib/vauth/cram.c"
"${LIBRARY_DIR}/lib/vauth/digest.c"
@@ -138,23 +135,24 @@ set (SRCS
"${LIBRARY_DIR}/lib/vauth/oauth2.c"
"${LIBRARY_DIR}/lib/vauth/spnego_gssapi.c"
"${LIBRARY_DIR}/lib/vauth/spnego_sspi.c"
"${LIBRARY_DIR}/lib/vauth/vauth.c"
"${LIBRARY_DIR}/lib/version.c"
"${LIBRARY_DIR}/lib/vquic/vquic.c"
"${LIBRARY_DIR}/lib/vtls/openssl.c"
"${LIBRARY_DIR}/lib/vssh/libssh.c"
"${LIBRARY_DIR}/lib/vssh/libssh2.c"
"${LIBRARY_DIR}/lib/vtls/bearssl.c"
"${LIBRARY_DIR}/lib/vtls/gtls.c"
"${LIBRARY_DIR}/lib/vtls/vtls.c"
"${LIBRARY_DIR}/lib/vtls/nss.c"
"${LIBRARY_DIR}/lib/vtls/wolfssl.c"
"${LIBRARY_DIR}/lib/vtls/hostcheck.c"
"${LIBRARY_DIR}/lib/vtls/keylog.c"
"${LIBRARY_DIR}/lib/vtls/mbedtls.c"
"${LIBRARY_DIR}/lib/vtls/openssl.c"
"${LIBRARY_DIR}/lib/vtls/schannel.c"
"${LIBRARY_DIR}/lib/vtls/schannel_verify.c"
"${LIBRARY_DIR}/lib/vtls/sectransp.c"
"${LIBRARY_DIR}/lib/vtls/gskit.c"
"${LIBRARY_DIR}/lib/vtls/mbedtls.c"
"${LIBRARY_DIR}/lib/vtls/bearssl.c"
"${LIBRARY_DIR}/lib/vtls/keylog.c"
"${LIBRARY_DIR}/lib/vtls/vtls.c"
"${LIBRARY_DIR}/lib/vtls/wolfssl.c"
"${LIBRARY_DIR}/lib/vtls/x509asn1.c"
"${LIBRARY_DIR}/lib/vtls/hostcheck.c"
"${LIBRARY_DIR}/lib/vssh/libssh2.c"
"${LIBRARY_DIR}/lib/vssh/libssh.c"
"${LIBRARY_DIR}/lib/warnless.c"
)
add_library (_curl ${SRCS})

contrib/krb5 vendored

@@ -1 +1 @@
Subproject commit b56ce6ba690e1f320df1a64afa34980c3e462617
Subproject commit 1d5c970e9369f444caf81d1d06a231a6bad8581f


@@ -17,6 +17,9 @@ CONNECTION_PARAMETERS=${CONNECTION_PARAMETERS:=""}
# Create all configured system logs:
clickhouse-client --query "SYSTEM FLUSH LOGS"
# It doesn't make sense to try creating tables if SYNC fails
echo "SYSTEM SYNC DATABASE REPLICA default" | clickhouse-client --receive_timeout 180 $CONNECTION_PARAMETERS || exit 0
# For each system log table:
clickhouse-client --query "SHOW TABLES FROM system LIKE '%\\_log'" | while read -r table
do
@@ -38,7 +41,7 @@ do
echo "Creating destination table ${table}_${hash}" >&2
echo "$statement" | clickhouse-client $CONNECTION_PARAMETERS
echo "$statement" | clickhouse-client --distributed_ddl_task_timeout=10 $CONNECTION_PARAMETERS || continue
echo "Creating table system.${table}_sender" >&2
@@ -46,6 +49,7 @@ do
clickhouse-client --query "
CREATE TABLE system.${table}_sender
ENGINE = Distributed(${CLUSTER}, default, ${table}_${hash})
SETTINGS flush_on_detach=0
EMPTY AS
SELECT ${EXTRA_COLUMNS_EXPRESSION}, *
FROM system.${table}


@@ -12,6 +12,7 @@ ENV \
# install systemd packages
RUN apt-get update && \
apt-get install -y --no-install-recommends \
sudo \
systemd \
&& \
apt-get clean && \


@@ -1,18 +1,7 @@
# docker build -t clickhouse/performance-comparison .
# Using ubuntu:22.04 over 20.04 as all other images, since:
# a) ubuntu 20.04 has too old parallel, and does not support --memsuspend
# b) anyway for perf tests it should not be important (backward compatiblity
# with older ubuntu had been checked lots of times in various tests)
FROM ubuntu:22.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV LANG=C.UTF-8
ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
@@ -56,10 +45,9 @@ COPY * /
# node #0 should be less stable because of system interruptions. We bind
# randomly to node 1 or 0 to gather some statistics on that. We have to bind
# both servers and the tmpfs on which the database is stored. How to do it
# through Yandex Sandbox API is unclear, but by default tmpfs uses
# is unclear, but by default tmpfs uses
# 'process allocation policy', not sure which process but hopefully the one that
# writes to it, so just bind the downloader script as well. We could also try to
# remount it with proper options in Sandbox task.
# writes to it, so just bind the downloader script as well.
# https://www.kernel.org/doc/Documentation/filesystems/tmpfs.txt
# Double-escaped backslashes are a tribute to the engineering wonder of docker --
# it gives '/bin/sh: 1: [bash,: not found' otherwise.


@@ -90,7 +90,7 @@ function configure
set +m
wait_for_server $LEFT_SERVER_PORT $left_pid
echo Server for setup started
echo "Server for setup started"
clickhouse-client --port $LEFT_SERVER_PORT --query "create database test" ||:
clickhouse-client --port $LEFT_SERVER_PORT --query "rename table datasets.hits_v1 to test.hits" ||:
@@ -156,9 +156,9 @@ function restart
wait_for_server $RIGHT_SERVER_PORT $right_pid
echo right ok
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.tables where database != 'system'"
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.tables where database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')"
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.build_options"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.tables where database != 'system'"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.tables where database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.build_options"
# Check again that both servers we started are running -- this is important
@@ -352,14 +352,12 @@ function get_profiles
wait
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.query_log where type in ('QueryFinish', 'ExceptionWhileProcessing') format TSVWithNamesAndTypes" > left-query-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > left-query-thread-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.trace_log format TSVWithNamesAndTypes" > left-trace-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > left-addresses.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.metric_log format TSVWithNamesAndTypes" > left-metric-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > left-async-metric-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.query_log where type in ('QueryFinish', 'ExceptionWhileProcessing') format TSVWithNamesAndTypes" > right-query-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > right-query-thread-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.trace_log format TSVWithNamesAndTypes" > right-trace-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > right-addresses.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.metric_log format TSVWithNamesAndTypes" > right-metric-log.tsv ||: &


@@ -19,31 +19,6 @@
<opentelemetry_span_log remove="remove"/>
<session_log remove="remove"/>
<!-- performance tests does not uses real block devices,
instead they stores everything in memory.
And so, to avoid extra memory reference switch *_log to Memory engine. -->
<query_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</query_log>
<query_thread_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</query_thread_log>
<trace_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</trace_log>
<metric_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</metric_log>
<asynchronous_metric_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</asynchronous_metric_log>
<uncompressed_cache_size>1000000000</uncompressed_cache_size>
<asynchronous_metrics_update_period_s>10</asynchronous_metrics_update_period_s>


@@ -31,8 +31,6 @@ function download
# Test all of them.
declare -a urls_to_try=(
"$S3_URL/PRs/$left_pr/$left_sha/$BUILD_NAME/performance.tar.zst"
"$S3_URL/$left_pr/$left_sha/$BUILD_NAME/performance.tar.zst"
"$S3_URL/$left_pr/$left_sha/$BUILD_NAME/performance.tgz"
)
for path in "${urls_to_try[@]}"


@@ -130,7 +130,7 @@ then
git -C right/ch diff --name-only "$base" pr -- :!tests/performance :!docker/test/performance-comparison | tee other-changed-files.txt
fi
# Set python output encoding so that we can print queries with Russian letters.
# Set python output encoding so that we can print queries with non-ASCII letters.
export PYTHONIOENCODING=utf-8
# By default, use the main comparison script from the tested package, so that we
@@ -151,11 +151,7 @@ export PATH
export REF_PR
export REF_SHA
# Try to collect some core dumps. I've seen two patterns in Sandbox:
# 1) |/home/zomb-sandbox/venv/bin/python /home/zomb-sandbox/client/sandbox/bin/coredumper.py %e %p %g %u %s %P %c
# Not sure what this script does (puts them to sandbox resources, logs some messages?),
# and it's not accessible from inside docker anyway.
# 2) something like %e.%p.core.dmp. The dump should end up in the workspace directory.
# Try to collect some core dumps.
# At least we remove the ulimit and then try to pack some common file names into output.
ulimit -c unlimited
cat /proc/sys/kernel/core_pattern


@@ -1,5 +1,5 @@
# docker build -t clickhouse/style-test .
FROM ubuntu:20.04
FROM ubuntu:22.04
ARG ACT_VERSION=0.2.33
ARG ACTIONLINT_VERSION=1.6.22


@@ -190,7 +190,7 @@ These are the schema conversion manipulations you can do with table overrides fo
* Modify [column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).
* Modify [column compression codec](/docs/en/sql-reference/statements/create/table.md/#codecs).
* Add [ALIAS columns](/docs/en/sql-reference/statements/create/table.md/#alias).
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes)
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes). Note that you need to enable the `use_skip_indexes_if_final` setting to make them work, since MaterializedMySQL uses `SELECT ... FINAL` by default (see the sketch after this list)
* Add [projections](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#projections). Note that projection optimizations are
disabled when using `SELECT ... FINAL` (which MaterializedMySQL does by default), so their utility is limited here.
`INDEX ... TYPE hypothesis` as [described in the v21.12 blog post](https://clickhouse.com/blog/en/2021/clickhouse-v21.12-released/)
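A minimal sketch of the skipping-index override, assuming a hypothetical MySQL endpoint and a table `t` with a `status` column (all names are illustrative, not from this commit):

``` sql
CREATE DATABASE mysql_db
ENGINE = MaterializedMySQL('mysql-host:3306', 'source_db', 'user', 'password')
TABLE OVERRIDE t
(
    COLUMNS (
        -- Skipping index that exists only on the ClickHouse side.
        INDEX idx_status (status) TYPE set(100) GRANULARITY 4
    )
);

-- MaterializedMySQL reads with an implicit FINAL, so the index is only
-- consulted when this setting is enabled:
SET use_skip_indexes_if_final = 1;
```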


@@ -2136,6 +2136,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [input_format_parquet_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_case_insensitive_column_matching) - ignore case when matching Parquet columns with ClickHouse columns. Default value - `false`.
- [input_format_parquet_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_allow_missing_columns) - allow missing columns while reading Parquet data. Default value - `false`.
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.
- [input_format_parquet_local_file_min_bytes_for_seek](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_local_file_min_bytes_for_seek) - minimum number of bytes for a local file read in the Parquet input format to seek instead of reading and discarding the skipped data. Default value - `8192`.
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `snappy`.


@@ -56,11 +56,11 @@ Possible values:
- Any positive integer.
Default value: 300.
Default value: 3000.
To achieve maximum performance of `SELECT` queries, it is necessary to minimize the number of parts processed, see [Merge Tree](../../development/architecture.md#merge-tree).
You can set a larger value to 600 (1200), this will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also in case of a merge issue (for example, due to insufficient disk space) you will notice it later than it could be with the original 300.
Prior to 23.6, this setting was 300. You can set a higher value; it will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also, in case of a merge issue (for example, due to insufficient disk space), you will notice it later than you would with the original 300.
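For illustration, the threshold can be raised on an existing MergeTree table (the table name here is hypothetical):

``` sql
-- Accept more active parts per partition before INSERTs fail with
-- "Too many parts"; higher values can slow down SELECT queries.
ALTER TABLE events MODIFY SETTING parts_to_throw_insert = 600;
```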
## parts_to_delay_insert {#parts-to-delay-insert}


@@ -1223,6 +1223,12 @@ Allow skipping columns with unsupported types while schema inference for format
Disabled by default.
### input_format_parquet_local_file_min_bytes_for_seek {#input_format_parquet_local_file_min_bytes_for_seek}
Minimum number of bytes for a local file read in the Parquet input format to seek instead of reading and discarding the skipped data.
Default value - `8192`.
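A usage sketch (the file name is illustrative): raising the threshold makes skips shorter than the limit read-and-discard rather than seek:

``` sql
SELECT count()
FROM file('data.parquet', Parquet)
SETTINGS input_format_parquet_local_file_min_bytes_for_seek = 65536;
```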
### output_format_parquet_string_as_string {#output_format_parquet_string_as_string}
Use Parquet String type instead of Binary for String columns.


@@ -98,6 +98,18 @@ Default value: 0.
</profiles>
```
## mutations_execute_nondeterministic_on_initiator {#mutations_execute_nondeterministic_on_initiator}
If true, constant nondeterministic functions (e.g. `now()`) are executed on the initiator and replaced with literals in `UPDATE` and `DELETE` queries. This helps keep data in sync across replicas while executing mutations with constant nondeterministic functions. Default value: `false`.
## mutations_execute_subqueries_on_initiator {#mutations_execute_subqueries_on_initiator}
If true, scalar subqueries are executed on the initiator and replaced with literals in `UPDATE` and `DELETE` queries. Default value: `false`.
## mutations_max_literal_size_to_replace {#mutations_max_literal_size_to_replace}
The maximum size, in bytes, of a serialized literal to substitute in `UPDATE` and `DELETE` queries. Takes effect only if at least one of the two settings above is enabled. Default value: 16384 (16 KiB).
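A sketch of how the first setting would be used (table and column names are hypothetical):

``` sql
-- now() is evaluated once on the initiator and shipped to replicas
-- as a literal, so every replica writes the same timestamp.
ALTER TABLE events
    UPDATE updated_at = now() WHERE 1
    SETTINGS mutations_execute_nondeterministic_on_initiator = 1;
```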
## distributed_product_mode {#distributed-product-mode}
Changes the behaviour of [distributed subqueries](../../sql-reference/operators/in.md).
@@ -4298,7 +4310,7 @@ Use this setting only for backward compatibility if your use cases depend on old
## session_timezone {#session_timezone}
Sets the implicit time zone of the current session or query.
The implicit time zone is the time zone applied to values of type DateTime/DateTime64 which have no explicitly specified time zone.
The setting takes precedence over the globally configured (server-level) implicit time zone.
A value of '' (empty string) means that the implicit time zone of the current session or query is equal to the [server time zone](../server-configuration-parameters/settings.md#server_configuration_parameters-timezone).
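For example (this mirrors the pattern used elsewhere in these docs; `timeZone()` reflects the session setting, `serverTimeZone()` does not):

``` sql
SELECT timeZone(), serverTimeZone()
SETTINGS session_timezone = 'Asia/Novosibirsk';
```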
@@ -4333,7 +4345,7 @@ SELECT toDateTime64(toDateTime64('1999-12-12 23:23:23.123', 3), 3, 'Europe/Zuric
```
:::warning
Not all functions that parse DateTime/DateTime64 respect `session_timezone`. This can lead to subtle errors.
See the following example and explanation.
:::


@@ -26,9 +26,9 @@ SELECT p, toTypeName(p) FROM geo_point;
Result:
``` text
┌─p─────┬─toTypeName(p)─┐
┌─p───────┬─toTypeName(p)─┐
│ (10,10) │ Point │
└───────┴───────────────┘
└─────────┴───────────────┘
```
## Ring


@@ -1092,7 +1092,7 @@ Types of sources (`source_type`):
- [Local file](#local_file)
- [Executable File](#executable)
- [Executable Pool](#executable_pool)
- [HTTP(s)](#http)
- [HTTP(S)](#http)
- DBMS
- [ODBC](#odbc)
- [MySQL](#mysql)
@@ -1102,7 +1102,7 @@ Types of sources (`source_type`):
- [Cassandra](#cassandra)
- [PostgreSQL](#postgresql)
## Local File {#local_file}
### Local File {#local_file}
Example of settings:
@@ -1132,7 +1132,7 @@ When a dictionary with source `FILE` is created via DDL command (`CREATE DICTION
- [Dictionary function](../../sql-reference/table-functions/dictionary.md#dictionary-function)
## Executable File {#executable}
### Executable File {#executable}
Working with executable files depends on [how the dictionary is stored in memory](#storig-dictionaries-in-memory). If the dictionary is stored using `cache` and `complex_key_cache`, ClickHouse requests the necessary keys by sending a request to the executable file's STDIN. Otherwise, ClickHouse starts the executable file and treats its output as dictionary data.
@@ -1161,7 +1161,7 @@ Setting fields:
That dictionary source can be configured only via XML configuration. Creating dictionaries with executable source via DDL is disabled; otherwise, the DB user would be able to execute arbitrary binaries on the ClickHouse node.
## Executable Pool {#executable_pool}
### Executable Pool {#executable_pool}
Executable pool allows loading data from a pool of processes. This source does not work with dictionary layouts that need to load all data from the source. Executable pool works if the dictionary [is stored](#ways-to-store-dictionaries-in-memory) using `cache`, `complex_key_cache`, `ssd_cache`, `complex_key_ssd_cache`, `direct`, or `complex_key_direct` layouts.
@@ -1196,9 +1196,9 @@ Setting fields:
That dictionary source can be configured only via XML configuration. Creating dictionaries with executable source via DDL is disabled; otherwise, the DB user would be able to execute arbitrary binaries on the ClickHouse node.
## Http(s) {#https}
### HTTP(S) {#https}
Working with an HTTP(s) server depends on [how the dictionary is stored in memory](#storig-dictionaries-in-memory). If the dictionary is stored using `cache` and `complex_key_cache`, ClickHouse requests the necessary keys by sending a request via the `POST` method.
Working with an HTTP(S) server depends on [how the dictionary is stored in memory](#storig-dictionaries-in-memory). If the dictionary is stored using `cache` and `complex_key_cache`, ClickHouse requests the necessary keys by sending a request via the `POST` method.
Example of settings:
@@ -1248,7 +1248,55 @@ Setting fields:
When creating a dictionary using the DDL command (`CREATE DICTIONARY ...`), remote hosts for HTTP dictionaries are checked against the contents of the `remote_url_allow_hosts` section of the config to prevent database users from accessing an arbitrary HTTP server.
### Known Vulnerability of the ODBC Dictionary Functionality
### DBMS
#### ODBC
You can use this method to connect to any database that has an ODBC driver.
Example of settings:
``` xml
<source>
<odbc>
<db>DatabaseName</db>
<table>SchemaName.TableName</table>
<connection_string>DSN=some_parameters</connection_string>
<invalidate_query>SQL_QUERY</invalidate_query>
<query>SELECT id, value_1, value_2 FROM SchemaName.TableName</query>
</odbc>
</source>
```
or
``` sql
SOURCE(ODBC(
db 'DatabaseName'
table 'SchemaName.TableName'
connection_string 'DSN=some_parameters'
invalidate_query 'SQL_QUERY'
query 'SELECT id, value_1, value_2 FROM db_name.table_name'
))
```
Setting fields:
- `db` – Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
- `table` – Name of the table and schema if exists.
- `connection_string` – Connection string.
- `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](#dictionary-updates).
- `query` – The custom query. Optional parameter.
:::note
The `table` and `query` fields cannot be used together, and one of them must be declared.
:::
ClickHouse receives quoting symbols from the ODBC driver and quotes all settings in queries to the driver, so it's necessary to set the table name according to the table name case in the database.
If you have problems with encodings when using Oracle, see the corresponding [FAQ](/knowledgebase/oracle-odbc) item.
##### Known Vulnerability of the ODBC Dictionary Functionality
:::note
When connecting to the database through the ODBC driver, the connection parameter `Servername` can be substituted. In this case values of `USERNAME` and `PASSWORD` from `odbc.ini` are sent to the remote server and can be compromised.
@@ -1277,7 +1325,7 @@ SELECT * FROM odbc('DSN=gregtest;Servername=some-server.com', 'test_db');
ODBC driver will send values of `USERNAME` and `PASSWORD` from `odbc.ini` to `some-server.com`.
### Example of Connecting Postgresql
##### Example of Connecting Postgresql
Ubuntu OS.
@@ -1358,7 +1406,7 @@ LIFETIME(MIN 300 MAX 360)
You may need to edit `odbc.ini` to specify the full path to the library with the driver `DRIVER=/usr/local/lib/psqlodbcw.so`.
### Example of Connecting MS SQL Server
##### Example of Connecting MS SQL Server
Ubuntu OS.
@@ -1462,55 +1510,7 @@ LAYOUT(FLAT())
LIFETIME(MIN 300 MAX 360)
```
## DBMS
### ODBC
You can use this method to connect any database that has an ODBC driver.
Example of settings:
``` xml
<source>
<odbc>
<db>DatabaseName</db>
<table>ShemaName.TableName</table>
<connection_string>DSN=some_parameters</connection_string>
<invalidate_query>SQL_QUERY</invalidate_query>
<query>SELECT id, value_1, value_2 FROM ShemaName.TableName</query>
</odbc>
</source>
```
or
``` sql
SOURCE(ODBC(
db 'DatabaseName'
table 'SchemaName.TableName'
connection_string 'DSN=some_parameters'
invalidate_query 'SQL_QUERY'
query 'SELECT id, value_1, value_2 FROM db_name.table_name'
))
```
Setting fields:
- `db` Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
- `table` Name of the table and schema if exists.
- `connection_string` Connection string.
- `invalidate_query` Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](#dictionary-updates).
- `query` The custom query. Optional parameter.
:::note
The `table` and `query` fields cannot be used together. And either one of the `table` or `query` fields must be declared.
:::
ClickHouse receives quoting symbols from ODBC-driver and quote all settings in queries to driver, so its necessary to set table name accordingly to table name case in database.
If you have a problems with encodings when using Oracle, see the corresponding [FAQ](/knowledgebase/oracle-odbc) item.
### Mysql
#### Mysql
Example of settings:
@@ -1627,7 +1627,7 @@ SOURCE(MYSQL(
))
```
### ClickHouse
#### ClickHouse
Example of settings:
@@ -1680,7 +1680,7 @@ Setting fields:
The `table` or `where` fields cannot be used together with the `query` field, and one of the `table` or `query` fields must be declared.
:::
### Mongodb
#### Mongodb
Example of settings:
@@ -1723,7 +1723,7 @@ Setting fields:
- `options` - MongoDB connection string options (optional parameter).
### Redis
#### Redis
Example of settings:
@@ -1756,7 +1756,7 @@ Setting fields:
- `storage_type` – The structure of internal Redis storage used for working with keys. `simple` is for simple sources and for hashed single-key sources, `hash_map` is for hashed sources with two keys. Ranged sources and cache sources with complex keys are unsupported. May be omitted, default value is `simple`.
- `db_index` – The specific numeric index of the Redis logical database. May be omitted, default value is 0.
### Cassandra
#### Cassandra
Example of settings:
@@ -1798,7 +1798,7 @@ Setting fields:
The `column_family` or `where` fields cannot be used together with the `query` field, and one of the `column_family` or `query` fields must be declared.
:::
### PostgreSQL
#### PostgreSQL
Example of settings:
@@ -1855,7 +1855,7 @@ Setting fields:
The `table` or `where` fields cannot be used together with the `query` field, and one of the `table` or `query` fields must be declared.
:::
## Null
### Null
A special source that can be used to create dummy (empty) dictionaries. Such dictionaries can be useful for tests or in setups with separated data and query nodes when using Distributed tables.
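A minimal sketch of a dummy dictionary over the `NULL` source (the structure is illustrative):

``` sql
CREATE DICTIONARY null_dict (id UInt64, val UInt8 DEFAULT 0)
PRIMARY KEY id
SOURCE(NULL())
LAYOUT(FLAT())
LIFETIME(0);
```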


@@ -51,7 +51,7 @@ Calculates the MD5 from a string and returns the resulting set of bytes as Fixed
If you do not need MD5 in particular, but you need a decent cryptographic 128-bit hash, use the sipHash128 function instead.
If you want to get the same result as output by the md5sum utility, use lower(hex(MD5(s))).
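For instance, reproducing the md5sum hex output for the string "abc" (a standard MD5 test vector):

``` sql
SELECT lower(hex(MD5('abc')));
-- 900150983cd24fb0d6963f7d28e17f72, the same digest md5sum prints
```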
## sipHash64 (#hash_functions-siphash64)
## sipHash64 {#hash_functions-siphash64}
Produces a 64-bit [SipHash](https://en.wikipedia.org/wiki/SipHash) hash value.
@@ -63,9 +63,9 @@ This is a cryptographic hash function. It works at least three times faster than
The function [interprets](/docs/en/sql-reference/functions/type-conversion-functions.md/#type_conversion_functions-reinterpretAsString) all the input parameters as strings and calculates the hash value for each of them. It then combines the hashes by the following algorithm:
1. The first and the second hash value are concatenated to an array which is hashed.
2. The previously calculated hash value and the hash of the third input parameter are hashed in a similar way.
3. This calculation is repeated for all remaining hash values of the original input.
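A multi-argument call, for illustration (this example follows the docs' own style; all arguments are reinterpreted as strings and combined as described above):

``` sql
SELECT sipHash64(array('e','x','a'), 'mple', 10, toDateTime('2019-06-15 23:00:00')) AS SipHash;
```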
**Arguments**


@@ -4,8 +4,9 @@ sidebar_position: 52
sidebar_label: TRUNCATE
---
# TRUNCATE Statement
# TRUNCATE Statements
## TRUNCATE TABLE
``` sql
TRUNCATE TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster]
```
@@ -21,3 +22,10 @@ You can specify how long (in seconds) to wait for inactive replicas to execute `
:::note
If the `alter_sync` is set to `2` and some replicas are not active for more than the time, specified by the `replication_wait_for_inactive_replica_timeout` setting, then an exception `UNFINISHED` is thrown.
:::
## TRUNCATE DATABASE
``` sql
TRUNCATE DATABASE [IF EXISTS] [db.]name [ON CLUSTER cluster]
```
Removes all tables from a database but keeps the database itself. When the clause `IF EXISTS` is omitted, the query returns an error if the database does not exist.
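For example (the database name is hypothetical):

``` sql
TRUNCATE DATABASE IF EXISTS staging ON CLUSTER my_cluster;
```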


@@ -32,6 +32,12 @@ contents:
dst: /usr/bin/clickhouse-keeper
- src: clickhouse-keeper.service
dst: /lib/systemd/system/clickhouse-keeper.service
- src: clickhouse
dst: /usr/bin/clickhouse-keeper-client
type: symlink
- src: clickhouse
dst: /usr/bin/clickhouse-keeper-converter
type: symlink
# docs
- src: ../AUTHORS
dst: /usr/share/doc/clickhouse-keeper/AUTHORS


@@ -110,19 +110,18 @@ void Keeper::createServer(const std::string & listen_host, const char * port_nam
}
catch (const Poco::Exception &)
{
std::string message = "Listen [" + listen_host + "]:" + std::to_string(port) + " failed: " + getCurrentExceptionMessage(false);
if (listen_try)
{
LOG_WARNING(&logger(), "{}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
LOG_WARNING(&logger(), "Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, "
"then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>",
message);
listen_host, port, getCurrentExceptionMessage(false));
}
else
{
throw Exception::createDeprecated(message, ErrorCodes::NETWORK_ERROR);
throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false));
}
}
}
@@ -291,12 +290,6 @@ try
{
path = config().getString("keeper_server.storage_path");
}
else if (std::filesystem::is_directory(std::filesystem::path{config().getString("path", DBMS_DEFAULT_PATH)} / "coordination"))
{
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"By default 'keeper.storage_path' could be assigned to {}, but the directory {} already exists. Please specify 'keeper.storage_path' in the keeper configuration explicitly",
KEEPER_DEFAULT_PATH, String{std::filesystem::path{config().getString("path", DBMS_DEFAULT_PATH)} / "coordination"});
}
else if (config().has("keeper_server.log_storage_path"))
{
path = std::filesystem::path(config().getString("keeper_server.log_storage_path")).parent_path();
@@ -305,6 +298,12 @@ try
{
path = std::filesystem::path(config().getString("keeper_server.snapshot_storage_path")).parent_path();
}
else if (std::filesystem::is_directory(std::filesystem::path{config().getString("path", DBMS_DEFAULT_PATH)} / "coordination"))
{
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"By default 'keeper.storage_path' could be assigned to {}, but the directory {} already exists. Please specify 'keeper.storage_path' in the keeper configuration explicitly",
KEEPER_DEFAULT_PATH, String{std::filesystem::path{config().getString("path", DBMS_DEFAULT_PATH)} / "coordination"});
}
else
{
path = KEEPER_DEFAULT_PATH;


@@ -2,6 +2,8 @@
#include <sys/resource.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <base/getMemoryAmount.h>
#include <base/errnoToString.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/String.h>
@@ -655,43 +657,66 @@ void LocalServer::processConfig()
/// There is no need for concurrent queries, override max_concurrent_queries.
global_context->getProcessList().setMaxSize(0);
/// Size of cache for uncompressed blocks. Zero means disabled.
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
const size_t memory_amount = getMemoryAmount();
const double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5);
const size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (uncompressed_cache_size)
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
String mark_cache_policy = config().getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
size_t mark_cache_size = config().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
if (mark_cache_size)
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices).
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);
/// In Server.cpp (./clickhouse-server), we would initialize the query cache here.
/// Intentionally not doing this in clickhouse-local as it doesn't make sense.
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size
= config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// NOTE: it is important to apply any overrides before


@@ -365,17 +365,14 @@ static void transformFixedString(const UInt8 * src, UInt8 * dst, size_t size, UI
hash.update(seed);
hash.update(i);
const auto checksum = getSipHash128AsArray(hash);
if (size >= 16)
{
char * hash_dst = reinterpret_cast<char *>(std::min(pos, end - 16));
hash.get128(hash_dst);
auto * hash_dst = std::min(pos, end - 16);
memcpy(hash_dst, checksum.data(), checksum.size());
}
else
{
char value[16];
hash.get128(value);
memcpy(dst, value, end - dst);
}
memcpy(dst, checksum.data(), end - dst);
pos += 16;
++i;
@@ -401,7 +398,7 @@ static void transformUUID(const UUID & src_uuid, UUID & dst_uuid, UInt64 seed)
hash.update(reinterpret_cast<const char *>(&src), sizeof(UUID));
/// Saving version and variant from an old UUID
hash.get128(reinterpret_cast<char *>(&dst));
dst = hash.get128();
dst.items[1] = (dst.items[1] & 0x1fffffffffffffffull) | (src.items[1] & 0xe000000000000000ull);
dst.items[0] = (dst.items[0] & 0xffffffffffff0fffull) | (src.items[0] & 0x000000000000f000ull);


@@ -29,6 +29,7 @@
#include <Common/ShellCommand.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/formatReadable.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getExecutablePath.h>
@@ -325,19 +326,18 @@ void Server::createServer(
}
catch (const Poco::Exception &)
{
std::string message = "Listen [" + listen_host + "]:" + std::to_string(port) + " failed: " + getCurrentExceptionMessage(false);
if (listen_try)
{
LOG_WARNING(&logger(), "{}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
LOG_WARNING(&logger(), "Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, "
"then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>",
message);
listen_host, port, getCurrentExceptionMessage(false));
}
else
{
throw Exception::createDeprecated(message, ErrorCodes::NETWORK_ERROR);
throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false));
}
}
}
@@ -658,10 +658,10 @@ try
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
const auto memory_amount = getMemoryAmount();
const size_t physical_server_memory = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
formatReadableSizeWithBinarySuffix(memory_amount),
formatReadableSizeWithBinarySuffix(physical_server_memory),
getNumberOfPhysicalCPUCores(), // on ARM processors it can show only cores enabled at the current moment
std::thread::hardware_concurrency());
@@ -1136,9 +1136,10 @@ try
server_settings_.loadSettingsFromConfig(*config);
size_t max_server_memory_usage = server_settings_.max_server_memory_usage;
double max_server_memory_usage_to_ram_ratio = server_settings_.max_server_memory_usage_to_ram_ratio;
size_t default_max_server_memory_usage = static_cast<size_t>(memory_amount * max_server_memory_usage_to_ram_ratio);
size_t current_physical_server_memory = getMemoryAmount(); /// With cgroups, the amount of memory available to the server can be changed dynamically.
size_t default_max_server_memory_usage = static_cast<size_t>(current_physical_server_memory * max_server_memory_usage_to_ram_ratio);
if (max_server_memory_usage == 0)
{
@@ -1146,7 +1147,7 @@ try
LOG_INFO(log, "Setting max_server_memory_usage was set to {}"
" ({} available * {:.2f} max_server_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(max_server_memory_usage),
formatReadableSizeWithBinarySuffix(memory_amount),
formatReadableSizeWithBinarySuffix(current_physical_server_memory),
max_server_memory_usage_to_ram_ratio);
}
else if (max_server_memory_usage > default_max_server_memory_usage)
@@ -1157,7 +1158,7 @@ try
" calculated as {} available"
" * {:.2f} max_server_memory_usage_to_ram_ratio",
formatReadableSizeWithBinarySuffix(max_server_memory_usage),
formatReadableSizeWithBinarySuffix(memory_amount),
formatReadableSizeWithBinarySuffix(current_physical_server_memory),
max_server_memory_usage_to_ram_ratio);
}
@@ -1167,14 +1168,14 @@ try
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(current_physical_server_memory * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_INFO(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
formatReadableSizeWithBinarySuffix(current_physical_server_memory),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
@@ -1183,7 +1184,7 @@ try
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
formatReadableSizeWithBinarySuffix(current_physical_server_memory),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
@@ -1485,16 +1486,14 @@ try
/// Set up caches.
size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
const size_t max_cache_size = static_cast<size_t>(physical_server_memory * server_settings.cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = server_settings.uncompressed_cache_policy;
LOG_INFO(log, "Uncompressed cache policy name {}", uncompressed_cache_policy);
size_t uncompressed_cache_size = server_settings.uncompressed_cache_size;
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
@@ -1513,39 +1512,59 @@ try
server_settings.async_insert_queue_flush_on_shutdown));
}
size_t mark_cache_size = server_settings.mark_cache_size;
String mark_cache_policy = server_settings.mark_cache_policy;
size_t mark_cache_size = server_settings.mark_cache_size;
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
if (server_settings.index_uncompressed_cache_size)
size_t index_uncompressed_cache_size = server_settings.index_uncompressed_cache_size;
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(server_settings.index_uncompressed_cache_size);
if (server_settings.index_mark_cache_size)
size_t index_mark_cache_size = server_settings.index_mark_cache_size;
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(server_settings.index_mark_cache_size);
if (server_settings.mmap_cache_size)
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(server_settings.mmap_cache_size);
/// A cache for query results.
global_context->setQueryCache(config());
size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_size_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
if (query_cache_max_size_in_bytes > max_cache_size)
{
query_cache_max_size_in_bytes = max_cache_size;
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows);
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size = config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// Set path for format schema files


@@ -95,7 +95,7 @@ enum class AccessType
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \
\
M(DROP_DATABASE, "", DATABASE, DROP) /* allows to execute {DROP|DETACH} DATABASE */\
M(DROP_DATABASE, "", DATABASE, DROP) /* allows to execute {DROP|DETACH|TRUNCATE} DATABASE */\
M(DROP_TABLE, "", TABLE, DROP) /* allows to execute {DROP|DETACH} TABLE */\
M(DROP_VIEW, "", VIEW, DROP) /* allows to execute {DROP|DETACH} TABLE for views;
implicitly enabled by the grant DROP_TABLE */\


@@ -549,7 +549,7 @@ LDAPClient::SearchResults LDAPClient::search(const SearchParams & search_params)
if (rc != LDAP_SUCCESS)
{
String message = "LDAP search failed";
String message;
const char * raw_err_str = ldap_err2string(rc);
if (raw_err_str && *raw_err_str != '\0')
@@ -570,7 +570,7 @@ LDAPClient::SearchResults LDAPClient::search(const SearchParams & search_params)
message += matched_msg;
}
throw Exception::createDeprecated(message, ErrorCodes::LDAP_ERROR);
throw Exception(ErrorCodes::LDAP_ERROR, "LDAP search failed{}", message);
}
break;


@@ -502,4 +502,15 @@ void MultipleAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
throwBackupNotAllowed();
}
bool MultipleAccessStorage::containsStorage(std::string_view storage_type) const
{
auto storages = getStoragesInternal();
for (const auto & storage : *storages)
{
if (storage->getStorageType() == storage_type)
return true;
}
return false;
}
}

View File

@ -57,6 +57,7 @@ public:
bool isRestoreAllowed() const override;
void backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, AccessEntityType type) const override;
void restoreFromBackup(RestorerFromBackup & restorer) override;
bool containsStorage(std::string_view storage_type) const;
protected:
std::optional<UUID> findImpl(AccessEntityType type, const String & name) const override;

View File

@ -4,6 +4,8 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Interpreters/Context.h>
#include <Core/ServerSettings.h>
namespace DB
@ -43,6 +45,13 @@ inline AggregateFunctionPtr createAggregateFunctionGroupArrayImpl(const DataType
return std::make_shared<GroupArrayGeneralImpl<GroupArrayNodeGeneral, Trait>>(argument_type, parameters, std::forward<TArgs>(args)...);
}
size_t getMaxArraySize()
{
if (auto context = Context::getGlobalContextInstance())
return context->getServerSettings().aggregate_function_group_array_max_element_size;
return 0xFFFFFF;
}
template <bool Tlast>
AggregateFunctionPtr createAggregateFunctionGroupArray(
@ -51,7 +60,7 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(
assertUnary(name, argument_types);
bool limit_size = false;
UInt64 max_elems = std::numeric_limits<UInt64>::max();
UInt64 max_elems = getMaxArraySize();
if (parameters.empty())
{
@ -78,7 +87,7 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(
{
if (Tlast)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "groupArrayLast make sense only with max_elems (groupArrayLast(max_elems)())");
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait</* Thas_limit= */ false, Tlast, /* Tsampler= */ Sampler::NONE>>(argument_types[0], parameters);
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait</* Thas_limit= */ false, Tlast, /* Tsampler= */ Sampler::NONE>>(argument_types[0], parameters, max_elems);
}
else
return createAggregateFunctionGroupArrayImpl<GroupArrayTrait</* Thas_limit= */ true, Tlast, /* Tsampler= */ Sampler::NONE>>(argument_types[0], parameters, max_elems);

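The fallback in getMaxArraySize() above — take the server setting when a global context exists, otherwise keep the old hard-coded 0xFFFFFF — can be sketched with a plain std::optional standing in for Context::getGlobalContextInstance():

#include <cstdint>
#include <optional>

// Stand-in for Context::getGlobalContextInstance(): the context may be absent,
// e.g. when aggregate functions are registered before a server context exists.
static std::optional<uint64_t> globalMaxArraySizeSetting() { return std::nullopt; }

static uint64_t getMaxArraySize()
{
    if (auto setting = globalMaxArraySizeSetting())
        return *setting;
    return 0xFFFFFF; // legacy compile-time default
}

int main() { return getMaxArraySize() == 0xFFFFFF ? 0 : 1; }

The effect is that groupArray without an explicit max_elems parameter is now bounded by the aggregate_function_group_array_max_element_size server setting instead of std::numeric_limits<UInt64>::max().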
View File

@ -21,7 +21,7 @@
#include <type_traits>
#define AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE 0xFFFFFF
#define AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE 0xFFFFFF
namespace DB
@ -128,7 +128,7 @@ class GroupArrayNumericImpl final
public:
explicit GroupArrayNumericImpl(
const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max(), UInt64 seed_ = 123456)
const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_, UInt64 seed_ = 123456)
: IAggregateFunctionDataHelper<GroupArrayNumericData<T, Trait::sampler != Sampler::NONE>, GroupArrayNumericImpl<T, Trait>>(
{data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, max_elems(max_elems_)
@ -263,10 +263,18 @@ public:
}
}
static void checkArraySize(size_t elems, size_t max_elems)
{
if (unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size {} (maximum: {})", elems, max_elems);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
const auto & value = this->data(place).value;
const size_t size = value.size();
const UInt64 size = value.size();
checkArraySize(size, max_elems);
writeVarUInt(size, buf);
for (const auto & element : value)
writeBinaryLittleEndian(element, buf);
@ -287,13 +295,7 @@ public:
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size (maximum: {})", AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE);
if (limit_num_elems && unlikely(size > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elems);
checkArraySize(size, max_elems);
auto & value = this->data(place).value;
@ -357,9 +359,17 @@ struct GroupArrayNodeBase
const_cast<char *>(arena->alignedInsert(reinterpret_cast<const char *>(this), sizeof(Node) + size, alignof(Node))));
}
static void checkElementSize(size_t size, size_t max_size)
{
if (unlikely(size > max_size))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array element size {} (maximum: {})", size, max_size);
}
/// Write node to buffer
void write(WriteBuffer & buf) const
{
checkElementSize(size, AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
writeVarUInt(size, buf);
buf.write(data(), size);
}
@ -369,9 +379,7 @@ struct GroupArrayNodeBase
{
UInt64 size;
readVarUInt(size, buf);
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size (maximum: {})", AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE);
checkElementSize(size, AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
Node * node = reinterpret_cast<Node *>(arena->alignedAlloc(sizeof(Node) + size, alignof(Node)));
node->size = size;
@ -455,7 +463,7 @@ class GroupArrayGeneralImpl final
UInt64 seed;
public:
GroupArrayGeneralImpl(const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max(), UInt64 seed_ = 123456)
GroupArrayGeneralImpl(const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_, UInt64 seed_ = 123456)
: IAggregateFunctionDataHelper<GroupArrayGeneralData<Node, Trait::sampler != Sampler::NONE>, GroupArrayGeneralImpl<Node, Trait>>(
{data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, data_type(this->argument_types[0])
@ -596,9 +604,18 @@ public:
}
}
static void checkArraySize(size_t elems, size_t max_elems)
{
if (unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size {} (maximum: {})", elems, max_elems);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
writeVarUInt(data(place).value.size(), buf);
UInt64 elems = data(place).value.size();
checkArraySize(elems, max_elems);
writeVarUInt(elems, buf);
auto & value = data(place).value;
for (auto & node : value)
@ -624,12 +641,7 @@ public:
if (unlikely(elems == 0))
return;
if (unlikely(elems > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size (maximum: {})", AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE);
if (limit_num_elems && unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elems);
checkArraySize(elems, max_elems);
auto & value = data(place).value;
@ -673,6 +685,6 @@ public:
bool allocatesMemoryInArena() const override { return true; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE
}

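Routing both serialization directions through checkArraySize() enforces the bound symmetrically, so corrupted or hostile state is rejected on read before any allocation happens. A standalone sketch, with a fixed-width size prefix standing in for the real varint encoding:

#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Sketch of the symmetric bound check added above: writer and reader both
// validate the element count against the same limit.
static void checkArraySize(size_t elems, size_t max_elems)
{
    if (elems > max_elems)
        throw std::runtime_error("Too large array size");
}

static std::vector<uint8_t> serialize(const std::vector<int64_t> & value, size_t max_elems)
{
    checkArraySize(value.size(), max_elems);
    uint64_t size = value.size();
    std::vector<uint8_t> buf(sizeof(size) + size * sizeof(int64_t));
    std::memcpy(buf.data(), &size, sizeof(size));
    std::memcpy(buf.data() + sizeof(size), value.data(), size * sizeof(int64_t));
    return buf;
}

static std::vector<int64_t> deserialize(const std::vector<uint8_t> & buf, size_t max_elems)
{
    uint64_t size = 0;
    std::memcpy(&size, buf.data(), sizeof(size));
    checkArraySize(size, max_elems); // reject before allocating
    std::vector<int64_t> value(size);
    std::memcpy(value.data(), buf.data() + sizeof(size), size * sizeof(int64_t));
    return value;
}

int main()
{
    auto buf = serialize({1, 2, 3}, /*max_elems=*/100);
    return deserialize(buf, 100).size() == 3 ? 0 : 1;
}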
View File

@ -315,10 +315,9 @@ struct Adder
{
StringRef value = column.getDataAt(row_num);
UInt128 key;
SipHash hash;
hash.update(value.data, value.size);
hash.get128(key);
const auto key = hash.get128();
data.set.template insert<const UInt128 &, use_single_level_hash_table>(key);
}

View File

@ -107,9 +107,7 @@ struct UniqVariadicHash<true, false>
++column;
}
UInt128 key;
hash.get128(key);
return key;
return hash.get128();
}
};
@ -131,9 +129,7 @@ struct UniqVariadicHash<true, true>
++column;
}
UInt128 key;
hash.get128(key);
return key;
return hash.get128();
}
};

View File

@ -20,7 +20,7 @@ struct QueryTreeNodeWithHash
{}
QueryTreeNodePtrType node = nullptr;
std::pair<UInt64, UInt64> hash;
CityHash_v1_0_2::uint128 hash;
};
template <typename T>
@ -55,6 +55,6 @@ struct std::hash<DB::QueryTreeNodeWithHash<T>>
{
size_t operator()(const DB::QueryTreeNodeWithHash<T> & node_with_hash) const
{
return node_with_hash.hash.first;
return node_with_hash.hash.low64;
}
};

View File

@ -229,10 +229,7 @@ IQueryTreeNode::Hash IQueryTreeNode::getTreeHash() const
}
}
Hash result;
hash_state.get128(result);
return result;
return getSipHash128AsPair(hash_state);
}
QueryTreeNodePtr IQueryTreeNode::clone() const

View File

@ -106,7 +106,7 @@ public:
*/
bool isEqual(const IQueryTreeNode & rhs, CompareOptions compare_options = { .compare_aliases = true }) const;
using Hash = std::pair<UInt64, UInt64>;
using Hash = CityHash_v1_0_2::uint128;
using HashState = SipHash;
/** Get tree hash identifying current tree

View File

@ -2033,7 +2033,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
auto & nearest_query_scope_query_node = nearest_query_scope->scope_node->as<QueryNode &>();
auto & mutable_context = nearest_query_scope_query_node.getMutableContext();
auto scalar_query_hash_string = std::to_string(node_with_hash.hash.first) + '_' + std::to_string(node_with_hash.hash.second);
auto scalar_query_hash_string = DB::toString(node_with_hash.hash);
if (mutable_context->hasQueryContext())
mutable_context->getQueryContext()->addScalar(scalar_query_hash_string, scalar_block);

View File

@ -187,7 +187,7 @@ BackupCoordinationRemote::BackupCoordinationRemote(
if (code == Coordination::Error::ZNODEEXISTS)
zk->handleEphemeralNodeExistenceNoFailureInjection(alive_node_path, "");
else if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, alive_node_path);
throw zkutil::KeeperException::fromPath(code, alive_node_path);
}
})
{
@ -745,7 +745,7 @@ bool BackupCoordinationRemote::startWritingFile(size_t data_file_index)
else if (code == Coordination::Error::ZNODEEXISTS)
host_is_assigned = (zk->get(full_path) == host_index_str); /// The previous retry could write this ZooKeeper's node and then fail.
else
throw zkutil::KeeperException(code, full_path);
throw zkutil::KeeperException::fromPath(code, full_path);
});
if (!host_is_assigned)
@ -815,7 +815,7 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
break;
bool is_last_attempt = (attempt == MAX_ZOOKEEPER_ATTEMPTS - 1);
if ((code != Coordination::Error::ZBADVERSION) || is_last_attempt)
throw zkutil::KeeperException(code, backup_stage_path);
throw zkutil::KeeperException::fromPath(code, backup_stage_path);
}
});

View File

@ -56,7 +56,7 @@ void BackupCoordinationStageSync::set(const String & current_host, const String
{
auto code = zookeeper->trySet(zookeeper_path, new_stage);
if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, zookeeper_path);
throw zkutil::KeeperException::fromPath(code, zookeeper_path);
}
else
{
@ -64,7 +64,7 @@ void BackupCoordinationStageSync::set(const String & current_host, const String
String alive_node_path = zookeeper_path + "/alive|" + current_host;
auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
throw zkutil::KeeperException(code, alive_node_path);
throw zkutil::KeeperException::fromPath(code, alive_node_path);
zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message);
@ -90,7 +90,7 @@ void BackupCoordinationStageSync::setError(const String & current_host, const Ex
/// so the following line tries to preserve the error status.
auto code = zookeeper->trySet(zookeeper_path, Stage::ERROR);
if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, zookeeper_path);
throw zkutil::KeeperException::fromPath(code, zookeeper_path);
});
}

View File

@ -159,6 +159,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
blob_path.size(), mode);
copyS3File(
client,
client,
s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup,
@ -218,6 +219,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
{
LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
copyS3File(
client,
client,
/* src_bucket */ blob_path[1],
/* src_key= */ blob_path[0],
@ -238,7 +240,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, {},
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}

View File

@ -46,7 +46,7 @@ RestoreCoordinationRemote::RestoreCoordinationRemote(
if (code == Coordination::Error::ZNODEEXISTS)
zk->handleEphemeralNodeExistenceNoFailureInjection(alive_node_path, "");
else if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, alive_node_path);
throw zkutil::KeeperException::fromPath(code, alive_node_path);
}
})
{
@ -129,7 +129,7 @@ bool RestoreCoordinationRemote::acquireCreatingTableInReplicatedDatabase(const S
path += "/" + escapeForFileName(table_name);
auto code = zk->tryCreate(path, toString(current_host_index), zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
throw zkutil::KeeperException::fromPath(code, path);
if (code == Coordination::Error::ZOK)
{
@ -155,7 +155,7 @@ bool RestoreCoordinationRemote::acquireInsertingDataIntoReplicatedTable(const St
String path = zookeeper_path + "/repl_tables_data_acquired/" + escapeForFileName(table_zk_path);
auto code = zk->tryCreate(path, toString(current_host_index), zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
throw zkutil::KeeperException::fromPath(code, path);
if (code == Coordination::Error::ZOK)
{
@ -181,7 +181,7 @@ bool RestoreCoordinationRemote::acquireReplicatedAccessStorage(const String & ac
String path = zookeeper_path + "/repl_access_storages_acquired/" + escapeForFileName(access_storage_zk_path);
auto code = zk->tryCreate(path, toString(current_host_index), zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
throw zkutil::KeeperException::fromPath(code, path);
if (code == Coordination::Error::ZOK)
{
@ -217,7 +217,7 @@ bool RestoreCoordinationRemote::acquireReplicatedSQLObjects(const String & loade
auto code = zk->tryCreate(path, "", zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
throw zkutil::KeeperException::fromPath(code, path);
if (code == Coordination::Error::ZOK)
{
@ -302,7 +302,7 @@ bool RestoreCoordinationRemote::hasConcurrentRestores(const std::atomic<size_t>
break;
bool is_last_attempt = (attempt == MAX_ZOOKEEPER_ATTEMPTS - 1);
if ((code != Coordination::Error::ZBADVERSION) || is_last_attempt)
throw zkutil::KeeperException(code, path);
throw zkutil::KeeperException::fromPath(code, path);
}
});

View File

@ -847,7 +847,9 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
visitor.visit(parsed_query);
/// Get new query after substitutions.
query = serializeAST(*parsed_query);
if (visitor.getNumberOfReplacedParameters())
query = serializeAST(*parsed_query);
chassert(!query.empty());
}
if (allow_merge_tree_settings && parsed_query->as<ASTCreateQuery>())
@ -1332,7 +1334,9 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
visitor.visit(parsed_query);
/// Get new query after substitutions.
query = serializeAST(*parsed_query);
if (visitor.getNumberOfReplacedParameters())
query = serializeAST(*parsed_query);
chassert(!query.empty());
}
/// Process the query that requires transferring data blocks to the server.
@ -1811,7 +1815,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{
const String & new_database = use_query->database;
const String & new_database = use_query->getDatabase();
/// If the client initiates the reconnection, it takes the settings from the config.
config().setString("database", new_database);
/// If the connection initiates the reconnection, it uses its variable.

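The guard added above skips re-serializing the AST when no query parameters were substituted, keeping the original query text untouched in the common case. A sketch of the shape of the change, with hypothetical stand-ins for the visitor and serializeAST:

#include <cassert>
#include <string>

// Hypothetical stand-ins for ReplaceQueryParameterVisitor / serializeAST.
struct Visitor { int replaced = 0; int getNumberOfReplacedParameters() const { return replaced; } };
static std::string serializeAST() { return "SELECT 1"; }

int main()
{
    std::string query = "SELECT 1"; // original query text
    Visitor visitor;                // visitor.visit(parsed_query) would run here
    // Only pay for serialization when a parameter was actually replaced.
    if (visitor.getNumberOfReplacedParameters())
        query = serializeAST();
    assert(!query.empty());
}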
View File

@ -170,7 +170,7 @@ bool ConnectionEstablisherAsync::checkTimeout()
epoll_event events[2];
events[0].data.fd = events[1].data.fd = -1;
size_t ready_count = epoll.getManyReady(2, events, false);
size_t ready_count = epoll.getManyReady(2, events, 0);
for (size_t i = 0; i != ready_count; ++i)
{
if (events[i].data.fd == socket_fd)

View File

@ -388,7 +388,7 @@ int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
bool blocking = !static_cast<bool>(async_callback);
while (events_count == 0)
{
events_count = epoll.getManyReady(1, &event, blocking);
events_count = epoll.getManyReady(1, &event, blocking ? -1 : 0);
if (!events_count && async_callback)
async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
}

View File

@ -521,8 +521,7 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create)
if (create.storage)
create.storage->updateTreeHash(sip_hash);
IAST::Hash hash;
sip_hash.get128(hash);
const auto hash = getSipHash128AsPair(sip_hash);
/// Save only tables with unique definition.
if (created_tables_hashes.insert(hash).second)

View File

@ -524,7 +524,7 @@ void ColumnAggregateFunction::insertDefault()
pushBackAndCreateState(data, arena, func.get());
}
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out, version);

View File

@ -162,7 +162,7 @@ public:
void insertDefault() override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * src_arena) override;

View File

@ -205,7 +205,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
}
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);

View File

@ -77,7 +77,7 @@ public:
StringRef getDataAt(size_t n) const override;
bool isDefaultAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -88,7 +88,7 @@ public:
void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
void insertDefault() override { throwMustBeDecompressed(); }
void popBack(size_t) override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeDecompressed(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }

View File

@ -151,7 +151,7 @@ public:
s -= n;
}
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, const UInt8 *) const override
{
return data->serializeValueIntoArena(0, arena, begin);
}

View File

@ -59,9 +59,26 @@ bool ColumnDecimal<T>::hasEqualValues() const
}
template <is_decimal T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
return res;
}

View File

@ -80,7 +80,7 @@ public:
Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -86,11 +86,28 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
memset(chars.data() + old_size + length, 0, n - length);
}
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(n, begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &chars[n * index], n);
return StringRef(pos, n);
return res;
}
const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)

View File

@ -115,7 +115,7 @@ public:
chars.resize_assume_reserved(chars.size() - n * elems);
}
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -96,7 +96,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert into {}", getName());
}
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName());
}

View File

@ -255,7 +255,7 @@ void ColumnLowCardinality::insertData(const char * pos, size_t length)
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
}
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
}

View File

@ -87,7 +87,7 @@ public:
void popBack(size_t n) override { idx.popBack(n); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -111,7 +111,7 @@ void ColumnMap::popBack(size_t n)
nested->popBack(n);
}
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return nested->serializeValueIntoArena(n, arena, begin);
}

View File

@ -58,7 +58,7 @@ public:
void insert(const Field & x) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -4,6 +4,10 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
@ -34,6 +38,7 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
{
/// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
nested_column = getNestedColumn().convertToFullColumnIfConst();
nested_type = nested_column->getDataType();
if (!getNestedColumn().canBeInsideNullable())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{} cannot be inside Nullable column", getNestedColumn().getName());
@ -134,21 +139,77 @@ void ColumnNullable::insertData(const char * pos, size_t length)
}
}
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
char * pos;
auto * pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
switch (nested_type)
{
case TypeIndex::UInt8:
return static_cast<const ColumnUInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt16:
return static_cast<const ColumnUInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt32:
return static_cast<const ColumnUInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt64:
return static_cast<const ColumnUInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt128:
return static_cast<const ColumnUInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt256:
return static_cast<const ColumnUInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int8:
return static_cast<const ColumnInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int16:
return static_cast<const ColumnInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int32:
return static_cast<const ColumnInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int64:
return static_cast<const ColumnInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int128:
return static_cast<const ColumnInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int256:
return static_cast<const ColumnInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float32:
return static_cast<const ColumnFloat32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float64:
return static_cast<const ColumnFloat64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date:
return static_cast<const ColumnDate *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date32:
return static_cast<const ColumnDate32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime:
return static_cast<const ColumnDateTime *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime64:
return static_cast<const ColumnDateTime64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::String:
return static_cast<const ColumnString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::FixedString:
return static_cast<const ColumnFixedString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal32:
return static_cast<const ColumnDecimal<Decimal32> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal64:
return static_cast<const ColumnDecimal<Decimal64> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal128:
return static_cast<const ColumnDecimal<Decimal128> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal256:
return static_cast<const ColumnDecimal<Decimal256> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UUID:
return static_cast<const ColumnUUID *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv4:
return static_cast<const ColumnIPv4 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv6:
return static_cast<const ColumnIPv6 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
default:
pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
}
const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)

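The type dispatch above lets a nullable column of a fixed-width type write one contiguous [null_byte][payload] record with a single allocContinue call, rather than a null byte followed by a separate nested write. A standalone sketch of that record layout, with int64_t standing in for the column's value type:

#include <cstdint>
#include <cstring>
#include <vector>

// Sketch of the [null_byte][payload] record produced by the fast path above.
// When the null byte is 1, the payload is omitted entirely.
static std::vector<uint8_t> serializeNullable(uint8_t null_bit, int64_t value)
{
    std::vector<uint8_t> rec;
    rec.push_back(null_bit);
    if (!null_bit)
    {
        rec.resize(1 + sizeof(value));
        std::memcpy(rec.data() + 1, &value, sizeof(value));
    }
    return rec; // 1 byte for NULL rows, 1 + sizeof(T) otherwise
}

int main()
{
    return serializeNullable(0, 42).size() == 9 && serializeNullable(1, 0).size() == 1 ? 0 : 1;
}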
View File

@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "Core/TypeId.h"
#include "config.h"
@ -62,7 +63,7 @@ public:
StringRef getDataAt(size_t) const override;
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@ -212,6 +213,8 @@ public:
private:
WrappedPtr nested_column;
WrappedPtr null_map;
/// Cached type of the nested column, used to dispatch the optimized serializeValueIntoArena.
TypeIndex nested_type;
template <bool negative>
void applyNullMapImpl(const NullMap & map);

View File

@ -244,7 +244,7 @@ public:
StringRef getDataAt(size_t) const override { throwMustBeConcrete(); }
bool isDefaultAt(size_t) const override { throwMustBeConcrete(); }
void insertData(const char *, size_t) override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeConcrete(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }

View File

@ -150,7 +150,7 @@ void ColumnSparse::insertData(const char * pos, size_t length)
insertSingleValue([&](IColumn & column) { column.insertData(pos, length); });
}
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
}

View File

@ -78,7 +78,7 @@ public:
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char *) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@ -213,17 +213,30 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
}
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
res.size = sizeof(string_size) + string_size;
char * pos = arena.allocContinue(res.size, begin);
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &string_size, sizeof(string_size));
memcpy(pos + sizeof(string_size), &chars[offset], string_size);
res.data = pos;
return res;
}

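For String the payload itself is length-prefixed, so a non-NULL record becomes [null_byte][string_size][bytes]. A sketch of reading such a record back, mirroring the writes above:

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Sketch of reading back the record written by the String fast path above:
// an optional leading null byte, then the size field, then the bytes.
static std::string deserializeString(const uint8_t * pos, bool has_null_bit)
{
    if (has_null_bit)
    {
        if (*pos) return {}; // NULL row: the record is just the null byte
        ++pos;
    }
    size_t string_size = 0;
    std::memcpy(&string_size, pos, sizeof(string_size));
    return std::string(reinterpret_cast<const char *>(pos) + sizeof(string_size), string_size);
}

int main()
{
    std::vector<uint8_t> rec{0}; // null byte = not NULL
    size_t n = 2;
    rec.resize(1 + sizeof(n) + n);
    std::memcpy(rec.data() + 1, &n, sizeof(n));
    std::memcpy(rec.data() + 1 + sizeof(n), "ok", n);
    return deserializeString(rec.data(), true) == "ok" ? 0 : 1;
}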
View File

@ -11,6 +11,7 @@
#include <Common/memcmpSmall.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <Common/Arena.h>
class Collator;
@ -168,7 +169,7 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -171,7 +171,7 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
StringRef res(begin, 0);
for (const auto & column : columns)

View File

@ -61,7 +61,7 @@ public:
void insertFrom(const IColumn & src_, size_t n) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -79,7 +79,7 @@ public:
Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash_func) const override
{
@ -373,7 +373,7 @@ size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t lengt
}
template <typename ColumnType>
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
if (is_nullable)
{
@ -670,8 +670,9 @@ UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & co
for (size_t i = 0; i < column_size; ++i)
column.updateHashWithValue(i, sip_hash);
hash = sip_hash.get128();
std::lock_guard lock(mutex);
sip_hash.get128(hash);
cur_hash = hash;
num_added_rows.store(column_size);
}

View File

@ -49,11 +49,28 @@ namespace ErrorCodes
}
template <typename T>
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
unalignedStore<T>(pos, data[n]);
return StringRef(pos, sizeof(T));
return res;
}
template <typename T>

View File

@ -174,7 +174,7 @@ public:
data.resize_assume_reserved(data.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@ -218,7 +218,7 @@ public:
* For example, to obtain an unambiguous representation of an Array of strings, the string data should be interleaved with the sizes.
* Parameter begin should be used with Arena::allocContinue.
*/
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const = 0;
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit = nullptr) const = 0;
/// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method.
/// Returns pointer to the position after the read data.

View File

@ -57,7 +57,7 @@ public:
++s;
}
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override
{
/// Has to put one useless byte into Arena, because serializing into zero bytes would be ambiguous.
char * res = arena.allocContinue(1, begin);

View File

@ -117,7 +117,7 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos);
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos, nullptr);
const char * new_pos;
column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos);
ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i;
@ -140,8 +140,8 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos_lc = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc);
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string, nullptr);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc, nullptr);
ASSERT_EQ(ref_string, ref_lc) << "Serialized data is different from pattern at position " << i;
}
}

View File

@ -51,10 +51,11 @@ public:
{
auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); };
static constexpr std::string_view default_cache_policy = "SLRU";
if (cache_policy_name.empty())
{
static constexpr auto default_cache_policy = "SLRU";
cache_policy_name = default_cache_policy;
}
if (cache_policy_name == "LRU")
{

View File

@ -313,8 +313,8 @@ bool DNSResolver::updateCacheImpl(
UpdateF && update_func,
ElemsT && elems,
UInt32 max_consecutive_failures,
const String & notfound_log_msg,
const String & dropped_log_msg)
FormatStringHelper<String> notfound_log_msg,
FormatStringHelper<String> dropped_log_msg)
{
bool updated = false;
String lost_elems;
@ -351,7 +351,7 @@ bool DNSResolver::updateCacheImpl(
}
if (!lost_elems.empty())
LOG_INFO(log, fmt::runtime(notfound_log_msg), lost_elems);
LOG_INFO(log, notfound_log_msg.format(std::move(lost_elems)));
if (elements_to_drop.size())
{
updated = true;
@ -363,7 +363,7 @@ bool DNSResolver::updateCacheImpl(
deleted_elements += cacheElemToString(it->first);
elems.erase(it);
}
LOG_INFO(log, fmt::runtime(dropped_log_msg), deleted_elements);
LOG_INFO(log, dropped_log_msg.format(std::move(deleted_elements)));
}
return updated;

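Switching the two message parameters from const String & to FormatStringHelper<String> moves format-string checking to the caller's compile time and removes the fmt::runtime escape hatch at the log sites. The same idea sketched with {fmt}'s own checked type (assuming the {fmt} library is available):

#include <fmt/format.h>
#include <string>
#include <utility>

// Sketch: accept a compile-time-checked format string instead of a runtime
// String; fmt::format_string plays the role of FormatStringHelper here.
template <typename... Args>
void logInfo(fmt::format_string<Args...> fmt_str, Args &&... args)
{
    fmt::print("{}\n", fmt::format(fmt_str, std::forward<Args>(args)...));
}

int main()
{
    std::string lost = "host1, host2";
    logInfo("Cached hosts not found: {}", std::move(lost)); // checked at compile time
    // logInfo("Cached hosts not found: {} {}", lost);      // would fail to compile
}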
View File

@ -5,6 +5,7 @@
#include <base/types.h>
#include <Core/Names.h>
#include <boost/noncopyable.hpp>
#include <Common/LoggingFormatStringHelpers.h>
namespace Poco { class Logger; }
@ -61,13 +62,12 @@ public:
private:
template <typename UpdateF, typename ElemsT>
bool updateCacheImpl(
UpdateF && update_func,
ElemsT && elems,
UInt32 max_consecutive_failures,
const String & notfound_log_msg,
const String & dropped_log_msg);
FormatStringHelper<String> notfound_log_msg,
FormatStringHelper<String> dropped_log_msg);
DNSResolver();

View File

@ -3,7 +3,7 @@
#include <cctz/civil_time.h>
#include <cctz/time_zone.h>
#include <cctz/zone_info_source.h>
#include <Poco/Exception.h>
#include <Common/Exception.h>
#include <algorithm>
#include <cassert>
@ -12,6 +12,14 @@
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
}
/// Embedded timezones.
std::string_view getTimeZone(const char * name);
@ -66,7 +74,7 @@ DateLUTImpl::DateLUTImpl(const std::string & time_zone_)
cctz::time_zone cctz_time_zone;
if (!cctz::load_time_zone(time_zone, &cctz_time_zone))
throw Poco::Exception("Cannot load time zone " + time_zone_);
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot load time zone {}", time_zone_);
constexpr cctz::civil_day epoch{1970, 1, 1};
constexpr cctz::civil_day lut_start{DATE_LUT_MIN_YEAR, 1, 1};

View File

@ -2,6 +2,7 @@
#include "Epoll.h"
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <base/defines.h>
#include <unistd.h>
@ -57,21 +58,35 @@ void Epoll::remove(int fd)
throwFromErrno("Cannot remove descriptor from epoll", DB::ErrorCodes::EPOLL_ERROR);
}
size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocking) const
size_t Epoll::getManyReady(int max_events, epoll_event * events_out, int timeout) const
{
if (events_count == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There are no events in epoll");
Stopwatch watch;
int ready_size;
int timeout = blocking ? -1 : 0;
do
while (true)
{
ready_size = epoll_wait(epoll_fd, events_out, max_events, timeout);
if (ready_size == -1 && errno != EINTR)
throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR);
/// If `ready_size` = 0, it's timeout.
if (ready_size < 0)
{
if (errno == EINTR)
{
if (timeout >= 0)
{
timeout = std::max(0, static_cast<int>(timeout - watch.elapsedMilliseconds()));
watch.restart();
}
continue;
}
else
throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR);
}
else
break;
}
while (ready_size <= 0 && (ready_size != 0 || blocking));
return ready_size;
}

View File

@ -30,10 +30,11 @@ public:
/// Remove file descriptor from epoll.
void remove(int fd);
/// Get events from epoll. Events are written in events_out, this function returns an amount of ready events.
/// If blocking is false and there are no ready events,
/// return empty vector, otherwise wait for ready events.
size_t getManyReady(int max_events, epoll_event * events_out, bool blocking) const;
/// Get events from epoll. Events are written to events_out, and the function returns the number
/// of ready events. The timeout argument specifies how many milliseconds to wait for ready
/// events. A timeout of -1 makes epoll_wait() block indefinitely, while a timeout of zero makes
/// it return immediately, even if no events are available.
size_t getManyReady(int max_events, epoll_event * events_out, int timeout) const;
int getFileDescriptor() const { return epoll_fd; }

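Replacing bool blocking with int timeout maps the argument directly onto epoll_wait() semantics — callers translate as blocking ? -1 : 0 above — and lets the EINTR loop subtract elapsed time from a finite timeout. The underlying contract in plain epoll (Linux only):

#include <sys/epoll.h>
#include <cstdio>
#include <unistd.h>

// Plain-epoll sketch of the timeout contract adopted above:
// -1 blocks indefinitely, 0 returns immediately, N waits up to N ms.
int main()
{
    int epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) { perror("epoll_create1"); return 1; }

    epoll_event event{};
    int ready = epoll_wait(epoll_fd, &event, 1, /*timeout_ms=*/0); // non-blocking poll
    std::printf("ready events: %d\n", ready); // 0: nothing registered, no waiting

    close(epoll_fd);
}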
View File

@ -151,8 +151,11 @@ void MemoryTracker::logPeakMemoryUsage()
{
log_peak_memory_usage_in_destructor = false;
const auto * description = description_ptr.load(std::memory_order_relaxed);
auto peak_bytes = peak.load(std::memory_order::relaxed);
if (peak_bytes < 128 * 1024)
return;
LOG_DEBUG(&Poco::Logger::get("MemoryTracker"),
"Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak));
"Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(peak_bytes));
}
void MemoryTracker::logMemoryUsage(Int64 current) const

View File

@ -117,6 +117,11 @@ public:
DB::appendHintsMessage(error_message, hints);
}
String getHintsMessage(const String & name) const
{
return getHintsErrorMessageSuffix(getHints(name));
}
IHints() = default;
IHints(const IHints &) = default;

View File

@ -0,0 +1,27 @@
#include <Common/ShellCommandSettings.h>
#include <magic_enum.hpp>
#include <Poco/String.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
ExternalCommandStderrReaction parseExternalCommandStderrReaction(const std::string & config)
{
auto reaction = magic_enum::enum_cast<ExternalCommandStderrReaction>(Poco::toUpper(config));
if (!reaction)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unknown stderr_reaction: {}. Possible values are 'none', 'log', 'log_first', 'log_last' and 'throw'",
config);
return *reaction;
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <string>
namespace DB
{
enum class ExternalCommandStderrReaction
{
NONE, /// Do nothing.
LOG, /// Try to log all stderr output of the external command immediately.
LOG_FIRST, /// Try to log the first 1 KiB of the command's stderr output after it exits.
LOG_LAST, /// Same as above, but log the last 1 KiB of output.
THROW /// Throw an exception immediately when the external command writes anything to stderr.
};
ExternalCommandStderrReaction parseExternalCommandStderrReaction(const std::string & config);
}

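A caller-side sketch of the new setting: upper-case the configured value and let magic_enum map it onto the enum, as the .cpp above does (assuming magic_enum is available; a std::toupper loop stands in for Poco::toUpper):

#include <magic_enum.hpp>
#include <cctype>
#include <optional>
#include <string>

enum class ExternalCommandStderrReaction { NONE, LOG, LOG_FIRST, LOG_LAST, THROW };

// Sketch of the parse above: normalize case, then let magic_enum do the
// string-to-enum mapping; nullopt signals an invalid config value.
static std::optional<ExternalCommandStderrReaction> parse(std::string config)
{
    for (auto & c : config)
        c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    return magic_enum::enum_cast<ExternalCommandStderrReaction>(config);
}

int main()
{
    return parse("log_first") == ExternalCommandStderrReaction::LOG_FIRST ? 0 : 1;
}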
View File

@ -13,6 +13,8 @@
* (~ 700 MB/sec, 15 million strings per second)
*/
#include "TransformEndianness.hpp"
#include <bit>
#include <string>
#include <type_traits>
@ -22,14 +24,12 @@
#include <base/unaligned.h>
#include <Common/Exception.h>
#include <city.h>
namespace DB
{
namespace ErrorCodes
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
#define SIPROUND \
do \
@ -161,71 +161,50 @@ public:
}
}
template <typename T>
template <typename Transform = void, typename T>
ALWAYS_INLINE void update(const T & x)
{
if constexpr (std::endian::native == std::endian::big)
{
T rev_x = x;
char *start = reinterpret_cast<char *>(&rev_x);
char *end = start + sizeof(T);
std::reverse(start, end);
update(reinterpret_cast<const char *>(&rev_x), sizeof(rev_x)); /// NOLINT
auto transformed_x = x;
if constexpr (!std::is_same_v<Transform, void>)
transformed_x = Transform()(x);
else
DB::transformEndianness<std::endian::little>(transformed_x);
update(reinterpret_cast<const char *>(&transformed_x), sizeof(transformed_x)); /// NOLINT
}
else
update(reinterpret_cast<const char *>(&x), sizeof(x)); /// NOLINT
}
ALWAYS_INLINE void update(const std::string & x)
{
update(x.data(), x.length());
}
ALWAYS_INLINE void update(const std::string & x) { update(x.data(), x.length()); }
ALWAYS_INLINE void update(const std::string_view x) { update(x.data(), x.size()); }
ALWAYS_INLINE void update(const char * s) { update(std::string_view(s)); }
ALWAYS_INLINE void update(const std::string_view x)
{
update(x.data(), x.size());
}
/// Get the result in some form. This can only be done once!
ALWAYS_INLINE void get128(char * out)
{
finalize();
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
unalignedStore<UInt64>(out + 8, v0 ^ v1);
unalignedStore<UInt64>(out, v2 ^ v3);
#else
unalignedStore<UInt64>(out, v0 ^ v1);
unalignedStore<UInt64>(out + 8, v2 ^ v3);
#endif
}
template <typename T>
ALWAYS_INLINE void get128(T & lo, T & hi)
{
static_assert(sizeof(T) == 8);
finalize();
lo = v0 ^ v1;
hi = v2 ^ v3;
}
template <typename T>
ALWAYS_INLINE void get128(T & dst)
{
static_assert(sizeof(T) == 16);
get128(reinterpret_cast<char *>(&dst));
}
UInt64 get64()
ALWAYS_INLINE UInt64 get64()
{
finalize();
return v0 ^ v1 ^ v2 ^ v3;
}
UInt128 get128()
template <typename T>
requires (sizeof(T) == 8)
ALWAYS_INLINE void get128(T & lo, T & hi)
{
finalize();
lo = v0 ^ v1;
hi = v2 ^ v3;
}
ALWAYS_INLINE UInt128 get128()
{
UInt128 res;
get128(res);
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
get128(res.items[1], res.items[0]);
#else
get128(res.items[0], res.items[1]);
#endif
return res;
}
@ -247,9 +226,7 @@ public:
{
lo = std::byteswap(lo);
hi = std::byteswap(hi);
auto tmp = hi;
hi = lo;
lo = tmp;
std::swap(lo, hi);
}
UInt128 res = hi;
@ -265,11 +242,18 @@ public:
#include <cstddef>
inline void sipHash128(const char * data, const size_t size, char * out)
inline std::array<char, 16> getSipHash128AsArray(SipHash & sip_hash)
{
SipHash hash;
hash.update(data, size);
hash.get128(out);
std::array<char, 16> arr;
*reinterpret_cast<UInt128*>(arr.data()) = sip_hash.get128();
return arr;
}
inline CityHash_v1_0_2::uint128 getSipHash128AsPair(SipHash & sip_hash)
{
CityHash_v1_0_2::uint128 result;
sip_hash.get128(result.low64, result.high64);
return result;
}
inline UInt128 sipHash128Keyed(UInt64 key0, UInt64 key1, const char * data, const size_t size)
@ -309,7 +293,7 @@ inline UInt64 sipHash64(const char * data, const size_t size)
}
template <typename T>
UInt64 sipHash64(const T & x)
inline UInt64 sipHash64(const T & x)
{
SipHash hash;
hash.update(x);

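The refactoring replaces the out-parameter get128 overloads with a value-returning get128() plus helper functions, so call sites shrink to const auto key = hash.get128();. A toy stand-in showing the interface shape only (the mixing below is not SipHash):

#include <cstdint>

struct UInt128Lite { uint64_t items[2]; };

// Toy stand-in for the refactored interface: get128(lo, hi) stays for
// endianness-sensitive callers; get128() returning a value is the common case.
struct HashLite
{
    uint64_t v0 = 0, v1 = 0, v2 = 0, v3 = 0;
    void update(uint64_t x) { v0 += x; v1 ^= x; v2 += x << 1; v3 ^= x >> 1; } // not SipHash mixing
    void get128(uint64_t & lo, uint64_t & hi) { lo = v0 ^ v1; hi = v2 ^ v3; }
    UInt128Lite get128() { UInt128Lite r; get128(r.items[0], r.items[1]); return r; }
};

int main()
{
    HashLite hash;
    hash.update(42);
    const auto key = hash.get128(); // new style: no out-parameter, no temporary
    return key.items[0] == 0 ? 0 : 1;
}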
View File

@ -2,6 +2,7 @@
#include <base/Decimal_fwd.h>
#include <base/extended_types.h>
#include <base/strong_typedef.h>
#include <city.h>
@ -48,7 +49,7 @@ inline void transformEndianness(T & value)
}
template <std::endian ToEndian, std::endian FromEndian = std::endian::native, typename T>
requires std::is_scoped_enum_v<T>
requires std::is_enum_v<T> || std::is_scoped_enum_v<T>
inline void transformEndianness(T & x)
{
using UnderlyingType = std::underlying_type_t<T>;

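Relaxing the constraint from scoped enums to any enum lets plain enum fields take the same byte-swap path. The technique — cast to the underlying type, swap, cast back — in standalone form (std::byteswap needs C++23; std::is_enum_v already covers scoped enums, so the sketch uses it alone):

#include <bit>
#include <cstdint>
#include <type_traits>

// Standalone sketch of transformEndianness for enums: reuse the integral
// byte-swap path via the enum's underlying type.
template <std::endian To, typename T>
    requires std::is_enum_v<T>
void transformEndianness(T & x)
{
    using U = std::underlying_type_t<T>;
    if constexpr (To != std::endian::native)
        x = static_cast<T>(std::byteswap(static_cast<U>(x)));
}

enum LegacyFlags : uint32_t { FLAG_A = 0x01020304 }; // unscoped enum now accepted

int main()
{
    LegacyFlags f = FLAG_A;
    transformEndianness<std::endian::big>(f); // swaps on little-endian hosts
    return 0;
}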
View File

@ -21,29 +21,33 @@ namespace ProfileEvents
namespace Coordination
{
Exception::Exception(const std::string & msg, const Error code_, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code_)
void Exception::incrementErrorMetrics(const Error code_)
{
if (Coordination::isUserError(code))
if (Coordination::isUserError(code_))
ProfileEvents::increment(ProfileEvents::ZooKeeperUserExceptions);
else if (Coordination::isHardwareError(code))
else if (Coordination::isHardwareError(code_))
ProfileEvents::increment(ProfileEvents::ZooKeeperHardwareExceptions);
else
ProfileEvents::increment(ProfileEvents::ZooKeeperOtherExceptions);
}
Exception::Exception(const std::string & msg, const Error code_)
: Exception(msg + " (" + errorMessage(code_) + ")", code_, 0)
Exception::Exception(const std::string & msg, const Error code_, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION)
, code(code_)
{
incrementErrorMetrics(code);
}
Exception::Exception(PreformattedMessage && msg, const Error code_)
: DB::Exception(std::move(msg), DB::ErrorCodes::KEEPER_EXCEPTION)
, code(code_)
{
extendedMessage(errorMessage(code));
incrementErrorMetrics(code);
}
Exception::Exception(const Error code_)
: Exception(errorMessage(code_), code_, 0)
{
}
Exception::Exception(const Error code_, const std::string & path)
: Exception(std::string{errorMessage(code_)} + ", path: " + path, code_, 0)
: Exception(code_, "Coordination error: {}", errorMessage(code_))
{
}
@ -56,10 +60,10 @@ using namespace DB;
static void addRootPath(String & path, const String & root_path)
{
if (path.empty())
throw Exception("Path cannot be empty", Error::ZBADARGUMENTS);
throw Exception::fromMessage(Error::ZBADARGUMENTS, "Path cannot be empty");
if (path[0] != '/')
throw Exception("Path must begin with /, got path '" + path + "'", Error::ZBADARGUMENTS);
throw Exception(Error::ZBADARGUMENTS, "Path must begin with /, got path '{}'", path);
if (root_path.empty())
return;
@ -76,7 +80,7 @@ static void removeRootPath(String & path, const String & root_path)
return;
if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY);
throw Exception::fromMessage(Error::ZDATAINCONSISTENCY, "Received path is not longer than root_path");
path = path.substr(root_path.size());
}

View File

@ -17,6 +17,13 @@
* - ZooKeeper emulation layer on top of Etcd, FoundationDB, whatever.
*/
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
}
namespace Coordination
{
@ -158,6 +165,10 @@ struct WatchResponse : virtual Response
};
using WatchCallback = std::function<void(const WatchResponse &)>;
/// Passing watch callback as a shared_ptr allows to
/// - avoid copying of the callback
/// - registering the same callback only once per path
using WatchCallbackPtr = std::shared_ptr<WatchCallback>;
struct SetACLRequest : virtual Request
{
@ -450,17 +461,46 @@ class Exception : public DB::Exception
private:
/// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
Exception(const std::string & msg, const Error code_, int); /// NOLINT
Exception(PreformattedMessage && msg, const Error code_);
/// Message must be a compile-time constant
template <typename T>
requires std::is_convertible_v<T, String>
Exception(T && message, const Error code_) : DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, std::forward<T>(message)), code(code_)
{
incrementErrorMetrics(code);
}
static void incrementErrorMetrics(const Error code_);
public:
explicit Exception(const Error code_); /// NOLINT
Exception(const std::string & msg, const Error code_); /// NOLINT
Exception(const Error code_, const std::string & path); /// NOLINT
Exception(const Exception & exc);
template <typename... Args>
Exception(const Error code_, fmt::format_string<Args...> fmt, Args &&... args)
: Exception(fmt::format(fmt, std::forward<Args>(args)...), code_)
Exception(const Error code_, FormatStringHelper<Args...> fmt, Args &&... args)
: DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, std::move(fmt), std::forward<Args>(args)...)
, code(code_)
{
incrementErrorMetrics(code);
}
inline static Exception createDeprecated(const std::string & msg, const Error code_)
{
return Exception(msg, code_, 0);
}
inline static Exception fromPath(const Error code_, const std::string & path)
{
return Exception(code_, "Coordination error: {}, path {}", errorMessage(code_), path);
}
/// Message must be a compile-time constant
template <typename T>
requires std::is_convertible_v<T, String>
inline static Exception fromMessage(const Error code_, T && message)
{
return Exception(std::forward<T>(message), code_);
}
const char * name() const noexcept override { return "Coordination::Exception"; }
@ -521,12 +561,12 @@ public:
virtual void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) = 0;
WatchCallbackPtr watch) = 0;
virtual void get(
const String & path,
GetCallback callback,
WatchCallback watch) = 0;
WatchCallbackPtr watch) = 0;
virtual void set(
const String & path,
@ -538,7 +578,7 @@ public:
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) = 0;
WatchCallbackPtr watch) = 0;
virtual void check(
const String & path,

View File

@ -24,6 +24,9 @@ public:
static void check(Coordination::Error code, const Coordination::Requests & requests, const Coordination::Responses & responses);
KeeperMultiException(Coordination::Error code, const Coordination::Requests & requests, const Coordination::Responses & responses);
private:
KeeperMultiException(Coordination::Error code, size_t failed_op_index_, const Coordination::Requests & requests_, const Coordination::Responses & responses_);
};
size_t getFailedOpIndex(Coordination::Error code, const Coordination::Responses & responses);
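
A sketch of how the static check helper is typically driven. Only check and getFailedOpIndex come from this header; runTransaction, tryMulti, and the zkutil qualification are assumptions about the surrounding codebase:

void runTransaction(zkutil::ZooKeeper & zk, const Coordination::Requests & requests)
{
    Coordination::Responses responses;
    Coordination::Error code = zk.tryMulti(requests, responses); // assumed API

    // On failure this raises KeeperMultiException; after this commit its message
    // already carries "Op #<failed_op_index>" plus the path of the failed op.
    zkutil::KeeperMultiException::check(code, requests, responses);
}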

View File

@ -42,9 +42,9 @@ static void processWatchesImpl(const String & path, TestKeeper::Watches & watche
auto it = watches.find(watch_response.path);
if (it != watches.end())
{
for (auto & callback : it->second)
for (const auto & callback : it->second)
if (callback)
callback(watch_response);
(*callback)(watch_response);
watches.erase(it);
}
@ -55,9 +55,9 @@ static void processWatchesImpl(const String & path, TestKeeper::Watches & watche
it = list_watches.find(watch_list_response.path);
if (it != list_watches.end())
{
for (auto & callback : it->second)
for (const auto & callback : it->second)
if (callback)
callback(watch_list_response);
(*callback)(watch_list_response);
list_watches.erase(it);
}
@ -177,7 +177,7 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
}
else
throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS);
throw Exception::fromMessage(Error::ZBADARGUMENTS, "Illegal command as part of multi ZooKeeper request");
}
}
@ -389,7 +389,7 @@ std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Containe
{
auto path_prefix = path;
if (path_prefix.empty())
throw Exception("Logical error: path cannot be empty", Error::ZSESSIONEXPIRED);
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Logical error: path cannot be empty");
if (path_prefix.back() != '/')
path_prefix += '/';
@ -587,11 +587,11 @@ void TestKeeper::processingThread()
? list_watches
: watches;
watches_type[info.request->getPath()].emplace_back(std::move(info.watch));
watches_type[info.request->getPath()].insert(info.watch);
}
else if (response->error == Error::ZNONODE && dynamic_cast<const ExistsRequest *>(info.request.get()))
{
watches[info.request->getPath()].emplace_back(std::move(info.watch));
watches[info.request->getPath()].insert(info.watch);
}
}
@ -634,13 +634,13 @@ void TestKeeper::finalize(const String &)
response.state = EXPIRED_SESSION;
response.error = Error::ZSESSIONEXPIRED;
for (auto & callback : path_watch.second)
for (const auto & callback : path_watch.second)
{
if (callback)
{
try
{
callback(response);
(*callback)(response);
}
catch (...)
{
@ -677,7 +677,7 @@ void TestKeeper::finalize(const String &)
response.error = Error::ZSESSIONEXPIRED;
try
{
info.watch(response);
(*info.watch)(response);
}
catch (...)
{
@ -705,10 +705,10 @@ void TestKeeper::pushRequest(RequestInfo && request)
std::lock_guard lock(push_request_mutex);
if (expired)
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Session expired");
if (!requests_queue.tryPush(std::move(request), args.operation_timeout_ms))
throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
throw Exception::fromMessage(Error::ZOPERATIONTIMEOUT, "Cannot push request to queue within operation timeout");
}
catch (...)
{
@ -756,7 +756,7 @@ void TestKeeper::remove(
void TestKeeper::exists(
const String & path,
ExistsCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
TestKeeperExistsRequest request;
request.path = path;
@ -771,7 +771,7 @@ void TestKeeper::exists(
void TestKeeper::get(
const String & path,
GetCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
TestKeeperGetRequest request;
request.path = path;
@ -804,7 +804,7 @@ void TestKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
TestKeeperFilteredListRequest request;
request.path = path;

View File

@ -59,12 +59,12 @@ public:
void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void get(
const String & path,
GetCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void set(
const String & path,
@ -76,7 +76,7 @@ public:
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void check(
const String & path,
@ -117,7 +117,7 @@ public:
using Container = std::map<std::string, Node>;
using WatchCallbacks = std::vector<WatchCallback>;
using WatchCallbacks = std::unordered_set<WatchCallbackPtr>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
private:
@ -127,7 +127,7 @@ private:
{
TestKeeperRequestPtr request;
ResponseCallback callback;
WatchCallback watch;
WatchCallbackPtr watch;
clock::time_point time;
};

View File

@ -51,7 +51,7 @@ const int CreateMode::EphemeralSequential = 3;
static void check(Coordination::Error code, const std::string & path)
{
if (code != Coordination::Error::ZOK)
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
}
@ -64,7 +64,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
if (args.implementation == "zookeeper")
{
if (args.hosts.empty())
throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::Error::ZBADARGUMENTS);
throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "No hosts passed to ZooKeeper constructor.");
Coordination::ZooKeeper::Nodes nodes;
nodes.reserve(args.hosts.size());
@ -107,9 +107,9 @@ void ZooKeeper::init(ZooKeeperArgs args_)
{
/// For DNS errors we throw an exception with the ZCONNECTIONLOSS code, so it will be considered a hardware error, not a user error
if (dns_error)
throw KeeperException("Cannot resolve any of provided ZooKeeper hosts due to DNS error", Coordination::Error::ZCONNECTIONLOSS);
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
else
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZCONNECTIONLOSS);
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
}
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log, [this](size_t node_idx, const Coordination::ZooKeeper::Node & node)
@ -145,11 +145,11 @@ void ZooKeeper::init(ZooKeeperArgs args_)
auto future = asyncExists("/");
auto res = future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms));
if (res != std::future_status::ready)
throw KeeperException("Cannot check if zookeeper root exists.", Coordination::Error::ZOPERATIONTIMEOUT);
throw KeeperException::fromMessage(Coordination::Error::ZOPERATIONTIMEOUT, "Cannot check if zookeeper root exists.");
auto code = future.get().error;
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, "/");
throw KeeperException::fromPath(code, "/");
if (code == Coordination::Error::ZNONODE)
throw KeeperException(Coordination::Error::ZNONODE, "ZooKeeper root doesn't exist. You should create root node {} before start.", args.chroot);
@ -212,7 +212,7 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type)
{
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type);
@ -250,6 +250,13 @@ Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat
return res;
}
Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallbackPtr watch_callback, Coordination::ListRequestType list_request_type)
{
Strings res;
check(tryGetChildrenWatch(path, res, stat, watch_callback, list_request_type), path);
return res;
}
Coordination::Error ZooKeeper::tryGetChildren(
const std::string & path,
Strings & res,
@ -257,12 +264,9 @@ Coordination::Error ZooKeeper::tryGetChildren(
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch), list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
return code;
return tryGetChildrenWatch(path, res, stat,
watch ? std::make_shared<Coordination::WatchCallback>(callbackForEvent(watch)) : Coordination::WatchCallbackPtr{},
list_request_type);
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(
@ -271,11 +275,23 @@ Coordination::Error ZooKeeper::tryGetChildrenWatch(
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
return tryGetChildrenWatch(path, res, stat,
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{},
list_request_type);
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback, list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
return code;
}
@ -314,7 +330,7 @@ Coordination::Error ZooKeeper::tryCreate(const std::string & path, const std::st
code == Coordination::Error::ZNONODE ||
code == Coordination::Error::ZNODEEXISTS ||
code == Coordination::Error::ZNOCHILDRENFOREPHEMERALS))
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
return code;
}
@ -333,7 +349,7 @@ void ZooKeeper::createIfNotExists(const std::string & path, const std::string &
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return;
else
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
}
void ZooKeeper::createAncestors(const std::string & path)
@ -355,14 +371,14 @@ void ZooKeeper::createAncestors(const std::string & path)
/// The parent node doesn't exist. Save the current node and try with the parent
last_pos = current_node.rfind('/');
if (last_pos == std::string::npos || last_pos == 0)
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
pending_nodes.emplace_back(std::move(current_node));
current_node = path.substr(0, last_pos);
}
else if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
break;
else
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
}
for (const std::string & pending : pending_nodes | std::views::reverse)
@ -423,7 +439,7 @@ Coordination::Error ZooKeeper::tryRemove(const std::string & path, int32_t versi
code == Coordination::Error::ZNONODE ||
code == Coordination::Error::ZBADVERSION ||
code == Coordination::Error::ZNOTEMPTY))
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
return code;
}
@ -457,7 +473,7 @@ bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat,
Coordination::Error code = existsImpl(path, stat, watch_callback);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
return code != Coordination::Error::ZNONODE;
}
@ -524,7 +540,7 @@ bool ZooKeeper::tryGetWatch(
Coordination::Error code = getImpl(path, res, stat, watch_callback);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
if (return_code)
*return_code = code;
@ -566,7 +582,7 @@ void ZooKeeper::createOrUpdate(const std::string & path, const std::string & dat
create(path, data, mode);
}
else if (code != Coordination::Error::ZOK)
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
}
Coordination::Error ZooKeeper::trySet(const std::string & path, const std::string & data,
@ -577,7 +593,7 @@ Coordination::Error ZooKeeper::trySet(const std::string & path, const std::strin
if (!(code == Coordination::Error::ZOK ||
code == Coordination::Error::ZNONODE ||
code == Coordination::Error::ZBADVERSION))
throw KeeperException(code, path);
throw KeeperException::fromPath(code, path);
return code;
}
@ -756,7 +772,7 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
continue;
}
throw KeeperException(res.error, batch[i]);
throw KeeperException::fromPath(res.error, batch[i]);
}
}
return removed_as_expected;
@ -814,7 +830,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
do
{
/// Use getData instead of exists to avoid a watch leak.
impl->get(path, callback, watch);
impl->get(path, callback, std::make_shared<Coordination::WatchCallback>(watch));
if (!state->event.tryWait(1000))
continue;
@ -823,7 +839,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
return true;
if (state->code)
throw KeeperException(static_cast<Coordination::Error>(state->code.load(std::memory_order_seq_cst)), path);
throw KeeperException::fromPath(static_cast<Coordination::Error>(state->code.load(std::memory_order_seq_cst)), path);
if (state->event_type == Coordination::DELETED)
return true;
@ -844,7 +860,7 @@ void ZooKeeper::handleEphemeralNodeExistence(const std::string & path, const std
{
auto code = tryRemove(path, stat.version);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception(code, path);
throw Coordination::Exception::fromPath(code, path);
}
else
{
@ -893,7 +909,7 @@ std::future<Coordination::CreateResponse> ZooKeeper::asyncCreate(const std::stri
auto callback = [promise, path](const Coordination::CreateResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
@ -924,12 +940,13 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & p
auto callback = [promise, path](const Coordination::GetResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
impl->get(path, std::move(callback), watch_callback);
impl->get(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -943,7 +960,8 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncTryGetNoThrow(const std::
promise->set_value(response);
};
impl->get(path, std::move(callback), watch_callback);
impl->get(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -956,7 +974,7 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncTryGet(const std::string
auto callback = [promise, path](const Coordination::GetResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
@ -973,12 +991,13 @@ std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::stri
auto callback = [promise, path](const Coordination::ExistsResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
impl->exists(path, std::move(callback), watch_callback);
impl->exists(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -992,7 +1011,8 @@ std::future<Coordination::ExistsResponse> ZooKeeper::asyncTryExistsNoThrow(const
promise->set_value(response);
};
impl->exists(path, std::move(callback), watch_callback);
impl->exists(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -1004,7 +1024,7 @@ std::future<Coordination::SetResponse> ZooKeeper::asyncSet(const std::string & p
auto callback = [promise, path](const Coordination::SetResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
@ -1037,17 +1057,18 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(
auto callback = [promise, path](const Coordination::ListResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
impl->list(path, list_request_type, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
const std::string & path, Coordination::WatchCallbackPtr watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1070,7 +1091,7 @@ ZooKeeper::asyncTryGetChildren(const std::string & path, Coordination::ListReque
auto callback = [promise, path](const Coordination::ListResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
@ -1087,7 +1108,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::stri
auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
else
promise->set_value(response);
};
@ -1108,7 +1129,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::s
&& response.error != Coordination::Error::ZBADVERSION
&& response.error != Coordination::Error::ZNOTEMPTY)
{
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
promise->set_exception(std::make_exception_ptr(KeeperException::fromPath(response.error, path)));
}
else
promise->set_value(response);
@ -1243,11 +1264,16 @@ size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::
}
KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperException("Transaction failed", exception_code),
requests(requests_), responses(responses_), failed_op_index(getFailedOpIndex(exception_code, responses))
KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, size_t failed_op_index_, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperException(exception_code, "Transaction failed: Op #{}, path", failed_op_index_),
requests(requests_), responses(responses_), failed_op_index(failed_op_index_)
{
addMessage(getPathForFirstFailedOp());
}
KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperMultiException(exception_code, getFailedOpIndex(exception_code, responses_), requests_, responses_)
{
addMessage("Op #" + std::to_string(failed_op_index) + ", path: " + getPathForFirstFailedOp());
}
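
The expression `watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{}` recurs at every converted call site in this file. A hypothetical helper, not part of this commit, that would name the pattern:

// Hypothetical adapter: wrap a legacy WatchCallback into the shared_ptr
// form, preserving "no watch" (an empty callback) as a null pointer.
static Coordination::WatchCallbackPtr toWatchCallbackPtr(Coordination::WatchCallback cb)
{
    return cb ? std::make_shared<Coordination::WatchCallback>(std::move(cb))
              : Coordination::WatchCallbackPtr{};
}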

View File

@ -333,6 +333,11 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Strings getChildrenWatch(const std::string & path,
Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, false>;
using MultiTryGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, true>;
@ -369,6 +374,13 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
template <typename TIter>
MultiTryGetChildrenResponse
tryGetChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
@ -474,7 +486,7 @@ public:
/// Like the previous one but don't throw any exceptions on future.get()
FutureGetChildren asyncTryGetChildrenNoThrow(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::WatchCallbackPtr watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using FutureSet = std::future<Coordination::SetResponse>;
@ -545,7 +557,7 @@ private:
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
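
A usage sketch for the new WatchCallbackPtr overload declared above. watchChildren and the lambda body are illustrative; the declarations and error convention are as in this diff:

void watchChildren(zkutil::ZooKeeper & zk, const std::string & path)
{
    // One shared callback instance; re-registering the same pointer for the
    // same path is deduplicated on the implementation side (see IKeeper.h).
    auto watch = std::make_shared<Coordination::WatchCallback>(
        [](const Coordination::WatchResponse & response)
        {
            // React to the children change; response.path names the node.
        });

    Strings children;
    Coordination::Stat stat;
    auto code = zk.tryGetChildrenWatch(path, children, &stat, watch,
                                       Coordination::ListRequestType::ALL);
    if (code == Coordination::Error::ZOK)
    {
        // `children` is populated and the watch stays registered on `path`.
    }
    // Other error codes already throw inside tryGetChildrenWatch (see the .cpp).
}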

View File

@ -36,7 +36,7 @@ ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, c
}
if (session_timeout_ms < 0 || operation_timeout_ms < 0 || connection_timeout_ms < 0)
throw KeeperException("Timeout cannot be negative", Coordination::Error::ZBADARGUMENTS);
throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "Timeout cannot be negative");
/// init get_priority_load_balancing
get_priority_load_balancing.hostname_differences.resize(hosts.size());
@ -63,7 +63,7 @@ void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfig
auto tcp_port_secure = config.getString(key);
if (tcp_port_secure.empty())
throw KeeperException("Empty tcp_port_secure in config file", Coordination::Error::ZBADARGUMENTS);
throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "Empty tcp_port_secure in config file");
}
bool secure{false};
@ -81,7 +81,7 @@ void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfig
}
if (tcp_port.empty())
throw KeeperException("No tcp_port or tcp_port_secure in config file", Coordination::Error::ZBADARGUMENTS);
throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "No tcp_port or tcp_port_secure in config file");
if (auto coordination_key = std::string{config_name} + ".coordination_settings";
config.has(coordination_key))

View File

@ -461,8 +461,7 @@ void ZooKeeperErrorResponse::readImpl(ReadBuffer & in)
Coordination::read(read_error, in);
if (read_error != error)
throw Exception(fmt::format("Error code in ErrorResponse ({}) doesn't match error code in header ({})", read_error, error),
Error::ZMARSHALLINGERROR);
throw Exception(Error::ZMARSHALLINGERROR, "Error code in ErrorResponse ({}) doesn't match error code in header ({})", read_error, error);
}
void ZooKeeperErrorResponse::writeImpl(WriteBuffer & out) const
@ -534,7 +533,7 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests,
requests.push_back(std::make_shared<ZooKeeperFilteredListRequest>(*concrete_request_list));
}
else
throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS);
throw Exception::fromMessage(Error::ZBADARGUMENTS, "Illegal command as part of multi ZooKeeper request");
}
}
@ -577,9 +576,9 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
if (done)
{
if (op_num != OpNum::Error)
throw Exception("Unexpected op_num received at the end of results for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Unexpected op_num received at the end of results for multi transaction");
if (error != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Unexpected error value received at the end of results for multi transaction");
break;
}
@ -588,7 +587,7 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
requests.push_back(request);
if (in.eof())
throw Exception("Not enough results received for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Not enough results received for multi transaction");
}
}
@ -621,7 +620,7 @@ void ZooKeeperMultiResponse::readImpl(ReadBuffer & in)
Coordination::read(op_error, in);
if (done)
throw Exception("Not enough results received for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Not enough results received for multi transaction");
/// op_num == -1 is special for multi transaction.
/// For unknown reason, error code is duplicated in header and in response body.
@ -657,11 +656,11 @@ void ZooKeeperMultiResponse::readImpl(ReadBuffer & in)
Coordination::read(error_read, in);
if (!done)
throw Exception("Too many results received for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Too many results received for multi transaction");
if (op_num != OpNum::Error)
throw Exception("Unexpected op_num received at the end of results for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Unexpected op_num received at the end of results for multi transaction");
if (error_read != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction", Error::ZMARSHALLINGERROR);
throw Exception::fromMessage(Error::ZMARSHALLINGERROR, "Unexpected error value received at the end of results for multi transaction");
}
}

View File

@ -163,7 +163,7 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
OpNum getOpNum() const override
{
chassert(false);
throw Exception("OpNum for watch response doesn't exist", Error::ZRUNTIMEINCONSISTENCY);
throw Exception::fromMessage(Error::ZRUNTIMEINCONSISTENCY, "OpNum for watch response doesn't exist");
}
void fillLogElements(LogElements & elems, size_t idx) const override;
@ -214,7 +214,7 @@ struct ZooKeeperCloseResponse final : ZooKeeperResponse
{
void readImpl(ReadBuffer &) override
{
throw Exception("Received response for close request", Error::ZRUNTIMEINCONSISTENCY);
throw Exception::fromMessage(Error::ZRUNTIMEINCONSISTENCY, "Received response for close request");
}
void writeImpl(WriteBuffer &) const override {}

Some files were not shown because too many files have changed in this diff.