Merge remote-tracking branch 'rschu1ze/master' into CVE-2023-36054

Robert Schulze 2023-08-17 10:09:41 +00:00
commit ab0bdda3fc
GPG Key ID: 26703B55FB13728A
83 changed files with 1101 additions and 485 deletions

contrib/curl vendored

@@ -1 +1 @@
Subproject commit b0edf0b7dae44d9e66f270a257cf654b35d5263d
Subproject commit eb3b049df526bf125eda23218e680ce7fa9ec46c

View File

@@ -8,125 +8,122 @@ endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/curl")
set (SRCS
"${LIBRARY_DIR}/lib/fopen.c"
"${LIBRARY_DIR}/lib/noproxy.c"
"${LIBRARY_DIR}/lib/idn.c"
"${LIBRARY_DIR}/lib/cfilters.c"
"${LIBRARY_DIR}/lib/cf-socket.c"
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/bufref.c"
"${LIBRARY_DIR}/lib/cf-h1-proxy.c"
"${LIBRARY_DIR}/lib/cf-haproxy.c"
"${LIBRARY_DIR}/lib/cf-https-connect.c"
"${LIBRARY_DIR}/lib/file.c"
"${LIBRARY_DIR}/lib/timeval.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/hostip.c"
"${LIBRARY_DIR}/lib/progress.c"
"${LIBRARY_DIR}/lib/formdata.c"
"${LIBRARY_DIR}/lib/cookie.c"
"${LIBRARY_DIR}/lib/http.c"
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/url.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/if2ip.c"
"${LIBRARY_DIR}/lib/speedcheck.c"
"${LIBRARY_DIR}/lib/ldap.c"
"${LIBRARY_DIR}/lib/version.c"
"${LIBRARY_DIR}/lib/getenv.c"
"${LIBRARY_DIR}/lib/escape.c"
"${LIBRARY_DIR}/lib/mprintf.c"
"${LIBRARY_DIR}/lib/telnet.c"
"${LIBRARY_DIR}/lib/netrc.c"
"${LIBRARY_DIR}/lib/getinfo.c"
"${LIBRARY_DIR}/lib/transfer.c"
"${LIBRARY_DIR}/lib/strcase.c"
"${LIBRARY_DIR}/lib/easy.c"
"${LIBRARY_DIR}/lib/curl_fnmatch.c"
"${LIBRARY_DIR}/lib/curl_log.c"
"${LIBRARY_DIR}/lib/fileinfo.c"
"${LIBRARY_DIR}/lib/krb5.c"
"${LIBRARY_DIR}/lib/memdebug.c"
"${LIBRARY_DIR}/lib/http_chunks.c"
"${LIBRARY_DIR}/lib/strtok.c"
"${LIBRARY_DIR}/lib/cf-socket.c"
"${LIBRARY_DIR}/lib/cfilters.c"
"${LIBRARY_DIR}/lib/conncache.c"
"${LIBRARY_DIR}/lib/connect.c"
"${LIBRARY_DIR}/lib/llist.c"
"${LIBRARY_DIR}/lib/hash.c"
"${LIBRARY_DIR}/lib/multi.c"
"${LIBRARY_DIR}/lib/content_encoding.c"
"${LIBRARY_DIR}/lib/share.c"
"${LIBRARY_DIR}/lib/http_digest.c"
"${LIBRARY_DIR}/lib/md4.c"
"${LIBRARY_DIR}/lib/md5.c"
"${LIBRARY_DIR}/lib/http_negotiate.c"
"${LIBRARY_DIR}/lib/inet_pton.c"
"${LIBRARY_DIR}/lib/strtoofft.c"
"${LIBRARY_DIR}/lib/strerror.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/cookie.c"
"${LIBRARY_DIR}/lib/curl_addrinfo.c"
"${LIBRARY_DIR}/lib/curl_des.c"
"${LIBRARY_DIR}/lib/curl_endian.c"
"${LIBRARY_DIR}/lib/curl_fnmatch.c"
"${LIBRARY_DIR}/lib/curl_get_line.c"
"${LIBRARY_DIR}/lib/curl_gethostname.c"
"${LIBRARY_DIR}/lib/curl_gssapi.c"
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/curl_trc.c"
"${LIBRARY_DIR}/lib/dict.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
"${LIBRARY_DIR}/lib/dynhds.c"
"${LIBRARY_DIR}/lib/easy.c"
"${LIBRARY_DIR}/lib/escape.c"
"${LIBRARY_DIR}/lib/file.c"
"${LIBRARY_DIR}/lib/fileinfo.c"
"${LIBRARY_DIR}/lib/fopen.c"
"${LIBRARY_DIR}/lib/formdata.c"
"${LIBRARY_DIR}/lib/getenv.c"
"${LIBRARY_DIR}/lib/getinfo.c"
"${LIBRARY_DIR}/lib/gopher.c"
"${LIBRARY_DIR}/lib/hash.c"
"${LIBRARY_DIR}/lib/headers.c"
"${LIBRARY_DIR}/lib/hmac.c"
"${LIBRARY_DIR}/lib/hostasyn.c"
"${LIBRARY_DIR}/lib/hostip.c"
"${LIBRARY_DIR}/lib/hostip4.c"
"${LIBRARY_DIR}/lib/hostip6.c"
"${LIBRARY_DIR}/lib/hostsyn.c"
"${LIBRARY_DIR}/lib/hsts.c"
"${LIBRARY_DIR}/lib/http.c"
"${LIBRARY_DIR}/lib/http2.c"
"${LIBRARY_DIR}/lib/http_aws_sigv4.c"
"${LIBRARY_DIR}/lib/http_chunks.c"
"${LIBRARY_DIR}/lib/http_digest.c"
"${LIBRARY_DIR}/lib/http_negotiate.c"
"${LIBRARY_DIR}/lib/http_ntlm.c"
"${LIBRARY_DIR}/lib/http_proxy.c"
"${LIBRARY_DIR}/lib/idn.c"
"${LIBRARY_DIR}/lib/if2ip.c"
"${LIBRARY_DIR}/lib/imap.c"
"${LIBRARY_DIR}/lib/inet_ntop.c"
"${LIBRARY_DIR}/lib/inet_pton.c"
"${LIBRARY_DIR}/lib/krb5.c"
"${LIBRARY_DIR}/lib/ldap.c"
"${LIBRARY_DIR}/lib/llist.c"
"${LIBRARY_DIR}/lib/md4.c"
"${LIBRARY_DIR}/lib/md5.c"
"${LIBRARY_DIR}/lib/memdebug.c"
"${LIBRARY_DIR}/lib/mime.c"
"${LIBRARY_DIR}/lib/mprintf.c"
"${LIBRARY_DIR}/lib/mqtt.c"
"${LIBRARY_DIR}/lib/multi.c"
"${LIBRARY_DIR}/lib/netrc.c"
"${LIBRARY_DIR}/lib/nonblock.c"
"${LIBRARY_DIR}/lib/noproxy.c"
"${LIBRARY_DIR}/lib/openldap.c"
"${LIBRARY_DIR}/lib/parsedate.c"
"${LIBRARY_DIR}/lib/pingpong.c"
"${LIBRARY_DIR}/lib/pop3.c"
"${LIBRARY_DIR}/lib/progress.c"
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/select.c"
"${LIBRARY_DIR}/lib/splay.c"
"${LIBRARY_DIR}/lib/strdup.c"
"${LIBRARY_DIR}/lib/sendf.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/share.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/smtp.c"
"${LIBRARY_DIR}/lib/socketpair.c"
"${LIBRARY_DIR}/lib/socks.c"
"${LIBRARY_DIR}/lib/curl_addrinfo.c"
"${LIBRARY_DIR}/lib/socks_gssapi.c"
"${LIBRARY_DIR}/lib/socks_sspi.c"
"${LIBRARY_DIR}/lib/curl_sspi.c"
"${LIBRARY_DIR}/lib/slist.c"
"${LIBRARY_DIR}/lib/nonblock.c"
"${LIBRARY_DIR}/lib/curl_memrchr.c"
"${LIBRARY_DIR}/lib/imap.c"
"${LIBRARY_DIR}/lib/pop3.c"
"${LIBRARY_DIR}/lib/smtp.c"
"${LIBRARY_DIR}/lib/pingpong.c"
"${LIBRARY_DIR}/lib/rtsp.c"
"${LIBRARY_DIR}/lib/curl_threads.c"
"${LIBRARY_DIR}/lib/warnless.c"
"${LIBRARY_DIR}/lib/hmac.c"
"${LIBRARY_DIR}/lib/curl_rtmp.c"
"${LIBRARY_DIR}/lib/openldap.c"
"${LIBRARY_DIR}/lib/curl_gethostname.c"
"${LIBRARY_DIR}/lib/gopher.c"
"${LIBRARY_DIR}/lib/http_proxy.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/curl_gssapi.c"
"${LIBRARY_DIR}/lib/http_ntlm.c"
"${LIBRARY_DIR}/lib/curl_ntlm_wb.c"
"${LIBRARY_DIR}/lib/curl_ntlm_core.c"
"${LIBRARY_DIR}/lib/curl_sasl.c"
"${LIBRARY_DIR}/lib/rand.c"
"${LIBRARY_DIR}/lib/curl_multibyte.c"
"${LIBRARY_DIR}/lib/conncache.c"
"${LIBRARY_DIR}/lib/cf-h1-proxy.c"
"${LIBRARY_DIR}/lib/http2.c"
"${LIBRARY_DIR}/lib/smb.c"
"${LIBRARY_DIR}/lib/curl_endian.c"
"${LIBRARY_DIR}/lib/curl_des.c"
"${LIBRARY_DIR}/lib/speedcheck.c"
"${LIBRARY_DIR}/lib/splay.c"
"${LIBRARY_DIR}/lib/strcase.c"
"${LIBRARY_DIR}/lib/strdup.c"
"${LIBRARY_DIR}/lib/strerror.c"
"${LIBRARY_DIR}/lib/strtok.c"
"${LIBRARY_DIR}/lib/strtoofft.c"
"${LIBRARY_DIR}/lib/system_win32.c"
"${LIBRARY_DIR}/lib/mime.c"
"${LIBRARY_DIR}/lib/sha256.c"
"${LIBRARY_DIR}/lib/setopt.c"
"${LIBRARY_DIR}/lib/curl_path.c"
"${LIBRARY_DIR}/lib/curl_range.c"
"${LIBRARY_DIR}/lib/psl.c"
"${LIBRARY_DIR}/lib/doh.c"
"${LIBRARY_DIR}/lib/urlapi.c"
"${LIBRARY_DIR}/lib/curl_get_line.c"
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/socketpair.c"
"${LIBRARY_DIR}/lib/bufref.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/dynbuf.c"
"${LIBRARY_DIR}/lib/dynhds.c"
"${LIBRARY_DIR}/lib/hsts.c"
"${LIBRARY_DIR}/lib/http_aws_sigv4.c"
"${LIBRARY_DIR}/lib/mqtt.c"
"${LIBRARY_DIR}/lib/rename.c"
"${LIBRARY_DIR}/lib/headers.c"
"${LIBRARY_DIR}/lib/telnet.c"
"${LIBRARY_DIR}/lib/timediff.c"
"${LIBRARY_DIR}/lib/vauth/vauth.c"
"${LIBRARY_DIR}/lib/timeval.c"
"${LIBRARY_DIR}/lib/transfer.c"
"${LIBRARY_DIR}/lib/url.c"
"${LIBRARY_DIR}/lib/urlapi.c"
"${LIBRARY_DIR}/lib/vauth/cleartext.c"
"${LIBRARY_DIR}/lib/vauth/cram.c"
"${LIBRARY_DIR}/lib/vauth/digest.c"
@@ -138,23 +135,24 @@ set (SRCS
"${LIBRARY_DIR}/lib/vauth/oauth2.c"
"${LIBRARY_DIR}/lib/vauth/spnego_gssapi.c"
"${LIBRARY_DIR}/lib/vauth/spnego_sspi.c"
"${LIBRARY_DIR}/lib/vauth/vauth.c"
"${LIBRARY_DIR}/lib/version.c"
"${LIBRARY_DIR}/lib/vquic/vquic.c"
"${LIBRARY_DIR}/lib/vtls/openssl.c"
"${LIBRARY_DIR}/lib/vssh/libssh.c"
"${LIBRARY_DIR}/lib/vssh/libssh2.c"
"${LIBRARY_DIR}/lib/vtls/bearssl.c"
"${LIBRARY_DIR}/lib/vtls/gtls.c"
"${LIBRARY_DIR}/lib/vtls/vtls.c"
"${LIBRARY_DIR}/lib/vtls/nss.c"
"${LIBRARY_DIR}/lib/vtls/wolfssl.c"
"${LIBRARY_DIR}/lib/vtls/hostcheck.c"
"${LIBRARY_DIR}/lib/vtls/keylog.c"
"${LIBRARY_DIR}/lib/vtls/mbedtls.c"
"${LIBRARY_DIR}/lib/vtls/openssl.c"
"${LIBRARY_DIR}/lib/vtls/schannel.c"
"${LIBRARY_DIR}/lib/vtls/schannel_verify.c"
"${LIBRARY_DIR}/lib/vtls/sectransp.c"
"${LIBRARY_DIR}/lib/vtls/gskit.c"
"${LIBRARY_DIR}/lib/vtls/mbedtls.c"
"${LIBRARY_DIR}/lib/vtls/bearssl.c"
"${LIBRARY_DIR}/lib/vtls/keylog.c"
"${LIBRARY_DIR}/lib/vtls/vtls.c"
"${LIBRARY_DIR}/lib/vtls/wolfssl.c"
"${LIBRARY_DIR}/lib/vtls/x509asn1.c"
"${LIBRARY_DIR}/lib/vtls/hostcheck.c"
"${LIBRARY_DIR}/lib/vssh/libssh2.c"
"${LIBRARY_DIR}/lib/vssh/libssh.c"
"${LIBRARY_DIR}/lib/warnless.c"
)
add_library (_curl ${SRCS})

View File

@@ -1,18 +1,7 @@
# docker build -t clickhouse/performance-comparison .
# Using ubuntu:22.04 over 20.04 as all other images, since:
# a) ubuntu 20.04 has too old parallel, and does not support --memsuspend
# b) anyway for perf tests it should not be important (backward compatibility
# with older ubuntu had been checked lots of times in various tests)
FROM ubuntu:22.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV LANG=C.UTF-8
ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
@@ -56,10 +45,9 @@ COPY * /
# node #0 should be less stable because of system interruptions. We bind
# randomly to node 1 or 0 to gather some statistics on that. We have to bind
# both servers and the tmpfs on which the database is stored. How to do it
# through Yandex Sandbox API is unclear, but by default tmpfs uses
# is unclear, but by default tmpfs uses
# 'process allocation policy', not sure which process but hopefully the one that
# writes to it, so just bind the downloader script as well. We could also try to
# remount it with proper options in Sandbox task.
# writes to it, so just bind the downloader script as well.
# https://www.kernel.org/doc/Documentation/filesystems/tmpfs.txt
# Double-escaped backslashes are a tribute to the engineering wonder of docker --
# it gives '/bin/sh: 1: [bash,: not found' otherwise.

View File

@@ -90,7 +90,7 @@ function configure
set +m
wait_for_server $LEFT_SERVER_PORT $left_pid
echo Server for setup started
echo "Server for setup started"
clickhouse-client --port $LEFT_SERVER_PORT --query "create database test" ||:
clickhouse-client --port $LEFT_SERVER_PORT --query "rename table datasets.hits_v1 to test.hits" ||:
@@ -156,9 +156,9 @@ function restart
wait_for_server $RIGHT_SERVER_PORT $right_pid
echo right ok
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.tables where database != 'system'"
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.tables where database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')"
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.build_options"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.tables where database != 'system'"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.tables where database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')"
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.build_options"
# Check again that both servers we started are running -- this is important
@@ -352,14 +352,12 @@ function get_profiles
wait
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.query_log where type in ('QueryFinish', 'ExceptionWhileProcessing') format TSVWithNamesAndTypes" > left-query-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > left-query-thread-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.trace_log format TSVWithNamesAndTypes" > left-trace-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > left-addresses.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.metric_log format TSVWithNamesAndTypes" > left-metric-log.tsv ||: &
clickhouse-client --port $LEFT_SERVER_PORT --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > left-async-metric-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.query_log where type in ('QueryFinish', 'ExceptionWhileProcessing') format TSVWithNamesAndTypes" > right-query-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > right-query-thread-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.trace_log format TSVWithNamesAndTypes" > right-trace-log.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > right-addresses.tsv ||: &
clickhouse-client --port $RIGHT_SERVER_PORT --query "select * from system.metric_log format TSVWithNamesAndTypes" > right-metric-log.tsv ||: &

View File

@@ -19,31 +19,6 @@
<opentelemetry_span_log remove="remove"/>
<session_log remove="remove"/>
<!-- performance tests do not use real block devices,
instead they store everything in memory.
So, to avoid extra memory references, switch *_log to the Memory engine. -->
<query_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</query_log>
<query_thread_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</query_thread_log>
<trace_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</trace_log>
<metric_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</metric_log>
<asynchronous_metric_log>
<engine>ENGINE = Memory</engine>
<partition_by remove="remove"/>
</asynchronous_metric_log>
<uncompressed_cache_size>1000000000</uncompressed_cache_size>
<asynchronous_metrics_update_period_s>10</asynchronous_metrics_update_period_s>

View File

@@ -31,8 +31,6 @@ function download
# Test all of them.
declare -a urls_to_try=(
"$S3_URL/PRs/$left_pr/$left_sha/$BUILD_NAME/performance.tar.zst"
"$S3_URL/$left_pr/$left_sha/$BUILD_NAME/performance.tar.zst"
"$S3_URL/$left_pr/$left_sha/$BUILD_NAME/performance.tgz"
)
for path in "${urls_to_try[@]}"

View File

@@ -130,7 +130,7 @@ then
git -C right/ch diff --name-only "$base" pr -- :!tests/performance :!docker/test/performance-comparison | tee other-changed-files.txt
fi
# Set python output encoding so that we can print queries with Russian letters.
# Set python output encoding so that we can print queries with non-ASCII letters.
export PYTHONIOENCODING=utf-8
# By default, use the main comparison script from the tested package, so that we
@@ -151,11 +151,7 @@ export PATH
export REF_PR
export REF_SHA
# Try to collect some core dumps. I've seen two patterns in Sandbox:
# 1) |/home/zomb-sandbox/venv/bin/python /home/zomb-sandbox/client/sandbox/bin/coredumper.py %e %p %g %u %s %P %c
# Not sure what this script does (puts them to sandbox resources, logs some messages?),
# and it's not accessible from inside docker anyway.
# 2) something like %e.%p.core.dmp. The dump should end up in the workspace directory.
# Try to collect some core dumps.
# At least we remove the ulimit and then try to pack some common file names into output.
ulimit -c unlimited
cat /proc/sys/kernel/core_pattern

View File

@@ -190,7 +190,7 @@ These are the schema conversion manipulations you can do with table overrides fo
* Modify [column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).
* Modify [column compression codec](/docs/en/sql-reference/statements/create/table.md/#codecs).
* Add [ALIAS columns](/docs/en/sql-reference/statements/create/table.md/#alias).
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes)
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes). Note that you need to enable the `use_skip_indexes_if_final` setting to make them work (MaterializedMySQL uses `SELECT ... FINAL` by default)
* Add [projections](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#projections). Note that projection optimizations are
disabled when using `SELECT ... FINAL` (which MaterializedMySQL does by default), so their utility is limited here.
`INDEX ... TYPE hypothesis` as [described in the v21.12 blog post](https://clickhouse.com/blog/en/2021/clickhouse-v21.12-released/)

View File

@@ -2,6 +2,8 @@
#include <sys/resource.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <base/getMemoryAmount.h>
#include <base/errnoToString.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/String.h>
@@ -655,43 +657,66 @@ void LocalServer::processConfig()
/// There is no need for concurrent queries, override max_concurrent_queries.
global_context->getProcessList().setMaxSize(0);
/// Size of cache for uncompressed blocks. Zero means disabled.
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
const size_t memory_amount = getMemoryAmount();
const double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5);
const size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (uncompressed_cache_size)
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
String mark_cache_policy = config().getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
size_t mark_cache_size = config().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
if (mark_cache_size)
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices).
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);
/// In Server.cpp (./clickhouse-server), we would initialize the query cache here.
/// Intentionally not doing this in clickhouse-local as it doesn't make sense.
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size
= config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// NOTE: it is important to apply any overrides before

View File

@@ -29,6 +29,7 @@
#include <Common/ShellCommand.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/formatReadable.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getExecutablePath.h>
@@ -658,7 +659,7 @@ try
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
const auto memory_amount = getMemoryAmount();
const size_t memory_amount = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
formatReadableSizeWithBinarySuffix(memory_amount),
@@ -1485,16 +1486,14 @@ try
/// Set up caches.
size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
const size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = server_settings.uncompressed_cache_policy;
LOG_INFO(log, "Uncompressed cache policy name {}", uncompressed_cache_policy);
size_t uncompressed_cache_size = server_settings.uncompressed_cache_size;
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
@@ -1513,39 +1512,59 @@ try
server_settings.async_insert_queue_flush_on_shutdown));
}
size_t mark_cache_size = server_settings.mark_cache_size;
String mark_cache_policy = server_settings.mark_cache_policy;
size_t mark_cache_size = server_settings.mark_cache_size;
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
if (server_settings.index_uncompressed_cache_size)
size_t index_uncompressed_cache_size = server_settings.index_uncompressed_cache_size;
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(server_settings.index_uncompressed_cache_size);
if (server_settings.index_mark_cache_size)
size_t index_mark_cache_size = server_settings.index_mark_cache_size;
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(server_settings.index_mark_cache_size);
if (server_settings.mmap_cache_size)
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(server_settings.mmap_cache_size);
/// A cache for query results.
global_context->setQueryCache(config());
size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t query_cache_query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
if (query_cache_max_size_in_bytes > max_cache_size)
{
query_cache_max_size_in_bytes = max_cache_size;
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows);
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size = config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// Set path for format schema files

View File

@@ -524,7 +524,7 @@ void ColumnAggregateFunction::insertDefault()
pushBackAndCreateState(data, arena, func.get());
}
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out, version);

View File

@@ -162,7 +162,7 @@ public:
void insertDefault() override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * src_arena) override;

View File

@@ -205,7 +205,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
}
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);

View File

@@ -77,7 +77,7 @@ public:
StringRef getDataAt(size_t n) const override;
bool isDefaultAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -88,7 +88,7 @@ public:
void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
void insertDefault() override { throwMustBeDecompressed(); }
void popBack(size_t) override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeDecompressed(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }

View File

@@ -151,7 +151,7 @@ public:
s -= n;
}
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, const UInt8 *) const override
{
return data->serializeValueIntoArena(0, arena, begin);
}

View File

@@ -59,9 +59,26 @@ bool ColumnDecimal<T>::hasEqualValues() const
}
template <is_decimal T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
return res;
}

View File

@@ -80,7 +80,7 @@ public:
Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -86,11 +86,28 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
memset(chars.data() + old_size + length, 0, n - length);
}
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(n, begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &chars[n * index], n);
return StringRef(pos, n);
return res;
}
const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)

View File

@@ -115,7 +115,7 @@ public:
chars.resize_assume_reserved(chars.size() - n * elems);
}
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -96,7 +96,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert into {}", getName());
}
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName());
}

View File

@@ -255,7 +255,7 @@ void ColumnLowCardinality::insertData(const char * pos, size_t length)
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
}
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
}

View File

@@ -87,7 +87,7 @@ public:
void popBack(size_t n) override { idx.popBack(n); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -111,7 +111,7 @@ void ColumnMap::popBack(size_t n)
nested->popBack(n);
}
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return nested->serializeValueIntoArena(n, arena, begin);
}

View File

@@ -58,7 +58,7 @@ public:
void insert(const Field & x) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -4,6 +4,10 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
@@ -34,6 +38,7 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
{
/// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
nested_column = getNestedColumn().convertToFullColumnIfConst();
nested_type = nested_column->getDataType();
if (!getNestedColumn().canBeInsideNullable())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{} cannot be inside Nullable column", getNestedColumn().getName());
@@ -134,21 +139,77 @@ void ColumnNullable::insertData(const char * pos, size_t length)
}
}
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
char * pos;
auto * pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
switch (nested_type)
{
case TypeIndex::UInt8:
return static_cast<const ColumnUInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt16:
return static_cast<const ColumnUInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt32:
return static_cast<const ColumnUInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt64:
return static_cast<const ColumnUInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt128:
return static_cast<const ColumnUInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt256:
return static_cast<const ColumnUInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int8:
return static_cast<const ColumnInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int16:
return static_cast<const ColumnInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int32:
return static_cast<const ColumnInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int64:
return static_cast<const ColumnInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int128:
return static_cast<const ColumnInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int256:
return static_cast<const ColumnInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float32:
return static_cast<const ColumnFloat32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float64:
return static_cast<const ColumnFloat64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date:
return static_cast<const ColumnDate *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date32:
return static_cast<const ColumnDate32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime:
return static_cast<const ColumnDateTime *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime64:
return static_cast<const ColumnDateTime64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::String:
return static_cast<const ColumnString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::FixedString:
return static_cast<const ColumnFixedString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal32:
return static_cast<const ColumnDecimal<Decimal32> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal64:
return static_cast<const ColumnDecimal<Decimal64> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal128:
return static_cast<const ColumnDecimal<Decimal128> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal256:
return static_cast<const ColumnDecimal<Decimal256> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UUID:
return static_cast<const ColumnUUID *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv4:
return static_cast<const ColumnIPv4 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv6:
return static_cast<const ColumnIPv6 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
default:
pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
}
const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)
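
The switch above lets the concrete nested column write the null flag and the value into a single arena allocation, while the default branch keeps the old two-step path (copy the null byte, then append the nested value with a second call). The byte layout stays the same either way: one flag byte, followed by the raw value only for non-NULL rows. A minimal self-contained sketch of that layout, using a plain buffer instead of Arena (illustrative names, not the repo's API):

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative only: serialize one nullable Int64 row the way the optimized path does --
// a 1-byte null flag, followed by the raw value only when the row is not NULL.
inline std::size_t serializeNullableInt64(std::vector<char> & buf, std::int64_t value, std::uint8_t is_null)
{
    const std::size_t old_size = buf.size();
    buf.push_back(static_cast<char>(is_null));                          // the null flag byte
    if (!is_null)
    {
        buf.resize(buf.size() + sizeof(value));
        std::memcpy(buf.data() + old_size + 1, &value, sizeof(value));  // the payload
    }
    return buf.size() - old_size;   // 1 for NULL rows, 1 + sizeof(Int64) otherwise
}

Nested types without a dedicated case in the switch (arrays, tuples, etc.) still go through the default branch, i.e. the previous two-allocation code path.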

View File

@@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "Core/TypeId.h"
#include "config.h"
@@ -62,7 +63,7 @@ public:
StringRef getDataAt(size_t) const override;
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@@ -212,6 +213,8 @@ public:
private:
WrappedPtr nested_column;
WrappedPtr null_map;
// optimize serializeValueIntoArena
TypeIndex nested_type;
template <bool negative>
void applyNullMapImpl(const NullMap & map);

View File

@@ -244,7 +244,7 @@ public:
StringRef getDataAt(size_t) const override { throwMustBeConcrete(); }
bool isDefaultAt(size_t) const override { throwMustBeConcrete(); }
void insertData(const char *, size_t) override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeConcrete(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }

View File

@@ -150,7 +150,7 @@ void ColumnSparse::insertData(const char * pos, size_t length)
insertSingleValue([&](IColumn & column) { column.insertData(pos, length); });
}
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
}

View File

@@ -78,7 +78,7 @@ public:
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char *) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;

View File

@@ -213,17 +213,30 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
}
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
res.size = sizeof(string_size) + string_size;
char * pos = arena.allocContinue(res.size, begin);
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &string_size, sizeof(string_size));
memcpy(pos + sizeof(string_size), &chars[offset], string_size);
res.data = pos;
return res;
}

View File

@@ -11,6 +11,7 @@
#include <Common/memcmpSmall.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <Common/Arena.h>
class Collator;
@@ -168,7 +169,7 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -171,7 +171,7 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
StringRef res(begin, 0);
for (const auto & column : columns)

View File

@@ -61,7 +61,7 @@ public:
void insertFrom(const IColumn & src_, size_t n) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@@ -79,7 +79,7 @@ public:
Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash_func) const override
{
@@ -373,7 +373,7 @@ size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t lengt
}
template <typename ColumnType>
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
if (is_nullable)
{

View File

@@ -49,11 +49,28 @@ namespace ErrorCodes
}
template <typename T>
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
unalignedStore<T>(pos, data[n]);
return StringRef(pos, sizeof(T));
return res;
}
template <typename T>

View File

@@ -174,7 +174,7 @@ public:
data.resize_assume_reserved(data.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;

View File

@@ -218,7 +218,7 @@ public:
* For example, to obtain unambiguous representation of Array of strings, strings data should be interleaved with their sizes.
* Parameter begin should be used with Arena::allocContinue.
*/
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const = 0;
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit = nullptr) const = 0;
/// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method.
/// Returns pointer to the position after the read data.
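
Because the new `null_bit` parameter defaults to nullptr, all existing callers stay source-compatible; only ColumnNullable passes a real pointer. The `begin` contract mentioned in the comment above is what lets several columns of one row land in one contiguous region of the arena. A rough self-contained sketch of that calling pattern, with simplified stand-ins for IColumn/Arena/StringRef (assumed shapes, not the real classes):

#include <cstddef>
#include <string>
#include <vector>

// Simplified stand-ins, only to illustrate the begin/allocContinue-style contract.
struct Ref { const char * data = nullptr; std::size_t size = 0; };

struct ColumnSketch
{
    // Mirrors IColumn::serializeValueIntoArena(n, arena, begin, null_bit = nullptr).
    virtual Ref serializeValueIntoArena(std::size_t n, std::string & arena, const char *& begin,
                                        const unsigned char * null_bit = nullptr) const = 0;
    virtual ~ColumnSketch() = default;
};

// One row's key is all key columns serialized back to back. Each call may extend (and
// possibly move) the same contiguous range; the callee updates `begin` to the start of
// that range, so the final reference is simply {begin, total size}.
inline Ref serializeKeyRow(const std::vector<const ColumnSketch *> & key_columns,
                           std::size_t row, std::string & arena)
{
    const char * begin = nullptr;
    std::size_t total = 0;
    for (const auto * column : key_columns)
        total += column->serializeValueIntoArena(row, arena, begin).size;
    return {begin, total};
}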

View File

@@ -57,7 +57,7 @@ public:
++s;
}
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override
{
/// Has to put one useless byte into Arena, because serialization into zero number of bytes is ambiguous.
char * res = arena.allocContinue(1, begin);

View File

@@ -117,7 +117,7 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos);
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos, nullptr);
const char * new_pos;
column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos);
ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i;
@@ -140,8 +140,8 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos_lc = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc);
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string, nullptr);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc, nullptr);
ASSERT_EQ(ref_string, ref_lc) << "Serialized data is different from pattern at position " << i;
}
}

View File

@@ -51,10 +51,11 @@ public:
{
auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); };
static constexpr std::string_view default_cache_policy = "SLRU";
if (cache_policy_name.empty())
{
static constexpr auto default_cache_policy = "SLRU";
cache_policy_name = default_cache_policy;
}
if (cache_policy_name == "LRU")
{

View File

@@ -1,6 +1,7 @@
#pragma once
#include <base/defines.h>
#include <base/unit.h>
#define DBMS_DEFAULT_PORT 9000
#define DBMS_DEFAULT_SECURE_PORT 9440
@@ -64,6 +65,21 @@
/// Max depth of hierarchical dictionary
#define DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH 1000
/// Default maximum (total and entry) sizes and policies of various caches
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_SIZE = 1_GiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRIES = 1024uz;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES = 1_MiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS = 30'000'000uz;
/// Query profiler cannot work with sanitizers.
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
/// And they do unwinding frequently (on every malloc/free, thread/mutex operations, etc).
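
The new defaults above use the binary-unit literals brought in through base/unit.h (included at the top of the file) and, for the entry-count limits, the C++23 `uz` suffix for std::size_t. A sketch of the presumed shape of those literals, stated as an assumption rather than a copy of base/unit.h:

#include <cstddef>

// Presumed shape of the _KiB/_MiB/_GiB user-defined literals used by the defaults above.
constexpr std::size_t operator""_KiB(unsigned long long v) { return v * 1024; }
constexpr std::size_t operator""_MiB(unsigned long long v) { return v * 1024 * 1024; }
constexpr std::size_t operator""_GiB(unsigned long long v) { return v * 1024 * 1024 * 1024; }

static_assert(1_KiB == 1024);
static_assert(128_MiB == (128ULL << 20));   // e.g. DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE
static_assert(1_GiB == (1ULL << 30));       // e.g. DEFAULT_QUERY_CACHE_MAX_SIZE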

View File

@@ -2,6 +2,7 @@
#include <Core/BaseSettings.h>
#include <Core/Defines.h>
namespace Poco::Util
@@ -56,13 +57,13 @@ namespace DB
M(UInt64, max_concurrent_select_queries, 0, "Limit on total number of concurrently select queries. Zero means Unlimited.", 0) \
\
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size to RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(String, uncompressed_cache_policy, "SLRU", "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, 0, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(UInt64, mark_cache_size, 5368709120, "Size of cache for marks (index of MergeTree family of tables).", 0) \
M(String, mark_cache_policy, "SLRU", "Mark cache policy name.", 0) \
M(UInt64, index_uncompressed_cache_size, 0, "Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.", 0) \
M(UInt64, index_mark_cache_size, 0, "Size of cache for index marks. Zero means disabled.", 0) \
M(UInt64, mmap_cache_size, 1000, "A cache for mmapped files.", 0) /* The choice of default is arbitrary. */ \
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
M(String, mark_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Mark cache policy name.", 0) \
M(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.", 0) \
M(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for index marks. Zero means disabled.", 0) \
M(UInt64, mmap_cache_size, DEFAULT_MMAP_CACHE_MAX_SIZE, "A cache for mmapped files.", 0) \
\
M(Bool, disable_internal_dns_cache, false, "Disable internal DNS caching at all.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \

View File

@@ -20,8 +20,8 @@ using FunctionCreator = std::function<FunctionOverloadResolverPtr(ContextPtr)>;
using FunctionFactoryData = std::pair<FunctionCreator, FunctionDocumentation>;
/** Creates function by name.
* Function could use for initialization (take ownership of shared_ptr, for example)
* some dictionaries from Context.
* The provided Context is guaranteed to outlive the created function. Functions may use it for
* things like settings, current database, permission checks, etc.
*/
class FunctionFactory : private boost::noncopyable, public IFactoryWithAliases<FunctionFactoryData>
{

View File

@@ -62,13 +62,14 @@ namespace ErrorCodes
*/
class FunctionDictHelper
class FunctionDictHelper : WithContext
{
public:
explicit FunctionDictHelper(ContextPtr context_) : current_context(context_) {}
explicit FunctionDictHelper(ContextPtr context_) : WithContext(context_) {}
std::shared_ptr<const IDictionary> getDictionary(const String & dictionary_name)
{
auto current_context = getContext();
auto dict = current_context->getExternalDictionariesLoader().getDictionary(dictionary_name, current_context);
if (!access_checked)
@ -131,12 +132,10 @@ public:
DictionaryStructure getDictionaryStructure(const String & dictionary_name) const
{
return current_context->getExternalDictionariesLoader().getDictionaryStructure(dictionary_name, current_context);
return getContext()->getExternalDictionariesLoader().getDictionaryStructure(dictionary_name, getContext());
}
private:
ContextPtr current_context;
/// Access must have been granted at this point: otherwise checkAccess() would have thrown and access_checked would not have been updated.
std::atomic<bool> access_checked = false;
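
FunctionDictHelper backs functions such as dictGet, so the WithContext change above affects how those functions obtain the current context. A minimal usage sketch, with a hypothetical dictionary name:

    -- 'my_dict' is a hypothetical dictionary; dictGet resolves it through FunctionDictHelper::getDictionary().
    SELECT dictGet('my_dict', 'value', toUInt64(1));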

View File

@ -336,7 +336,7 @@ private:
template <typename Name, template<typename> typename Impl>
class ExecutableFunctionJSON : public IExecutableFunction, WithContext
class ExecutableFunctionJSON : public IExecutableFunction
{
public:

View File

@ -493,7 +493,6 @@ void QueryCache::reset()
cache.reset();
std::lock_guard lock(mutex);
times_executed.clear();
cache_size_in_bytes = 0;
}
size_t QueryCache::weight() const
@ -511,7 +510,7 @@ size_t QueryCache::recordQueryRun(const Key & key)
std::lock_guard lock(mutex);
size_t times = ++times_executed[key];
// Regularly drop times_executed to avoid DOS-by-unlimited-growth.
static constexpr size_t TIMES_EXECUTED_MAX_SIZE = 10'000;
static constexpr auto TIMES_EXECUTED_MAX_SIZE = 10'000uz;
if (times_executed.size() > TIMES_EXECUTED_MAX_SIZE)
times_executed.clear();
return times;
@ -522,23 +521,19 @@ std::vector<QueryCache::Cache::KeyMapped> QueryCache::dump() const
return cache.dump();
}
QueryCache::QueryCache()
QueryCache::QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
: cache(std::make_unique<TTLCachePolicy<Key, Entry, KeyHasher, QueryCacheEntryWeight, IsStale>>(std::make_unique<PerUserTTLCachePolicyUserQuota>()))
{
updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes_, max_entry_size_in_rows_);
}
void QueryCache::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
void QueryCache::updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
{
std::lock_guard lock(mutex);
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", 1_GiB);
cache.setMaxSize(max_size_in_bytes);
size_t max_entries = config.getUInt64("query_cache.max_entries", 1024);
cache.setMaxCount(max_entries);
max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", 1_MiB);
max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", 30'000'000);
max_entry_size_in_bytes = max_entry_size_in_bytes_;
max_entry_size_in_rows = max_entry_size_in_rows_;
}
}
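
With the constructor and updateConfiguration now taking explicit limits, the behaviour visible to users is unchanged; the cache is still exercised and cleared through the usual statements. A minimal sketch (use_query_cache is the regular query-level setting; system.query_cache is the table exposed by StorageSystemQueryCache referenced in the header below):

    SELECT count() FROM numbers(1000000) SETTINGS use_query_cache = 1;  -- populate an entry
    SELECT * FROM system.query_cache;                                   -- inspect cached entries
    SYSTEM DROP QUERY CACHE;                                            -- ends up in QueryCache::reset()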

View File

@ -4,7 +4,6 @@
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/Sources/SourceFromChunks.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Processors/Chunk.h>
#include <QueryPipeline/Pipe.h>
@ -110,9 +109,6 @@ private:
/// query --> query result
using Cache = CacheBase<Key, Entry, KeyHasher, QueryCacheEntryWeight>;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
public:
/// Buffers multiple partial query result chunks (buffer()) and eventually stores them as cache entry (finalizeWrite()).
///
@ -177,9 +173,9 @@ public:
friend class QueryCache; /// for createReader()
};
QueryCache();
QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_);
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
void updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_);
Reader createReader(const Key & key);
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size, size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota);
@ -199,14 +195,15 @@ private:
Cache cache; /// has its own locking --> not protected by mutex
mutable std::mutex mutex;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
TimesExecuted times_executed TSA_GUARDED_BY(mutex);
/// Cache configuration
size_t max_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
size_t max_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0;
size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// Updated in each cache insert/delete
friend class StorageSystemQueryCache;
};

View File

@ -245,27 +245,27 @@ struct ContextSharedPart : boost::noncopyable
std::optional<BackupsWorker> backups_worker;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
std::unique_ptr<AccessControl> access_control;
mutable ResourceManagerPtr resource_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
SessionTracker session_tracker;
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
mutable std::unique_ptr<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
@ -2269,7 +2269,7 @@ UncompressedCachePtr Context::getUncompressedCache() const
}
void Context::dropUncompressedCache() const
void Context::clearUncompressedCache() const
{
auto lock = getLock();
if (shared->uncompressed_cache)
@ -2293,7 +2293,7 @@ MarkCachePtr Context::getMarkCache() const
return shared->mark_cache;
}
void Context::dropMarkCache() const
void Context::clearMarkCache() const
{
auto lock = getLock();
if (shared->mark_cache)
@ -2315,32 +2315,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool;
}
static size_t getPrefetchThreadpoolSizeFromConfig(const Poco::Util::AbstractConfiguration & config)
{
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return getPrefetchThreadpoolSizeFromConfig(config);
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
@ -2351,7 +2325,6 @@ void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
@ -2359,7 +2332,7 @@ UncompressedCachePtr Context::getIndexUncompressedCache() const
}
void Context::dropIndexUncompressedCache() const
void Context::clearIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
@ -2383,44 +2356,13 @@ MarkCachePtr Context::getIndexMarkCache() const
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
void Context::clearIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
void Context::setQueryCache(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query cache has been already created.");
shared->query_cache = std::make_shared<QueryCache>();
shared->query_cache->updateConfiguration(config);
}
void Context::updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->updateConfiguration(config);
}
QueryCachePtr Context::getQueryCache() const
{
auto lock = getLock();
return shared->query_cache;
}
void Context::dropQueryCache() const
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->reset();
}
void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
auto lock = getLock();
@ -2437,15 +2379,50 @@ MMappedFileCachePtr Context::getMMappedFileCache() const
return shared->mmap_cache;
}
void Context::dropMMappedFileCache() const
void Context::clearMMappedFileCache() const
{
auto lock = getLock();
if (shared->mmap_cache)
shared->mmap_cache->reset();
}
void Context::setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows)
{
auto lock = getLock();
void Context::dropCaches() const
if (shared->query_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query cache has been already created.");
shared->query_cache = std::make_shared<QueryCache>(max_size_in_bytes, max_entries, max_entry_size_in_bytes, max_entry_size_in_rows);
}
void Context::updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
{
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t max_entries = config.getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
shared->query_cache->updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes, max_entry_size_in_rows);
}
}
QueryCachePtr Context::getQueryCache() const
{
auto lock = getLock();
return shared->query_cache;
}
void Context::clearQueryCache() const
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->reset();
}
void Context::clearCaches() const
{
auto lock = getLock();
@ -2461,11 +2438,31 @@ void Context::dropCaches() const
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
if (shared->query_cache)
shared->query_cache->reset();
if (shared->mmap_cache)
shared->mmap_cache->reset();
/// Intentionally not dropping the query cache which is transactionally inconsistent by design.
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const

View File

@ -915,44 +915,39 @@ public:
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// --- Caches ------------------------------------------------------------------------------------------
/// Create a cache of uncompressed blocks of specified size. This can be done only once.
void setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getUncompressedCache() const;
void dropUncompressedCache() const;
void clearUncompressedCache() const;
/// Create a cache of marks of specified size. This can be done only once.
void setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;
void clearMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader
/// in that its tasks wait for marks to be loaded
/// and then schedule a prefetch by putting a read task into threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
void dropIndexUncompressedCache() const;
void clearIndexUncompressedCache() const;
/// Create a cache of index marks of specified size. This can be done only once.
void setIndexMarkCache(size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void dropIndexMarkCache() const;
void clearIndexMarkCache() const;
/// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
void setMMappedFileCache(size_t cache_size_in_num_entries);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;
void dropMMappedFileCache() const;
void clearMMappedFileCache() const;
/// Create a cache of query results for statements which run repeatedly.
void setQueryCache(const Poco::Util::AbstractConfiguration & config);
void setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows);
void updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<QueryCache> getQueryCache() const;
void dropQueryCache() const;
void clearQueryCache() const;
/** Clear the caches of the uncompressed blocks and marks.
* This is usually done when renaming tables, changing the type of columns, deleting a table.
@ -960,7 +955,16 @@ public:
* (when deleting a table it is necessary, since another table can appear in its place)
* const - because a change in the cache is not considered a significant change of the object.
*/
void dropCaches() const;
void clearCaches() const;
/// -----------------------------------------------------------------------------------------------------
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader
/// in that its tasks wait for marks to be loaded
/// and then schedule a prefetch by putting a read task into threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;

View File

@ -9,6 +9,7 @@
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Common/atomicRename.h>
#include <Common/logger_useful.h>
#include <base/hex.h>
#include <Core/Defines.h>
@ -71,7 +72,6 @@
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/logger_useful.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
@ -1329,10 +1329,32 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
data_path = database->getTableDataPath(create);
auto full_data_path = fs::path{getContext()->getPath()} / data_path;
if (!create.attach && !data_path.empty() && fs::exists(fs::path{getContext()->getPath()} / data_path))
throw Exception(storage_already_exists_error_code,
"Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
if (!create.attach && !data_path.empty() && fs::exists(full_data_path))
{
if (getContext()->getZooKeeperMetadataTransaction() &&
!getContext()->getZooKeeperMetadataTransaction()->isInitialQuery() &&
!DatabaseCatalog::instance().hasUUIDMapping(create.uuid) &&
Context::getGlobalContextInstance()->isServerCompletelyStarted() &&
Context::getGlobalContextInstance()->getConfigRef().getBool("allow_moving_table_directory_to_trash", false))
{
/// This is a secondary query from a Replicated database. It cannot be retried with another UUID, we must execute it as is.
/// We don't have a table with this UUID (and all metadata is loaded),
/// so the existing directory probably contains some leftovers from previous unsuccessful attempts to create the table
fs::path trash_path = fs::path{getContext()->getPath()} / "trash" / data_path / getHexUIntLowercase(thread_local_rng());
LOG_WARNING(&Poco::Logger::get("InterpreterCreateQuery"), "Directory for {} data {} already exists. Will move it to {}",
Poco::toLower(storage_name), String(data_path), trash_path);
fs::create_directories(trash_path.parent_path());
renameNoReplace(full_data_path, trash_path);
}
else
{
throw Exception(storage_already_exists_error_code,
"Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
}
}
bool from_path = create.attach_from_path.has_value();
String actual_data_path = data_path;

View File

@ -247,10 +247,10 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
DatabaseCatalog::instance().removeDependencies(table_id, check_ref_deps, check_loading_deps, is_drop_or_detach_database);
database->dropTable(context_, table_id.table_name, query.sync);
/// We have to drop mmapio cache when dropping table from Ordinary database
/// We have to clear mmapio cache when dropping table from Ordinary database
/// to avoid reading old data if new table with the same name is created
if (database->getUUID() == UUIDHelpers::Nil)
context_->dropMMappedFileCache();
context_->clearMMappedFileCache();
}
db = database;

View File

@ -541,13 +541,13 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), getContext(), SelectQueryOptions());
interpreter.buildQueryPlan(plan);
context = interpreter.getContext();
// collect the selected marks, rows, parts during build query pipeline.
plan.buildQueryPipeline(
// Collect the selected marks, rows, parts during build query pipeline.
// Hold on to the returned QueryPipelineBuilderPtr because `plan` may have pointers into
// it (through QueryPlanResourceHolder).
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
if (settings.optimize)
plan.optimize(QueryPlanOptimizationSettings::fromContext(context));
plan.explainEstimate(res_columns);
insert_buf = false;
break;
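
The builder returned by plan.buildQueryPipeline() must now stay alive while explainEstimate() runs, since the optimized plan may reference resources owned by it. The affected statement form, with a hypothetical table name:

    -- Hypothetical table; EXPLAIN ESTIMATE reports the estimated parts, rows and marks to be read.
    EXPLAIN ESTIMATE SELECT count() FROM my_merge_tree_table WHERE key > 100;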

View File

@ -319,27 +319,27 @@ BlockIO InterpreterSystemQuery::execute()
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropMarkCache();
system_context->clearMarkCache();
break;
case Type::DROP_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropUncompressedCache();
system_context->clearUncompressedCache();
break;
case Type::DROP_INDEX_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropIndexMarkCache();
system_context->clearIndexMarkCache();
break;
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropIndexUncompressedCache();
system_context->clearIndexUncompressedCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->dropMMappedFileCache();
system_context->clearMMappedFileCache();
break;
case Type::DROP_QUERY_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_QUERY_CACHE);
getContext()->dropQueryCache();
getContext()->clearQueryCache();
break;
#if USE_EMBEDDED_COMPILER
case Type::DROP_COMPILED_EXPRESSION_CACHE:
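
The renamed clear*Cache() methods are still reached through the same SYSTEM statements handled in this switch; only the internal naming changed. For reference, the statements corresponding to the cases above (spellings follow the Type enum names):

    SYSTEM DROP MARK CACHE;
    SYSTEM DROP UNCOMPRESSED CACHE;
    SYSTEM DROP INDEX MARK CACHE;
    SYSTEM DROP INDEX UNCOMPRESSED CACHE;
    SYSTEM DROP MMAP CACHE;
    SYSTEM DROP QUERY CACHE;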

View File

@ -19,7 +19,7 @@ public:
size_t getCompiledExpressionSize() const { return compiled_expression_size; }
virtual ~CompiledExpressionCacheEntry() {}
virtual ~CompiledExpressionCacheEntry() = default;
private:

View File

@ -24,6 +24,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SETTING_VALUE;
}
namespace
{
@ -52,7 +57,11 @@ ServerAsynchronousMetrics::ServerAsynchronousMetrics(
: AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_)
, WithContext(global_context_)
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
{}
{
/// sanity check
if (update_period_seconds == 0 || heavy_metrics_update_period_seconds == 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Settings asynchronous_metrics_update_period_s and asynchronous_heavy_metrics_update_period_s must not be zero");
}
void ServerAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values, TimePoint update_time, TimePoint current_time)
{

View File

@ -16,7 +16,7 @@ void optimizeTreeFirstPass(const QueryPlanOptimizationSettings & settings, Query
void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes);
/// Third pass is used to apply filters such as key conditions and skip indexes to the storages that support them.
/// After that it adds a CreateSetsStep for the subqueries that have not been used in the filters.
void optimizeTreeThirdPass(QueryPlan::Node & root, QueryPlan::Nodes & nodes);
void optimizeTreeThirdPass(QueryPlan & plan, QueryPlan::Node & root, QueryPlan::Nodes & nodes);
/// Optimization (first pass) is a function applied to QueryPlan::Node.
/// It can read and update subtree of specified node.
@ -113,7 +113,7 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
bool addPlansForSets(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
bool addPlansForSets(QueryPlan & plan, QueryPlan::Node & node, QueryPlan::Nodes & nodes);
/// Enable memory bound merging of aggregation states for remote queries
/// in case it was enabled for local plan

View File

@ -6,7 +6,7 @@
namespace DB::QueryPlanOptimizations
{
bool addPlansForSets(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
bool addPlansForSets(QueryPlan & root_plan, QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
auto * delayed = typeid_cast<DelayedCreatingSetsStep *>(node.step.get());
if (!delayed)
@ -23,7 +23,9 @@ bool addPlansForSets(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
input_streams.push_back(plan->getCurrentDataStream());
node.children.push_back(plan->getRootNode());
nodes.splice(nodes.end(), QueryPlan::detachNodes(std::move(*plan)));
auto [add_nodes, add_resources] = QueryPlan::detachNodesAndResources(std::move(*plan));
nodes.splice(nodes.end(), std::move(add_nodes));
root_plan.addResources(std::move(add_resources));
}
auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));

View File

@ -181,7 +181,7 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
"No projection is used when optimize_use_projections = 1 and force_optimize_projection = 1");
}
void optimizeTreeThirdPass(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
void optimizeTreeThirdPass(QueryPlan & plan, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
Stack stack;
stack.push_back({.node = &root});
@ -205,7 +205,7 @@ void optimizeTreeThirdPass(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
source_step_with_filter->applyFilters();
}
addPlansForSets(*frame.node, nodes);
addPlansForSets(plan, *frame.node, nodes);
stack.pop_back();
}

View File

@ -482,7 +482,7 @@ void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_sett
QueryPlanOptimizations::optimizeTreeFirstPass(optimization_settings, *root, nodes);
QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);
QueryPlanOptimizations::optimizeTreeThirdPass(*root, nodes);
QueryPlanOptimizations::optimizeTreeThirdPass(*this, *root, nodes);
updateDataStreams(*root);
}
@ -542,9 +542,9 @@ void QueryPlan::explainEstimate(MutableColumns & columns)
}
}
QueryPlan::Nodes QueryPlan::detachNodes(QueryPlan && plan)
std::pair<QueryPlan::Nodes, QueryPlanResourceHolder> QueryPlan::detachNodesAndResources(QueryPlan && plan)
{
return std::move(plan.nodes);
return {std::move(plan.nodes), std::move(plan.resources)};
}
}

View File

@ -108,7 +108,7 @@ public:
using Nodes = std::list<Node>;
Node * getRootNode() const { return root; }
static Nodes detachNodes(QueryPlan && plan);
static std::pair<Nodes, QueryPlanResourceHolder> detachNodesAndResources(QueryPlan && plan);
private:
QueryPlanResourceHolder resources;

View File

@ -2328,7 +2328,7 @@ size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
removePartsFinally(parts_to_remove);
/// This is needed to close files so that they do not remain on disk after being deleted.
/// NOTE: we can drop files from cache more selectively but this is good enough.
getContext()->dropMMappedFileCache();
getContext()->clearMMappedFileCache();
return parts_to_remove.size();
}
@ -2834,7 +2834,7 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
}
if (!getStorageID().hasUUID())
getContext()->dropCaches();
getContext()->clearCaches();
/// TODO: remove const_cast
for (const auto & part : data_parts_by_info)
@ -2875,9 +2875,9 @@ void MergeTreeData::dropAllData()
}
/// Tables in atomic databases have UUID and stored in persistent locations.
/// No need to drop caches (that are keyed by filesystem path) because collision is not possible.
/// No need to clear caches (that are keyed by filesystem path) because collision is not possible.
if (!getStorageID().hasUUID())
getContext()->dropCaches();
getContext()->clearCaches();
/// Removing each data part before recursively removing the directory speeds up removal, because it requires fewer syscalls.
NameSet part_names_failed;

View File

@ -25,6 +25,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static constexpr auto DISTANCE_FUNCTION_L2 = "L2Distance";
static constexpr auto DISTANCE_FUNCTION_COSINE = "cosineDistance";
static constexpr auto DEFAULT_TREES = 100uz;
static constexpr auto DEFAULT_DISTANCE_FUNCTION = DISTANCE_FUNCTION_L2;
template <typename Distance>
AnnoyIndexWithSerialization<Distance>::AnnoyIndexWithSerialization(size_t dimensions)
@ -224,9 +229,9 @@ bool MergeTreeIndexConditionAnnoy::alwaysUnknownOrTrue() const
std::vector<size_t> MergeTreeIndexConditionAnnoy::getUsefulRanges(MergeTreeIndexGranulePtr idx_granule) const
{
if (distance_function == "L2Distance")
if (distance_function == DISTANCE_FUNCTION_L2)
return getUsefulRangesImpl<Annoy::Euclidean>(idx_granule);
else if (distance_function == "cosineDistance")
else if (distance_function == DISTANCE_FUNCTION_COSINE)
return getUsefulRangesImpl<Annoy::Angular>(idx_granule);
std::unreachable();
}
@ -289,9 +294,9 @@ MergeTreeIndexAnnoy::MergeTreeIndexAnnoy(const IndexDescription & index_, UInt64
MergeTreeIndexGranulePtr MergeTreeIndexAnnoy::createIndexGranule() const
{
if (distance_function == "L2Distance")
if (distance_function == DISTANCE_FUNCTION_L2)
return std::make_shared<MergeTreeIndexGranuleAnnoy<Annoy::Euclidean>>(index.name, index.sample_block);
else if (distance_function == "cosineDistance")
else if (distance_function == DISTANCE_FUNCTION_COSINE)
return std::make_shared<MergeTreeIndexGranuleAnnoy<Annoy::Angular>>(index.name, index.sample_block);
std::unreachable();
}
@ -299,9 +304,9 @@ MergeTreeIndexGranulePtr MergeTreeIndexAnnoy::createIndexGranule() const
MergeTreeIndexAggregatorPtr MergeTreeIndexAnnoy::createIndexAggregator() const
{
/// TODO: Support more metrics. Available metrics: https://github.com/spotify/annoy/blob/master/src/annoymodule.cc#L151-L171
if (distance_function == "L2Distance")
if (distance_function == DISTANCE_FUNCTION_L2)
return std::make_shared<MergeTreeIndexAggregatorAnnoy<Annoy::Euclidean>>(index.name, index.sample_block, trees);
else if (distance_function == "cosineDistance")
else if (distance_function == DISTANCE_FUNCTION_COSINE)
return std::make_shared<MergeTreeIndexAggregatorAnnoy<Annoy::Angular>>(index.name, index.sample_block, trees);
std::unreachable();
}
@ -313,14 +318,11 @@ MergeTreeIndexConditionPtr MergeTreeIndexAnnoy::createIndexCondition(const Selec
MergeTreeIndexPtr annoyIndexCreator(const IndexDescription & index)
{
static constexpr auto default_trees = 100uz;
static constexpr auto default_distance_function = "L2Distance";
String distance_function = default_distance_function;
String distance_function = DEFAULT_DISTANCE_FUNCTION;
if (!index.arguments.empty())
distance_function = index.arguments[0].get<String>();
UInt64 trees = default_trees;
UInt64 trees = DEFAULT_TREES;
if (index.arguments.size() > 1)
trees = index.arguments[1].get<UInt64>();
@ -350,8 +352,8 @@ void annoyIndexValidator(const IndexDescription & index, bool /* attach */)
if (!index.arguments.empty())
{
String distance_name = index.arguments[0].get<String>();
if (distance_name != "L2Distance" && distance_name != "cosineDistance")
throw Exception(ErrorCodes::INCORRECT_DATA, "Annoy index only supports distance functions 'L2Distance' and 'cosineDistance'");
if (distance_name != DISTANCE_FUNCTION_L2 && distance_name != DISTANCE_FUNCTION_COSINE)
throw Exception(ErrorCodes::INCORRECT_DATA, "Annoy index only supports distance functions '{}' and '{}'", DISTANCE_FUNCTION_L2, DISTANCE_FUNCTION_COSINE);
}
/// Check data type of indexed column:
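
The two index arguments validated here map to the distance function (first) and the number of trees (second), falling back to DEFAULT_DISTANCE_FUNCTION and DEFAULT_TREES when omitted. A DDL sketch with a hypothetical table, assuming the experimental Annoy index is enabled:

    SET allow_experimental_annoy_index = 1;
    CREATE TABLE vectors
    (
        id UInt64,
        vec Array(Float32),
        INDEX ann_idx vec TYPE annoy('cosineDistance', 200) GRANULARITY 1  -- distance function, trees
    )
    ENGINE = MergeTree
    ORDER BY id;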

View File

@ -777,7 +777,7 @@ void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr
num_marks_saved = 0;
total_rows = 0;
total_bytes = 0;
getContext()->dropMMappedFileCache();
getContext()->clearMMappedFileCache();
}

View File

@ -1997,7 +1997,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
}
/// New parts with other data may appear in place of deleted parts.
local_context->dropCaches();
local_context->clearCaches();
return results;
}

View File

@ -426,7 +426,7 @@ void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
num_indices_saved = 0;
total_rows = 0;
total_bytes = 0;
getContext()->dropMMappedFileCache();
getContext()->clearMMappedFileCache();
}

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
import os
import logging
import sys
@ -20,11 +19,15 @@ from get_robot_token import get_best_robot_token, get_parameter_from_ssm
from pr_info import PRInfo
from s3_helper import S3Helper
from tee_popen import TeePopen
from clickhouse_helper import get_instance_type
from stopwatch import Stopwatch
IMAGE_NAME = "clickhouse/performance-comparison"
def get_run_command(
check_start_time,
check_name,
workspace,
result_path,
repo_tests_path,
@ -33,12 +36,24 @@ def get_run_command(
additional_env,
image,
):
instance_type = get_instance_type()
envs = [
f"-e CHECK_START_TIME='{check_start_time}'",
f"-e CHECK_NAME='{check_name}'",
f"-e INSTANCE_TYPE='{instance_type}'",
f"-e PR_TO_TEST={pr_to_test}",
f"-e SHA_TO_TEST={sha_to_test}",
]
env_str = " ".join(envs)
return (
f"docker run --privileged --volume={workspace}:/workspace "
f"--volume={result_path}:/output "
f"--volume={repo_tests_path}:/usr/share/clickhouse-test "
f"--cap-add syslog --cap-add sys_admin --cap-add sys_rawio "
f"-e PR_TO_TEST={pr_to_test} -e SHA_TO_TEST={sha_to_test} {additional_env} "
f"{env_str} {additional_env} "
f"{image}"
)
@ -62,6 +77,9 @@ class RamDrive:
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = os.getenv("TEMP_PATH", os.path.abspath("."))
repo_path = os.getenv("REPO_COPY", os.path.abspath("../../"))
repo_tests_path = os.path.join(repo_path, "tests")
@ -157,6 +175,8 @@ if __name__ == "__main__":
docker_env += "".join([f" -e {name}" for name in env_extra])
run_command = get_run_command(
stopwatch.start_time_str,
check_name,
result_path,
result_path,
repo_tests_path,
@ -168,6 +188,7 @@ if __name__ == "__main__":
logging.info("Going to run command %s", run_command)
run_log_path = os.path.join(temp_path, "run.log")
compare_log_path = os.path.join(result_path, "compare.log")
popen_env = os.environ.copy()
popen_env.update(env_extra)
@ -181,7 +202,7 @@ if __name__ == "__main__":
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
paths = {
"compare.log": os.path.join(result_path, "compare.log"),
"compare.log": compare_log_path,
"output.7z": os.path.join(result_path, "output.7z"),
"report.html": os.path.join(result_path, "report.html"),
"all-queries.html": os.path.join(result_path, "all-queries.html"),

View File

@ -1,5 +1,6 @@
<clickhouse>
<database_atomic_delay_before_drop_table_sec>10</database_atomic_delay_before_drop_table_sec>
<allow_moving_table_directory_to_trash>1</allow_moving_table_directory_to_trash>
<merge_tree>
<initialization_retry_period>10</initialization_retry_period>
</merge_tree>

View File

@ -1262,7 +1262,7 @@ def test_recover_digest_mismatch(started_cluster):
"mv /var/lib/clickhouse/metadata/recover_digest_mismatch/t1.sql /var/lib/clickhouse/metadata/recover_digest_mismatch/m1.sql",
"sed --follow-symlinks -i 's/Int32/String/' /var/lib/clickhouse/metadata/recover_digest_mismatch/mv1.sql",
"rm -f /var/lib/clickhouse/metadata/recover_digest_mismatch/d1.sql",
# f"rm -rf /var/lib/clickhouse/metadata/recover_digest_mismatch/", # Directory already exists
"rm -rf /var/lib/clickhouse/metadata/recover_digest_mismatch/", # Will trigger "Directory already exists"
"rm -rf /var/lib/clickhouse/store",
]

View File

@ -190,15 +190,3 @@ def test_information_schema():
)
== "1\n"
)
assert (
node.query(
"SELECT count() FROM information_schema.TABLES WHERE table_name='TABLES'"
)
== "2\n"
)
assert (
node.query(
"SELECT count() FROM INFORMATION_SCHEMA.tables WHERE table_name='tables'"
)
== "3\n"
)

View File

@ -0,0 +1,32 @@
<test>
<settings>
<max_insert_threads>8</max_insert_threads>
<allow_experimental_projection_optimization>0</allow_experimental_projection_optimization>
<max_threads>4</max_threads>
</settings>
<create_query>
CREATE TABLE t_nullable
(
key_string1 Nullable(String),
key_string2 Nullable(String),
key_string3 Nullable(String),
key_int64_1 Nullable(Int64),
key_int64_2 Nullable(Int64),
key_int64_3 Nullable(Int64),
key_int64_4 Nullable(Int64),
key_int64_5 Nullable(Int64),
m1 Int64,
m2 Int64
)
ENGINE = Memory
</create_query>
<fill_query>insert into t_nullable select ['aaaaaa','bbaaaa','ccaaaa','ddaaaa'][number % 101 + 1], ['aa','bb','cc','dd'][number % 100 + 1], ['aa','bb','cc','dd'][number % 102 + 1], number%10+1, number%10+2, number%10+3, number%10+4,number%10+5, number%6000+1, number%5000+2 from numbers_mt(20000000)</fill_query>
<query>select key_string1,key_string2,key_string3, min(m1) from t_nullable group by key_string1,key_string2,key_string3</query>
<query>select key_string3,key_int64_1,key_int64_2, min(m1) from t_nullable group by key_string3,key_int64_1,key_int64_2</query>
<query>select key_int64_1,key_int64_2,key_int64_3,key_int64_4,key_int64_5, min(m1) from t_nullable group by key_int64_1,key_int64_2,key_int64_3,key_int64_4,key_int64_5</query>
<query>select toFloat64(key_int64_1),toFloat64(key_int64_2),toFloat64(key_int64_3),toFloat64(key_int64_4),toFloat64(key_int64_5), min(m1) from t_nullable group by toFloat64(key_int64_1),toFloat64(key_int64_2),toFloat64(key_int64_3),toFloat64(key_int64_4),toFloat64(key_int64_5) limit 10</query>
<query>select toDecimal64(key_int64_1, 3),toDecimal64(key_int64_2, 3),toDecimal64(key_int64_3, 3),toDecimal64(key_int64_4, 3),toDecimal64(key_int64_5, 3), min(m1) from t_nullable group by toDecimal64(key_int64_1, 3),toDecimal64(key_int64_2, 3),toDecimal64(key_int64_3, 3),toDecimal64(key_int64_4, 3),toDecimal64(key_int64_5, 3) limit 10</query>
<drop_query>drop table if exists t_nullable</drop_query>
</test>

View File

@ -33,3 +33,5 @@ default default v default v f 2 0 Float64 \N \N \N \N \N \N \N \N \N \N \N \N \
tmp tmp d 1 0 Date \N \N \N \N \N 0 \N \N \N \N \N \N \N \N \N Date
tmp tmp dt 2 0 DateTime \N \N \N \N \N 0 \N \N \N \N \N \N \N \N \N DateTime
tmp tmp dtms 3 0 DateTime64(3) \N \N \N \N \N 3 \N \N \N \N \N \N \N \N \N DateTime64(3)
1
1

View File

@ -1,20 +1,31 @@
show tables from information_schema;
SHOW TABLES FROM information_schema;
SHOW TABLES FROM INFORMATION_SCHEMA;
create table t (n UInt64, f Float32, s String, fs FixedString(42), d Decimal(9, 6)) engine=Memory;
create view v (n Nullable(Int32), f Float64) as select n, f from t;
create materialized view mv engine=Null as select * from system.one;
create temporary table tmp (d Date, dt DateTime, dtms DateTime64(3));
DROP TABLE IF EXISTS t;
DROP VIEW IF EXISTS v;
DROP VIEW IF EXISTS mv;
DROP TABLE IF EXISTS tmp;
CREATE TABLE t (n UInt64, f Float32, s String, fs FixedString(42), d Decimal(9, 6)) ENGINE=Memory;
CREATE VIEW v (n Nullable(Int32), f Float64) AS SELECT n, f FROM t;
CREATE MATERIALIZED VIEW mv ENGINE=Null AS SELECT * FROM system.one;
CREATE TEMPORARY TABLE tmp (d Date, dt DateTime, dtms DateTime64(3));
-- FIXME #28687
select * from information_schema.schemata where schema_name ilike 'information_schema';
SELECT * FROM information_schema.schemata WHERE schema_name ilike 'information_schema';
-- SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE (TABLE_SCHEMA=currentDatabase() OR TABLE_SCHEMA='') AND TABLE_NAME NOT LIKE '%inner%';
SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE (table_schema=currentDatabase() OR table_schema='') AND table_name NOT LIKE '%inner%';
select * from information_schema.views where table_schema=currentDatabase();
SELECT * FROM information_schema.views WHERE table_schema=currentDatabase();
-- SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE (TABLE_SCHEMA=currentDatabase() OR TABLE_SCHEMA='') AND TABLE_NAME NOT LIKE '%inner%';
SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE (table_schema=currentDatabase() OR table_schema='') AND table_name NOT LIKE '%inner%';
drop table t;
drop view v;
-- mixed upper/lowercase schema and table name:
SELECT count() FROM information_schema.TABLES WHERE table_schema=currentDatabase() AND table_name = 't';
SELECT count() FROM INFORMATION_SCHEMA.tables WHERE table_schema=currentDatabase() AND table_name = 't';
SELECT count() FROM INFORMATION_schema.tables WHERE table_schema=currentDatabase() AND table_name = 't'; -- { serverError UNKNOWN_DATABASE }
SELECT count() FROM information_schema.taBLES WHERE table_schema=currentDatabase() AND table_name = 't'; -- { serverError UNKNOWN_TABLE }
drop view mv;
drop view v;
drop table t;

View File

@ -1,4 +1,4 @@
default begin inserts
default end inserts
20 210
20 210
30 465
30 465

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, no-parallel
# Tags: race, zookeeper, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -29,9 +29,19 @@ function thread_attach()
done
}
insert_type=$(($RANDOM % 3))
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'insert_type $insert_type' FORMAT Null"
function insert()
{
$CLICKHOUSE_CLIENT -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $i" 2>/dev/null
# Fault injection may lead to duplicates
if [[ "$insert_type" -eq 0 ]]; then
$CLICKHOUSE_CLIENT --insert_deduplication_token=$1 -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $1" 2>/dev/null
elif [[ "$insert_type" -eq 1 ]]; then
$CLICKHOUSE_CLIENT -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $1, $1" 2>/dev/null
else
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $1" 2>/dev/null
fi
}
thread_detach & PID_1=$!
@ -41,8 +51,8 @@ thread_attach & PID_4=$!
function do_inserts()
{
for i in {1..20}; do
while ! insert; do $CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'retrying insert $i' FORMAT Null"; done
for i in {1..30}; do
while ! insert $i; do $CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'retrying insert $i' FORMAT Null"; done
done
}

View File

@ -0,0 +1,34 @@
sessions:
150
port_0_sessions:
0
address_0_sessions:
0
tcp_sessions
60
http_sessions
30
http_with_session_id_sessions
30
my_sql_sessions
30
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10

View File

@ -0,0 +1,138 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
# Each user uses a separate thread.
readonly TCP_USERS=( "02833_TCP_USER_${PID}"_{1,2} ) # 2 concurrent TCP users
readonly HTTP_USERS=( "02833_HTTP_USER_${PID}" )
readonly HTTP_WITH_SESSION_ID_SESSION_USERS=( "02833_HTTP_WITH_SESSION_ID_USER_${PID}" )
readonly MYSQL_USERS=( "02833_MYSQL_USER_${PID}")
readonly ALL_USERS=( "${TCP_USERS[@]}" "${HTTP_USERS[@]}" "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}" "${MYSQL_USERS[@]}" )
readonly TCP_USERS_SQL_COLLECTION_STRING="$( echo "${TCP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_WITH_SESSION_ID_SESSION_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly MYSQL_USERS_SQL_COLLECTION_STRING="$( echo "${MYSQL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly ALL_USERS_SQL_COLLECTION_STRING="$( echo "${ALL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${user} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${user}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${user}";
done
# All <type>_session functions execute in separate threads.
# Each function creates a session with a successful login and logout,
# sleeps a small, random amount of time to make concurrency more intense,
# and also tries to log in with an invalid password.
function tcp_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 2" --user="${user}" --password 'invalid'
done
}
function http_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=wrong" -d "SELECT 4"
done
}
function http_with_session_id_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep 0.01${RANDOM}"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=wrong" -d "SELECT 6"
done
}
function mysql_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 1 FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'wrong', SETTINGS connection_max_tries=1)"
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
export -f mysql_session;
for user in "${TCP_USERS[@]}"; do
timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_USERS[@]}"; do
timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do
timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 &
done
for user in "${MYSQL_USERS[@]}"; do
timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_address = toIPv6('::')"
echo "tcp_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${TCP_USERS_SQL_COLLECTION_STRING}) AND interface = 'TCP'"
echo "http_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "http_with_session_id_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "my_sql_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "DROP USER ${user}"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
done

View File

@ -0,0 +1,13 @@
0
0
0
0
client_port 0 connections:
0
client_address '::' connections:
0
login failures:
0
TCP Login and logout count is equal
HTTP Login and logout count is equal
MySQL Login and logout count is equal

View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER=$"02834_USER_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TEMPORARY TABLE, MYSQL, REMOTE ON *.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "client_port 0 connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_port = 0"
echo "client_address '::' connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_address = toIPv6('::')"
echo "login failures:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and type = 'LoginFailure'"
# The remote(...) function sometimes reuses old cached sessions for query execution.
# This makes the count of LoginSuccess/Logout entries unstable, but successful logins and logouts must always match.
for interface in 'TCP' 'HTTP' 'MySQL'
do
LOGIN_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}'"`
CORRESPONDING_LOGOUT_RECORDS_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}')"`
if [ "$LOGIN_COUNT" == "$CORRESPONDING_LOGOUT_RECORDS_COUNT" ]; then
echo "${interface} Login and logout count is equal"
else
TOTAL_LOGOUT_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}'"`
echo "${interface} Login count ${LOGIN_COUNT} != corresponding logout count ${CORRESPONDING_LOGOUT_RECORDS_COUNT}. TOTAL_LOGOUT_COUNT ${TOTAL_LOGOUT_COUNT}"
fi
done
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"

View File

@ -0,0 +1,8 @@
port_0_sessions:
0
address_0_sessions:
0
Corresponding LoginSuccess/Logout
9
LoginFailure
0

View File

@ -0,0 +1,114 @@
#!/usr/bin/env bash
# Tags: no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER="02835_USER_${PID}"
readonly TEST_ROLE="02835_ROLE_${PID}"
readonly TEST_PROFILE="02835_PROFILE_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
function tcp_session()
{
local user=$1
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.numbers" --user="${user}"
}
function http_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
function http_with_session_id_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
# Busy-waits until the specified number of queries ($2) from user $1 are running simultaneously.
function wait_for_queries_start()
{
local user=$1
local queries_count=$2
# Wait for up to 10 seconds (100 retries x 0.1 s).
counter=0 retries=100
while [[ $counter -lt $retries ]]; do
result=$($CLICKHOUSE_CLIENT --query "SELECT COUNT(*) FROM system.processes WHERE user = '${user}'")
if [[ $result == "${queries_count}" ]]; then
break;
fi
sleep 0.1
((++counter))
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
# DROP USER CASE
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP ROLE CASE
${CLICKHOUSE_CLIENT} -q "CREATE ROLE IF NOT EXISTS ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} DEFAULT ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP PROFILE CASE
${CLICKHOUSE_CLIENT} -q "CREATE SETTINGS PROFILE IF NOT EXISTS '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_address = toIPv6('::')"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS}, FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginFailure'"

View File

@ -1,23 +1,36 @@
DROP DICTIONARY IF EXISTS dict;
DROP TABLE IF EXISTS source;
DROP DICTIONARY IF EXISTS 02843_dict;
DROP TABLE IF EXISTS 02843_source;
DROP TABLE IF EXISTS 02843_join;
CREATE TABLE source
CREATE TABLE 02843_source
(
id UInt64,
value String
)
ENGINE=Memory;
CREATE DICTIONARY dict
CREATE DICTIONARY 02843_dict
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'source'))
SOURCE(CLICKHOUSE(TABLE '02843_source'))
LAYOUT(DIRECT());
SELECT 1 IN (SELECT dictGet('dict', 'value', materialize('1')));
SELECT 1 IN (SELECT dictGet('02843_dict', 'value', materialize('1')));
DROP DICTIONARY dict;
DROP TABLE source;
CREATE TABLE 02843_join (id UInt8, value String) ENGINE Join(ANY, LEFT, id);
SELECT 1 IN (SELECT joinGet(02843_join, 'value', materialize(1)));
SELECT 1 IN (SELECT joinGetOrNull(02843_join, 'value', materialize(1)));
SELECT 1 IN (SELECT materialize(connectionId()));
SELECT 1000000 IN (SELECT materialize(getSetting('max_threads')));
SELECT 1 in (SELECT file(materialize('a'))); -- { serverError 107 }
EXPLAIN ESTIMATE SELECT 1 IN (SELECT dictGet('02843_dict', 'value', materialize('1')));
EXPLAIN ESTIMATE SELECT 1 IN (SELECT joinGet(`02843_join`, 'value', materialize(1)));
DROP DICTIONARY 02843_dict;
DROP TABLE 02843_source;
DROP TABLE 02843_join;