mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge branch 'master' into add-reading-from-archives
This commit is contained in:
commit
e366ca61f5
@ -23,7 +23,6 @@
|
||||
* Added `Overlay` database engine to combine multiple databases into one. Added `Filesystem` database engine to represent a directory in the filesystem as a set of implicitly available tables with auto-detected formats and structures. A new `S3` database engine allows to read-only interact with s3 storage by representing a prefix as a set of tables. A new `HDFS` database engine allows to interact with HDFS storage in the same way. [#48821](https://github.com/ClickHouse/ClickHouse/pull/48821) ([alekseygolub](https://github.com/alekseygolub)).
|
||||
* Add support for external disks in Keeper for storing snapshots and logs. [#50098](https://github.com/ClickHouse/ClickHouse/pull/50098) ([Antonio Andelic](https://github.com/antonio2368)).
|
||||
* Add support for multi-directory selection (`{}`) globs. [#50559](https://github.com/ClickHouse/ClickHouse/pull/50559) ([Andrey Zvonov](https://github.com/zvonand)).
|
||||
* Support ZooKeeper `reconfig` command for ClickHouse Keeper with incremental reconfiguration which can be enabled via `keeper_server.enable_reconfiguration` setting. Support adding servers, removing servers, and changing server priorities. [#49450](https://github.com/ClickHouse/ClickHouse/pull/49450) ([Mike Kot](https://github.com/myrrc)).
|
||||
* Kafka connector can fetch Avro schema from schema registry with basic authentication using url-encoded credentials. [#49664](https://github.com/ClickHouse/ClickHouse/pull/49664) ([Ilya Golshtein](https://github.com/ilejn)).
|
||||
* Add function `arrayJaccardIndex` which computes the Jaccard similarity between two arrays. [#50076](https://github.com/ClickHouse/ClickHouse/pull/50076) ([FFFFFFFHHHHHHH](https://github.com/FFFFFFFHHHHHHH)).
|
||||
* Add a column `is_obsolete` to `system.settings` and similar tables. Closes [#50819](https://github.com/ClickHouse/ClickHouse/issues/50819). [#50826](https://github.com/ClickHouse/ClickHouse/pull/50826) ([flynn](https://github.com/ucasfl)).
|
||||
@ -124,6 +123,7 @@
|
||||
* (experimental MaterializedMySQL) Now double quoted comments are supported in MaterializedMySQL. [#52355](https://github.com/ClickHouse/ClickHouse/pull/52355) ([Val Doroshchuk](https://github.com/valbok)).
|
||||
* Upgrade Intel QPL from v1.1.0 to v1.2.0 2. Upgrade Intel accel-config from v3.5 to v4.0 3. Fixed issue that Device IOTLB miss has big perf. impact for IAA accelerators. [#52180](https://github.com/ClickHouse/ClickHouse/pull/52180) ([jasperzhu](https://github.com/jinjunzh)).
|
||||
* The `session_timezone` setting (new in version 23.6) is demoted to experimental. [#52445](https://github.com/ClickHouse/ClickHouse/pull/52445) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Support ZooKeeper `reconfig` command for ClickHouse Keeper with incremental reconfiguration which can be enabled via `keeper_server.enable_reconfiguration` setting. Support adding servers, removing servers, and changing server priorities. [#49450](https://github.com/ClickHouse/ClickHouse/pull/49450) ([Mike Kot](https://github.com/myrrc)). It is suspected that this feature is incomplete.
|
||||
|
||||
#### Build/Testing/Packaging Improvement
|
||||
* Add experimental ClickHouse builds for Linux RISC-V 64 to CI. [#31398](https://github.com/ClickHouse/ClickHouse/pull/31398) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <functional>
|
||||
#include <iosfwd>
|
||||
|
||||
#include <base/defines.h>
|
||||
#include <base/types.h>
|
||||
#include <base/unaligned.h>
|
||||
|
||||
@ -274,6 +275,8 @@ struct CRC32Hash
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
chassert(pos);
|
||||
|
||||
if (size < 8)
|
||||
{
|
||||
return static_cast<unsigned>(hashLessThan8(x.data, x.size));
|
||||
|
@ -115,8 +115,15 @@
|
||||
/// because SIGABRT is easier to debug than SIGTRAP (the second one makes gdb crazy)
|
||||
#if !defined(chassert)
|
||||
#if defined(ABORT_ON_LOGICAL_ERROR)
|
||||
// clang-format off
|
||||
#include <base/types.h>
|
||||
namespace DB
|
||||
{
|
||||
void abortOnFailedAssertion(const String & description);
|
||||
}
|
||||
#define chassert(x) static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x)
|
||||
#define UNREACHABLE() abort()
|
||||
// clang-format off
|
||||
#else
|
||||
/// Here sizeof() trick is used to suppress unused warning for result,
|
||||
/// since simple "(void)x" will evaluate the expression, while
|
||||
|
@ -57,7 +57,7 @@ public:
|
||||
URI();
|
||||
/// Creates an empty URI.
|
||||
|
||||
explicit URI(const std::string & uri, bool disable_url_encoding = false);
|
||||
explicit URI(const std::string & uri, bool enable_url_encoding = true);
|
||||
/// Parses an URI from the given string. Throws a
|
||||
/// SyntaxException if the uri is not valid.
|
||||
|
||||
@ -362,7 +362,7 @@ private:
|
||||
std::string _query;
|
||||
std::string _fragment;
|
||||
|
||||
bool _disable_url_encoding = false;
|
||||
bool _enable_url_encoding = true;
|
||||
};
|
||||
|
||||
|
||||
|
@ -36,8 +36,8 @@ URI::URI():
|
||||
}
|
||||
|
||||
|
||||
URI::URI(const std::string& uri, bool decode_and_encode_path):
|
||||
_port(0), _disable_url_encoding(decode_and_encode_path)
|
||||
URI::URI(const std::string& uri, bool enable_url_encoding):
|
||||
_port(0), _enable_url_encoding(enable_url_encoding)
|
||||
{
|
||||
parse(uri);
|
||||
}
|
||||
@ -108,7 +108,7 @@ URI::URI(const URI& uri):
|
||||
_path(uri._path),
|
||||
_query(uri._query),
|
||||
_fragment(uri._fragment),
|
||||
_disable_url_encoding(uri._disable_url_encoding)
|
||||
_enable_url_encoding(uri._enable_url_encoding)
|
||||
{
|
||||
}
|
||||
|
||||
@ -121,7 +121,7 @@ URI::URI(const URI& baseURI, const std::string& relativeURI):
|
||||
_path(baseURI._path),
|
||||
_query(baseURI._query),
|
||||
_fragment(baseURI._fragment),
|
||||
_disable_url_encoding(baseURI._disable_url_encoding)
|
||||
_enable_url_encoding(baseURI._enable_url_encoding)
|
||||
{
|
||||
resolve(relativeURI);
|
||||
}
|
||||
@ -153,7 +153,7 @@ URI& URI::operator = (const URI& uri)
|
||||
_path = uri._path;
|
||||
_query = uri._query;
|
||||
_fragment = uri._fragment;
|
||||
_disable_url_encoding = uri._disable_url_encoding;
|
||||
_enable_url_encoding = uri._enable_url_encoding;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@ -184,7 +184,7 @@ void URI::swap(URI& uri)
|
||||
std::swap(_path, uri._path);
|
||||
std::swap(_query, uri._query);
|
||||
std::swap(_fragment, uri._fragment);
|
||||
std::swap(_disable_url_encoding, uri._disable_url_encoding);
|
||||
std::swap(_enable_url_encoding, uri._enable_url_encoding);
|
||||
}
|
||||
|
||||
|
||||
@ -687,18 +687,18 @@ void URI::decode(const std::string& str, std::string& decodedStr, bool plusAsSpa
|
||||
|
||||
void URI::encodePath(std::string & encodedStr) const
|
||||
{
|
||||
if (_disable_url_encoding)
|
||||
encodedStr = _path;
|
||||
else
|
||||
if (_enable_url_encoding)
|
||||
encode(_path, RESERVED_PATH, encodedStr);
|
||||
else
|
||||
encodedStr = _path;
|
||||
}
|
||||
|
||||
void URI::decodePath(const std::string & encodedStr)
|
||||
{
|
||||
if (_disable_url_encoding)
|
||||
_path = encodedStr;
|
||||
else
|
||||
if (_enable_url_encoding)
|
||||
decode(encodedStr, _path);
|
||||
else
|
||||
_path = encodedStr;
|
||||
}
|
||||
|
||||
bool URI::isWellKnownPort() const
|
||||
|
@ -161,5 +161,9 @@
|
||||
"docker/test/sqllogic": {
|
||||
"name": "clickhouse/sqllogic-test",
|
||||
"dependent": []
|
||||
},
|
||||
"docker/test/integration/nginx_dav": {
|
||||
"name": "clickhouse/nginx-dav",
|
||||
"dependent": []
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'
|
||||
RUN echo "en_US.UTF-8 UTF-8" > /etc/locale.gen && locale-gen en_US.UTF-8
|
||||
ENV LC_ALL en_US.UTF-8
|
||||
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone
|
||||
|
||||
CMD sleep 1
|
||||
|
@ -32,7 +32,7 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
|
||||
&& odbcinst -i -s -l -f /tmp/clickhouse-odbc-tmp/share/doc/clickhouse-odbc/config/odbc.ini.sample \
|
||||
&& rm -rf /tmp/clickhouse-odbc-tmp
|
||||
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
ENV COMMIT_SHA=''
|
||||
|
@ -8,7 +8,7 @@ ARG apt_archive="http://archive.ubuntu.com"
|
||||
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
|
||||
|
||||
ENV LANG=C.UTF-8
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN apt-get update \
|
||||
|
6
docker/test/integration/nginx_dav/Dockerfile
Normal file
6
docker/test/integration/nginx_dav/Dockerfile
Normal file
@ -0,0 +1,6 @@
|
||||
FROM nginx:alpine-slim
|
||||
|
||||
COPY default.conf /etc/nginx/conf.d/
|
||||
|
||||
RUN mkdir /usr/share/nginx/files/ \
|
||||
&& chown nginx: /usr/share/nginx/files/ -R
|
25
docker/test/integration/nginx_dav/default.conf
Normal file
25
docker/test/integration/nginx_dav/default.conf
Normal file
@ -0,0 +1,25 @@
|
||||
server {
|
||||
listen 80;
|
||||
|
||||
#root /usr/share/nginx/test.com;
|
||||
index index.html index.htm;
|
||||
|
||||
server_name test.com localhost;
|
||||
|
||||
location / {
|
||||
expires max;
|
||||
root /usr/share/nginx/files;
|
||||
client_max_body_size 20m;
|
||||
client_body_temp_path /usr/share/nginx/tmp;
|
||||
dav_methods PUT; # Allowed methods, only PUT is necessary
|
||||
|
||||
create_full_put_path on; # nginx automatically creates nested directories
|
||||
dav_access user:rw group:r all:r; # access permissions for files
|
||||
|
||||
limit_except GET {
|
||||
allow all;
|
||||
}
|
||||
}
|
||||
|
||||
error_page 405 =200 $uri;
|
||||
}
|
@ -1,16 +1,15 @@
|
||||
version: '2.3'
|
||||
services:
|
||||
meili1:
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
restart: always
|
||||
ports:
|
||||
- ${MEILI_EXTERNAL_PORT:-7700}:${MEILI_INTERNAL_PORT:-7700}
|
||||
|
||||
meili_secure:
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
restart: always
|
||||
ports:
|
||||
- ${MEILI_SECURE_EXTERNAL_PORT:-7700}:${MEILI_SECURE_INTERNAL_PORT:-7700}
|
||||
environment:
|
||||
MEILI_MASTER_KEY: "password"
|
||||
|
||||
|
@ -9,10 +9,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-1.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-1.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/error.log
|
||||
@ -21,4 +21,4 @@ services:
|
||||
volumes:
|
||||
- type: ${MYSQL_LOGS_FS:-tmpfs}
|
||||
source: ${MYSQL_LOGS:-}
|
||||
target: /mysql/
|
||||
target: /mysql/
|
||||
|
@ -9,9 +9,9 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL8_PORT:-3306}
|
||||
command: --server_id=100 --log-bin='mysql-bin-1.log'
|
||||
--default_authentication_plugin='mysql_native_password'
|
||||
--default-time-zone='+3:00' --gtid-mode="ON"
|
||||
command: --server_id=100 --log-bin='mysql-bin-1.log'
|
||||
--default_authentication_plugin='mysql_native_password'
|
||||
--default-time-zone='+3:00' --gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/error.log
|
||||
@ -20,4 +20,4 @@ services:
|
||||
volumes:
|
||||
- type: ${MYSQL8_LOGS_FS:-tmpfs}
|
||||
source: ${MYSQL8_LOGS:-}
|
||||
target: /mysql/
|
||||
target: /mysql/
|
||||
|
@ -9,10 +9,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_CLUSTER_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-2.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-2.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/2_error.log
|
||||
@ -31,10 +31,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_CLUSTER_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-3.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-3.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/3_error.log
|
||||
@ -53,10 +53,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_CLUSTER_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-4.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-4.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/4_error.log
|
||||
@ -65,4 +65,4 @@ services:
|
||||
volumes:
|
||||
- type: ${MYSQL_CLUSTER_LOGS_FS:-tmpfs}
|
||||
source: ${MYSQL_CLUSTER_LOGS:-}
|
||||
target: /mysql/
|
||||
target: /mysql/
|
||||
|
@ -5,7 +5,7 @@ services:
|
||||
# Files will be put into /usr/share/nginx/files.
|
||||
|
||||
nginx:
|
||||
image: kssenii/nginx-test:1.1
|
||||
image: clickhouse/nginx-dav:${DOCKER_NGINX_DAV_TAG:-latest}
|
||||
restart: always
|
||||
ports:
|
||||
- 80:80
|
||||
|
@ -12,9 +12,9 @@ services:
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
networks:
|
||||
default:
|
||||
aliases:
|
||||
- postgre-sql.local
|
||||
default:
|
||||
aliases:
|
||||
- postgre-sql.local
|
||||
environment:
|
||||
POSTGRES_HOST_AUTH_METHOD: "trust"
|
||||
POSTGRES_PASSWORD: mysecretpassword
|
||||
|
@ -12,7 +12,7 @@ services:
|
||||
command: ["zkServer.sh", "start-foreground"]
|
||||
entrypoint: /zookeeper-ssl-entrypoint.sh
|
||||
volumes:
|
||||
- type: bind
|
||||
- type: bind
|
||||
source: /misc/zookeeper-ssl-entrypoint.sh
|
||||
target: /zookeeper-ssl-entrypoint.sh
|
||||
- type: bind
|
||||
@ -37,7 +37,7 @@ services:
|
||||
command: ["zkServer.sh", "start-foreground"]
|
||||
entrypoint: /zookeeper-ssl-entrypoint.sh
|
||||
volumes:
|
||||
- type: bind
|
||||
- type: bind
|
||||
source: /misc/zookeeper-ssl-entrypoint.sh
|
||||
target: /zookeeper-ssl-entrypoint.sh
|
||||
- type: bind
|
||||
@ -61,7 +61,7 @@ services:
|
||||
command: ["zkServer.sh", "start-foreground"]
|
||||
entrypoint: /zookeeper-ssl-entrypoint.sh
|
||||
volumes:
|
||||
- type: bind
|
||||
- type: bind
|
||||
source: /misc/zookeeper-ssl-entrypoint.sh
|
||||
target: /zookeeper-ssl-entrypoint.sh
|
||||
- type: bind
|
||||
|
@ -64,15 +64,16 @@ export CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH=/clickhouse-odbc-bridge
|
||||
export CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH=/clickhouse-library-bridge
|
||||
|
||||
export DOCKER_BASE_TAG=${DOCKER_BASE_TAG:=latest}
|
||||
export DOCKER_HELPER_TAG=${DOCKER_HELPER_TAG:=latest}
|
||||
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
|
||||
export DOCKER_DOTNET_CLIENT_TAG=${DOCKER_DOTNET_CLIENT_TAG:=latest}
|
||||
export DOCKER_HELPER_TAG=${DOCKER_HELPER_TAG:=latest}
|
||||
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
|
||||
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
|
||||
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
|
||||
export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
|
||||
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
|
||||
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
|
||||
export DOCKER_NGINX_DAV_TAG=${DOCKER_NGINX_DAV_TAG:=latest}
|
||||
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
|
||||
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
|
||||
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
|
||||
|
||||
cd /ClickHouse/tests/integration
|
||||
exec "$@"
|
||||
|
@ -11,7 +11,7 @@ ARG apt_archive="http://archive.ubuntu.com"
|
||||
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
|
||||
|
||||
ENV LANG=C.UTF-8
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN apt-get update \
|
||||
|
@ -54,7 +54,7 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
|
||||
&& odbcinst -i -s -l -f /tmp/clickhouse-odbc-tmp/share/doc/clickhouse-odbc/config/odbc.ini.sample \
|
||||
&& rm -rf /tmp/clickhouse-odbc-tmp
|
||||
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
ENV NUM_TRIES=1
|
||||
|
@ -233,4 +233,10 @@ rowNumberInAllBlocks()
|
||||
LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv || echo "failure\tCannot parse test_results.tsv" > /test_output/check_status.tsv
|
||||
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
|
||||
|
||||
# But OOMs in stress test are allowed
|
||||
if rg 'OOM in dmesg|Signal 9' /test_output/check_status.tsv
|
||||
then
|
||||
sed -i 's/failure/success/' /test_output/check_status.tsv
|
||||
fi
|
||||
|
||||
collect_core_dumps
|
||||
|
@ -231,4 +231,10 @@ rowNumberInAllBlocks()
|
||||
LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv || echo "failure\tCannot parse test_results.tsv" > /test_output/check_status.tsv
|
||||
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
|
||||
|
||||
# But OOMs in stress test are allowed
|
||||
if rg 'OOM in dmesg|Signal 9' /test_output/check_status.tsv
|
||||
then
|
||||
sed -i 's/failure/success/' /test_output/check_status.tsv
|
||||
fi
|
||||
|
||||
collect_core_dumps
|
||||
|
@ -106,4 +106,4 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
|
||||
## Storage Settings {#storage-settings}
|
||||
|
||||
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
|
||||
- [disable_url_encoding](/docs/en/operations/settings/settings.md#disable_url_encoding) -allows to disable decoding/encoding path in uri. Disabled by default.
|
||||
- [enable_url_encoding](/docs/en/operations/settings/settings.md#enable_url_encoding) - allows to enable/disable decoding/encoding path in uri. Enabled by default.
|
||||
|
@ -84,6 +84,7 @@ The BACKUP and RESTORE statements take a list of DATABASE and TABLE names, a des
|
||||
- `password` for the file on disk
|
||||
- `base_backup`: the destination of the previous backup of this source. For example, `Disk('backups', '1.zip')`
|
||||
- `structure_only`: if enabled, allows to only backup or restore the CREATE statements without the data of tables
|
||||
- `s3_storage_class`: the storage class used for S3 backup. For example, `STANDARD`
|
||||
|
||||
### Usage examples
|
||||
|
||||
|
@ -2288,6 +2288,8 @@ This section contains the following parameters:
|
||||
- `session_timeout_ms` — Maximum timeout for the client session in milliseconds.
|
||||
- `operation_timeout_ms` — Maximum timeout for one operation in milliseconds.
|
||||
- `root` — The [znode](http://zookeeper.apache.org/doc/r3.5.5/zookeeperOver.html#Nodes+and+ephemeral+nodes) that is used as the root for znodes used by the ClickHouse server. Optional.
|
||||
- `fallback_session_lifetime.min` - If the first zookeeper host resolved by zookeeper_load_balancing strategy is unavailable, limit the lifetime of a zookeeper session to the fallback node. This is done for load-balancing purposes to avoid excessive load on one of zookeeper hosts. This setting sets the minimal duration of the fallback session. Set in seconds. Optional. Default is 3 hours.
|
||||
- `fallback_session_lifetime.max` - If the first zookeeper host resolved by zookeeper_load_balancing strategy is unavailable, limit the lifetime of a zookeeper session to the fallback node. This is done for load-balancing purposes to avoid excessive load on one of zookeeper hosts. This setting sets the maximum duration of the fallback session. Set in seconds. Optional. Default is 6 hours.
|
||||
- `identity` — User and password, that can be required by ZooKeeper to give access to requested znodes. Optional.
|
||||
- zookeeper_load_balancing - Specifies the algorithm of ZooKeeper node selection.
|
||||
* random - randomly selects one of ZooKeeper nodes.
|
||||
|
@ -327,3 +327,39 @@ The maximum amount of data consumed by temporary files on disk in bytes for all
|
||||
Zero means unlimited.
|
||||
|
||||
Default value: 0.
|
||||
|
||||
## max_sessions_for_user {#max-sessions-per-user}
|
||||
|
||||
Maximum number of simultaneous sessions per authenticated user to the ClickHouse server.
|
||||
|
||||
Example:
|
||||
|
||||
``` xml
|
||||
<profiles>
|
||||
<single_session_profile>
|
||||
<max_sessions_for_user>1</max_sessions_for_user>
|
||||
</single_session_profile>
|
||||
<two_sessions_profile>
|
||||
<max_sessions_for_user>2</max_sessions_for_user>
|
||||
</two_sessions_profile>
|
||||
<unlimited_sessions_profile>
|
||||
<max_sessions_for_user>0</max_sessions_for_user>
|
||||
</unlimited_sessions_profile>
|
||||
</profiles>
|
||||
<users>
|
||||
<!-- User Alice can connect to a ClickHouse server no more than once at a time. -->
|
||||
<Alice>
|
||||
<profile>single_session_user</profile>
|
||||
</Alice>
|
||||
<!-- User Bob can use 2 simultaneous sessions. -->
|
||||
<Bob>
|
||||
<profile>two_sessions_profile</profile>
|
||||
</Bob>
|
||||
<!-- User Charles can use arbitrarily many of simultaneous sessions. -->
|
||||
<Charles>
|
||||
<profile>unlimited_sessions_profile</profile>
|
||||
</Charles>
|
||||
</users>
|
||||
```
|
||||
|
||||
Default value: 0 (Infinite count of simultaneous sessions).
|
||||
|
@ -39,7 +39,7 @@ Example:
|
||||
<max_threads>8</max_threads>
|
||||
</default>
|
||||
|
||||
<!-- Settings for quries from the user interface -->
|
||||
<!-- Settings for queries from the user interface -->
|
||||
<web>
|
||||
<max_rows_to_read>1000000000</max_rows_to_read>
|
||||
<max_bytes_to_read>100000000000</max_bytes_to_read>
|
||||
@ -67,6 +67,8 @@ Example:
|
||||
<max_ast_depth>50</max_ast_depth>
|
||||
<max_ast_elements>100</max_ast_elements>
|
||||
|
||||
<max_sessions_for_user>4</max_sessions_for_user>
|
||||
|
||||
<readonly>1</readonly>
|
||||
</web>
|
||||
</profiles>
|
||||
|
@ -3468,11 +3468,11 @@ Possible values:
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
## disable_url_encoding {#disable_url_encoding}
|
||||
## enable_url_encoding {#enable_url_encoding}
|
||||
|
||||
Allows to disable decoding/encoding path in uri in [URL](../../engines/table-engines/special/url.md) engine tables.
|
||||
Allows to enable/disable decoding/encoding path in uri in [URL](../../engines/table-engines/special/url.md) engine tables.
|
||||
|
||||
Disabled by default.
|
||||
Enabled by default.
|
||||
|
||||
## database_atomic_wait_for_drop_and_detach_synchronously {#database_atomic_wait_for_drop_and_detach_synchronously}
|
||||
|
||||
|
@ -140,8 +140,8 @@ Time shifts for multiple days. Some pacific islands changed their timezone offse
|
||||
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
|
||||
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
|
||||
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
|
||||
- [The `date_time_input_format` setting](../../operations/settings/settings.md#settings-date_time_input_format)
|
||||
- [The `date_time_output_format` setting](../../operations/settings/settings.md#settings-date_time_output_format)
|
||||
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#settings-date_time_input_format)
|
||||
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#settings-date_time_output_format)
|
||||
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
|
||||
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
|
||||
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)
|
||||
|
@ -56,7 +56,7 @@ Character `|` inside patterns is used to specify failover addresses. They are it
|
||||
## Storage Settings {#storage-settings}
|
||||
|
||||
- [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
|
||||
- [disable_url_encoding](/docs/en/operations/settings/settings.md#disable_url_encoding) - allows to disable decoding/encoding path in uri. Disabled by default.
|
||||
- [enable_url_encoding](/docs/en/operations/settings/settings.md#enable_url_encoding) - allows to enable/disable decoding/encoding path in uri. Enabled by default.
|
||||
|
||||
**See Also**
|
||||
|
||||
|
@ -314,3 +314,40 @@ FORMAT Null;
|
||||
При вставке данных, ClickHouse вычисляет количество партиций во вставленном блоке. Если число партиций больше, чем `max_partitions_per_insert_block`, ClickHouse генерирует исключение со следующим текстом:
|
||||
|
||||
> «Too many partitions for single INSERT block (more than» + toString(max_parts) + «). The limit is controlled by ‘max_partitions_per_insert_block’ setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).»
|
||||
|
||||
## max_sessions_for_user {#max-sessions-per-user}
|
||||
|
||||
Максимальное количество одновременных сессий на одного аутентифицированного пользователя.
|
||||
|
||||
Пример:
|
||||
|
||||
``` xml
|
||||
<profiles>
|
||||
<single_session_profile>
|
||||
<max_sessions_for_user>1</max_sessions_for_user>
|
||||
</single_session_profile>
|
||||
<two_sessions_profile>
|
||||
<max_sessions_for_user>2</max_sessions_for_user>
|
||||
</two_sessions_profile>
|
||||
<unlimited_sessions_profile>
|
||||
<max_sessions_for_user>0</max_sessions_for_user>
|
||||
</unlimited_sessions_profile>
|
||||
</profiles>
|
||||
<users>
|
||||
<!-- Пользователь Alice может одновременно подключаться не
|
||||
более одного раза к серверу ClickHouse. -->
|
||||
<Alice>
|
||||
<profile>single_session_profile</profile>
|
||||
</Alice>
|
||||
<!-- Пользователь Bob может использовать 2 одновременных сессии. -->
|
||||
<Bob>
|
||||
<profile>two_sessions_profile</profile>
|
||||
</Bob>
|
||||
<!-- Пользователь Charles может иметь любое количество одновременных сессий. -->
|
||||
<Charles>
|
||||
<profile>unlimited_sessions_profile</profile>
|
||||
</Charles>
|
||||
</users>
|
||||
```
|
||||
|
||||
Значение по умолчанию: 0 (неограниченное количество сессий).
|
||||
|
@ -39,7 +39,7 @@ SET profile = 'web'
|
||||
<max_threads>8</max_threads>
|
||||
</default>
|
||||
|
||||
<!-- Settings for quries from the user interface -->
|
||||
<!-- Settings for queries from the user interface -->
|
||||
<web>
|
||||
<max_rows_to_read>1000000000</max_rows_to_read>
|
||||
<max_bytes_to_read>100000000000</max_bytes_to_read>
|
||||
@ -67,6 +67,7 @@ SET profile = 'web'
|
||||
<max_ast_depth>50</max_ast_depth>
|
||||
<max_ast_elements>100</max_ast_elements>
|
||||
|
||||
<max_sessions_for_user>4</max_sessions_for_user>
|
||||
<readonly>1</readonly>
|
||||
</web>
|
||||
</profiles>
|
||||
|
@ -2045,27 +2045,26 @@ void Server::createServers(
|
||||
|
||||
for (const auto & protocol : protocols)
|
||||
{
|
||||
if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol))
|
||||
std::string prefix = "protocols." + protocol + ".";
|
||||
std::string port_name = prefix + "port";
|
||||
std::string description {"<undefined> protocol"};
|
||||
if (config.has(prefix + "description"))
|
||||
description = config.getString(prefix + "description");
|
||||
|
||||
if (!config.has(prefix + "port"))
|
||||
continue;
|
||||
|
||||
if (!server_type.shouldStart(ServerType::Type::CUSTOM, port_name))
|
||||
continue;
|
||||
|
||||
std::vector<std::string> hosts;
|
||||
if (config.has("protocols." + protocol + ".host"))
|
||||
hosts.push_back(config.getString("protocols." + protocol + ".host"));
|
||||
if (config.has(prefix + "host"))
|
||||
hosts.push_back(config.getString(prefix + "host"));
|
||||
else
|
||||
hosts = listen_hosts;
|
||||
|
||||
for (const auto & host : hosts)
|
||||
{
|
||||
std::string conf_name = "protocols." + protocol;
|
||||
std::string prefix = conf_name + ".";
|
||||
|
||||
if (!config.has(prefix + "port"))
|
||||
continue;
|
||||
|
||||
std::string description {"<undefined> protocol"};
|
||||
if (config.has(prefix + "description"))
|
||||
description = config.getString(prefix + "description");
|
||||
std::string port_name = prefix + "port";
|
||||
bool is_secure = false;
|
||||
auto stack = buildProtocolStackFromConfig(config, protocol, http_params, async_metrics, is_secure);
|
||||
|
||||
|
@ -328,9 +328,6 @@ void ContextAccess::setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> &
|
||||
|
||||
enabled_row_policies = access_control->getEnabledRowPolicies(*params.user_id, roles_info->enabled_roles);
|
||||
|
||||
enabled_quota = access_control->getEnabledQuota(
|
||||
*params.user_id, user_name, roles_info->enabled_roles, params.address, params.forwarded_address, params.quota_key);
|
||||
|
||||
enabled_settings = access_control->getEnabledSettings(
|
||||
*params.user_id, user->settings, roles_info->enabled_roles, roles_info->settings_from_enabled_roles);
|
||||
|
||||
@ -416,19 +413,32 @@ RowPolicyFilterPtr ContextAccess::getRowPolicyFilter(const String & database, co
|
||||
std::shared_ptr<const EnabledQuota> ContextAccess::getQuota() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
if (enabled_quota)
|
||||
return enabled_quota;
|
||||
static const auto unlimited_quota = EnabledQuota::getUnlimitedQuota();
|
||||
return unlimited_quota;
|
||||
|
||||
if (!enabled_quota)
|
||||
{
|
||||
if (roles_info)
|
||||
{
|
||||
enabled_quota = access_control->getEnabledQuota(*params.user_id,
|
||||
user_name,
|
||||
roles_info->enabled_roles,
|
||||
params.address,
|
||||
params.forwarded_address,
|
||||
params.quota_key);
|
||||
}
|
||||
else
|
||||
{
|
||||
static const auto unlimited_quota = EnabledQuota::getUnlimitedQuota();
|
||||
return unlimited_quota;
|
||||
}
|
||||
}
|
||||
|
||||
return enabled_quota;
|
||||
}
|
||||
|
||||
|
||||
std::optional<QuotaUsage> ContextAccess::getQuotaUsage() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
if (enabled_quota)
|
||||
return enabled_quota->getUsage();
|
||||
return {};
|
||||
return getQuota()->getUsage();
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <string_view>
|
||||
#include <unordered_map>
|
||||
#include <Access/SettingsConstraints.h>
|
||||
#include <Access/resolveSetting.h>
|
||||
#include <Access/AccessControl.h>
|
||||
@ -6,6 +7,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
#include <Common/FieldVisitorToString.h>
|
||||
#include <Common/FieldVisitorsAccurateComparison.h>
|
||||
#include <Common/SettingSource.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <boost/range/algorithm_ext/erase.hpp>
|
||||
@ -20,6 +22,39 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
struct SettingSourceRestrictions
|
||||
{
|
||||
constexpr SettingSourceRestrictions() { allowed_sources.set(); }
|
||||
|
||||
constexpr SettingSourceRestrictions(std::initializer_list<SettingSource> allowed_sources_)
|
||||
{
|
||||
for (auto allowed_source : allowed_sources_)
|
||||
setSourceAllowed(allowed_source, true);
|
||||
}
|
||||
|
||||
constexpr bool isSourceAllowed(SettingSource source) { return allowed_sources[source]; }
|
||||
constexpr void setSourceAllowed(SettingSource source, bool allowed) { allowed_sources[source] = allowed; }
|
||||
|
||||
std::bitset<SettingSource::COUNT> allowed_sources;
|
||||
};
|
||||
|
||||
const std::unordered_map<std::string_view, SettingSourceRestrictions> SETTINGS_SOURCE_RESTRICTIONS = {
|
||||
{"max_sessions_for_user", {SettingSource::PROFILE}},
|
||||
};
|
||||
|
||||
SettingSourceRestrictions getSettingSourceRestrictions(std::string_view name)
|
||||
{
|
||||
auto settingConstraintIter = SETTINGS_SOURCE_RESTRICTIONS.find(name);
|
||||
if (settingConstraintIter != SETTINGS_SOURCE_RESTRICTIONS.end())
|
||||
return settingConstraintIter->second;
|
||||
else
|
||||
return SettingSourceRestrictions(); // allows everything
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
SettingsConstraints::SettingsConstraints(const AccessControl & access_control_) : access_control(&access_control_)
|
||||
{
|
||||
}
|
||||
@ -98,7 +133,7 @@ void SettingsConstraints::merge(const SettingsConstraints & other)
|
||||
}
|
||||
|
||||
|
||||
void SettingsConstraints::check(const Settings & current_settings, const SettingsProfileElements & profile_elements) const
|
||||
void SettingsConstraints::check(const Settings & current_settings, const SettingsProfileElements & profile_elements, SettingSource source) const
|
||||
{
|
||||
for (const auto & element : profile_elements)
|
||||
{
|
||||
@ -108,19 +143,19 @@ void SettingsConstraints::check(const Settings & current_settings, const Setting
|
||||
if (element.value)
|
||||
{
|
||||
SettingChange value(element.setting_name, *element.value);
|
||||
check(current_settings, value);
|
||||
check(current_settings, value, source);
|
||||
}
|
||||
|
||||
if (element.min_value)
|
||||
{
|
||||
SettingChange value(element.setting_name, *element.min_value);
|
||||
check(current_settings, value);
|
||||
check(current_settings, value, source);
|
||||
}
|
||||
|
||||
if (element.max_value)
|
||||
{
|
||||
SettingChange value(element.setting_name, *element.max_value);
|
||||
check(current_settings, value);
|
||||
check(current_settings, value, source);
|
||||
}
|
||||
|
||||
SettingConstraintWritability new_value = SettingConstraintWritability::WRITABLE;
|
||||
@ -142,24 +177,24 @@ void SettingsConstraints::check(const Settings & current_settings, const Setting
|
||||
}
|
||||
}
|
||||
|
||||
void SettingsConstraints::check(const Settings & current_settings, const SettingChange & change) const
|
||||
void SettingsConstraints::check(const Settings & current_settings, const SettingChange & change, SettingSource source) const
|
||||
{
|
||||
checkImpl(current_settings, const_cast<SettingChange &>(change), THROW_ON_VIOLATION);
|
||||
checkImpl(current_settings, const_cast<SettingChange &>(change), THROW_ON_VIOLATION, source);
|
||||
}
|
||||
|
||||
void SettingsConstraints::check(const Settings & current_settings, const SettingsChanges & changes) const
|
||||
void SettingsConstraints::check(const Settings & current_settings, const SettingsChanges & changes, SettingSource source) const
|
||||
{
|
||||
for (const auto & change : changes)
|
||||
check(current_settings, change);
|
||||
check(current_settings, change, source);
|
||||
}
|
||||
|
||||
void SettingsConstraints::check(const Settings & current_settings, SettingsChanges & changes) const
|
||||
void SettingsConstraints::check(const Settings & current_settings, SettingsChanges & changes, SettingSource source) const
|
||||
{
|
||||
boost::range::remove_erase_if(
|
||||
changes,
|
||||
[&](SettingChange & change) -> bool
|
||||
{
|
||||
return !checkImpl(current_settings, const_cast<SettingChange &>(change), THROW_ON_VIOLATION);
|
||||
return !checkImpl(current_settings, const_cast<SettingChange &>(change), THROW_ON_VIOLATION, source);
|
||||
});
|
||||
}
|
||||
|
||||
@ -174,13 +209,13 @@ void SettingsConstraints::check(const MergeTreeSettings & current_settings, cons
|
||||
check(current_settings, change);
|
||||
}
|
||||
|
||||
void SettingsConstraints::clamp(const Settings & current_settings, SettingsChanges & changes) const
|
||||
void SettingsConstraints::clamp(const Settings & current_settings, SettingsChanges & changes, SettingSource source) const
|
||||
{
|
||||
boost::range::remove_erase_if(
|
||||
changes,
|
||||
[&](SettingChange & change) -> bool
|
||||
{
|
||||
return !checkImpl(current_settings, change, CLAMP_ON_VIOLATION);
|
||||
return !checkImpl(current_settings, change, CLAMP_ON_VIOLATION, source);
|
||||
});
|
||||
}
|
||||
|
||||
@ -215,7 +250,10 @@ bool getNewValueToCheck(const T & current_settings, SettingChange & change, Fiel
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SettingsConstraints::checkImpl(const Settings & current_settings, SettingChange & change, ReactionOnViolation reaction) const
|
||||
bool SettingsConstraints::checkImpl(const Settings & current_settings,
|
||||
SettingChange & change,
|
||||
ReactionOnViolation reaction,
|
||||
SettingSource source) const
|
||||
{
|
||||
std::string_view setting_name = Settings::Traits::resolveName(change.name);
|
||||
|
||||
@ -247,7 +285,7 @@ bool SettingsConstraints::checkImpl(const Settings & current_settings, SettingCh
|
||||
if (!getNewValueToCheck(current_settings, change, new_value, reaction == THROW_ON_VIOLATION))
|
||||
return false;
|
||||
|
||||
return getChecker(current_settings, setting_name).check(change, new_value, reaction);
|
||||
return getChecker(current_settings, setting_name).check(change, new_value, reaction, source);
|
||||
}
|
||||
|
||||
bool SettingsConstraints::checkImpl(const MergeTreeSettings & current_settings, SettingChange & change, ReactionOnViolation reaction) const
|
||||
@ -255,10 +293,13 @@ bool SettingsConstraints::checkImpl(const MergeTreeSettings & current_settings,
|
||||
Field new_value;
|
||||
if (!getNewValueToCheck(current_settings, change, new_value, reaction == THROW_ON_VIOLATION))
|
||||
return false;
|
||||
return getMergeTreeChecker(change.name).check(change, new_value, reaction);
|
||||
return getMergeTreeChecker(change.name).check(change, new_value, reaction, SettingSource::QUERY);
|
||||
}
|
||||
|
||||
bool SettingsConstraints::Checker::check(SettingChange & change, const Field & new_value, ReactionOnViolation reaction) const
|
||||
bool SettingsConstraints::Checker::check(SettingChange & change,
|
||||
const Field & new_value,
|
||||
ReactionOnViolation reaction,
|
||||
SettingSource source) const
|
||||
{
|
||||
if (!explain.empty())
|
||||
{
|
||||
@ -326,6 +367,14 @@ bool SettingsConstraints::Checker::check(SettingChange & change, const Field & n
|
||||
change.value = max_value;
|
||||
}
|
||||
|
||||
if (!getSettingSourceRestrictions(setting_name).isSourceAllowed(source))
|
||||
{
|
||||
if (reaction == THROW_ON_VIOLATION)
|
||||
throw Exception(ErrorCodes::READONLY, "Setting {} is not allowed to be set by {}", setting_name, toString(source));
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Access/SettingsProfileElement.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
#include <Common/SettingSource.h>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace Poco::Util
|
||||
@ -73,17 +74,18 @@ public:
|
||||
void merge(const SettingsConstraints & other);
|
||||
|
||||
/// Checks whether `change` violates these constraints and throws an exception if so.
|
||||
void check(const Settings & current_settings, const SettingsProfileElements & profile_elements) const;
|
||||
void check(const Settings & current_settings, const SettingChange & change) const;
|
||||
void check(const Settings & current_settings, const SettingsChanges & changes) const;
|
||||
void check(const Settings & current_settings, SettingsChanges & changes) const;
|
||||
void check(const Settings & current_settings, const SettingsProfileElements & profile_elements, SettingSource source) const;
|
||||
void check(const Settings & current_settings, const SettingChange & change, SettingSource source) const;
|
||||
void check(const Settings & current_settings, const SettingsChanges & changes, SettingSource source) const;
|
||||
void check(const Settings & current_settings, SettingsChanges & changes, SettingSource source) const;
|
||||
|
||||
/// Checks whether `change` violates these constraints and throws an exception if so. (setting short name is expected inside `changes`)
|
||||
void check(const MergeTreeSettings & current_settings, const SettingChange & change) const;
|
||||
void check(const MergeTreeSettings & current_settings, const SettingsChanges & changes) const;
|
||||
|
||||
/// Checks whether `change` violates these and clamps the `change` if so.
|
||||
void clamp(const Settings & current_settings, SettingsChanges & changes) const;
|
||||
void clamp(const Settings & current_settings, SettingsChanges & changes, SettingSource source) const;
|
||||
|
||||
|
||||
friend bool operator ==(const SettingsConstraints & left, const SettingsConstraints & right);
|
||||
friend bool operator !=(const SettingsConstraints & left, const SettingsConstraints & right) { return !(left == right); }
|
||||
@ -133,7 +135,10 @@ private:
|
||||
{}
|
||||
|
||||
// Perform checking
|
||||
bool check(SettingChange & change, const Field & new_value, ReactionOnViolation reaction) const;
|
||||
bool check(SettingChange & change,
|
||||
const Field & new_value,
|
||||
ReactionOnViolation reaction,
|
||||
SettingSource source) const;
|
||||
};
|
||||
|
||||
struct StringHash
|
||||
@ -145,7 +150,11 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
bool checkImpl(const Settings & current_settings, SettingChange & change, ReactionOnViolation reaction) const;
|
||||
bool checkImpl(const Settings & current_settings,
|
||||
SettingChange & change,
|
||||
ReactionOnViolation reaction,
|
||||
SettingSource source) const;
|
||||
|
||||
bool checkImpl(const MergeTreeSettings & current_settings, SettingChange & change, ReactionOnViolation reaction) const;
|
||||
|
||||
Checker getChecker(const Settings & current_settings, std::string_view setting_name) const;
|
||||
|
@ -0,0 +1,221 @@
|
||||
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
|
||||
|
||||
#include <Functions/FunctionFactory.h>
|
||||
|
||||
#include <Analyzer/InDepthQueryTreeVisitor.h>
|
||||
#include <Analyzer/ColumnNode.h>
|
||||
#include <Analyzer/ConstantNode.h>
|
||||
#include <Analyzer/FunctionNode.h>
|
||||
#include <Common/DateLUT.h>
|
||||
#include <Common/DateLUTImpl.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
class OptimizeDateOrDateTimeConverterWithPreimageVisitor : public InDepthQueryTreeVisitorWithContext<OptimizeDateOrDateTimeConverterWithPreimageVisitor>
|
||||
{
|
||||
public:
|
||||
using Base = InDepthQueryTreeVisitorWithContext<OptimizeDateOrDateTimeConverterWithPreimageVisitor>;
|
||||
|
||||
explicit OptimizeDateOrDateTimeConverterWithPreimageVisitor(ContextPtr context)
|
||||
: Base(std::move(context))
|
||||
{}
|
||||
|
||||
static bool needChildVisit(QueryTreeNodePtr & node, QueryTreeNodePtr & /*child*/)
|
||||
{
|
||||
const static std::unordered_set<String> relations = {
|
||||
"equals",
|
||||
"notEquals",
|
||||
"less",
|
||||
"greater",
|
||||
"lessOrEquals",
|
||||
"greaterOrEquals",
|
||||
};
|
||||
|
||||
if (const auto * function = node->as<FunctionNode>())
|
||||
{
|
||||
return !relations.contains(function->getFunctionName());
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void visitImpl(QueryTreeNodePtr & node) const
|
||||
{
|
||||
const static std::unordered_map<String, String> swap_relations = {
|
||||
{"equals", "equals"},
|
||||
{"notEquals", "notEquals"},
|
||||
{"less", "greater"},
|
||||
{"greater", "less"},
|
||||
{"lessOrEquals", "greaterOrEquals"},
|
||||
{"greaterOrEquals", "lessOrEquals"},
|
||||
};
|
||||
|
||||
const auto * function = node->as<FunctionNode>();
|
||||
|
||||
if (!function || !swap_relations.contains(function->getFunctionName())) return;
|
||||
|
||||
if (function->getArguments().getNodes().size() != 2) return;
|
||||
|
||||
size_t func_id = function->getArguments().getNodes().size();
|
||||
|
||||
for (size_t i = 0; i < function->getArguments().getNodes().size(); i++)
|
||||
{
|
||||
if (const auto * func = function->getArguments().getNodes()[i]->as<FunctionNode>())
|
||||
{
|
||||
func_id = i;
|
||||
}
|
||||
}
|
||||
|
||||
if (func_id == function->getArguments().getNodes().size()) return;
|
||||
|
||||
size_t literal_id = 1 - func_id;
|
||||
const auto * literal = function->getArguments().getNodes()[literal_id]->as<ConstantNode>();
|
||||
|
||||
if (!literal || literal->getValue().getType() != Field::Types::UInt64) return;
|
||||
|
||||
String comparator = literal_id > func_id ? function->getFunctionName(): swap_relations.at(function->getFunctionName());
|
||||
|
||||
const auto * func_node = function->getArguments().getNodes()[func_id]->as<FunctionNode>();
|
||||
/// Currently we only handle single-argument functions.
|
||||
if (!func_node || func_node->getArguments().getNodes().size() != 1) return;
|
||||
|
||||
const auto * column_id = func_node->getArguments().getNodes()[0]->as<ColumnNode>();
|
||||
if (!column_id) return;
|
||||
|
||||
const auto * column_type = column_id->getColumnType().get();
|
||||
if (!isDateOrDate32(column_type) && !isDateTime(column_type) && !isDateTime64(column_type)) return;
|
||||
|
||||
const auto & converter = FunctionFactory::instance().tryGet(func_node->getFunctionName(), getContext());
|
||||
if (!converter) return;
|
||||
|
||||
ColumnsWithTypeAndName args;
|
||||
args.emplace_back(column_id->getColumnType(), "tmp");
|
||||
auto converter_base = converter->build(args);
|
||||
if (!converter_base || !converter_base->hasInformationAboutPreimage()) return;
|
||||
|
||||
auto preimage_range = converter_base->getPreimage(*(column_id->getColumnType()), literal->getValue());
|
||||
if (!preimage_range) return;
|
||||
|
||||
const auto new_node = generateOptimizedDateFilter(comparator, *column_id, *preimage_range);
|
||||
|
||||
if (!new_node) return;
|
||||
|
||||
node = new_node;
|
||||
}
|
||||
|
||||
private:
|
||||
QueryTreeNodePtr generateOptimizedDateFilter(const String & comparator, const ColumnNode & column_node, const std::pair<Field, Field>& range) const
|
||||
{
|
||||
const DateLUTImpl & date_lut = DateLUT::instance("UTC");
|
||||
|
||||
String start_date_or_date_time;
|
||||
String end_date_or_date_time;
|
||||
|
||||
if (isDateOrDate32(column_node.getColumnType().get()))
|
||||
{
|
||||
start_date_or_date_time = date_lut.dateToString(range.first.get<DateLUTImpl::Time>());
|
||||
end_date_or_date_time = date_lut.dateToString(range.second.get<DateLUTImpl::Time>());
|
||||
}
|
||||
else if (isDateTime(column_node.getColumnType().get()) || isDateTime64(column_node.getColumnType().get()))
|
||||
{
|
||||
start_date_or_date_time = date_lut.timeToString(range.first.get<DateLUTImpl::Time>());
|
||||
end_date_or_date_time = date_lut.timeToString(range.second.get<DateLUTImpl::Time>());
|
||||
}
|
||||
else [[unlikely]] return {};
|
||||
|
||||
if (comparator == "equals")
|
||||
{
|
||||
const auto lhs = std::make_shared<FunctionNode>("greaterOrEquals");
|
||||
lhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
lhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(start_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*lhs, lhs->getFunctionName());
|
||||
|
||||
const auto rhs = std::make_shared<FunctionNode>("less");
|
||||
rhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
rhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*rhs, rhs->getFunctionName());
|
||||
|
||||
const auto new_date_filter = std::make_shared<FunctionNode>("and");
|
||||
new_date_filter->getArguments().getNodes() = {lhs, rhs};
|
||||
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
|
||||
|
||||
return new_date_filter;
|
||||
}
|
||||
else if (comparator == "notEquals")
|
||||
{
|
||||
const auto lhs = std::make_shared<FunctionNode>("less");
|
||||
lhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
lhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(start_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*lhs, lhs->getFunctionName());
|
||||
|
||||
const auto rhs = std::make_shared<FunctionNode>("greaterOrEquals");
|
||||
rhs->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
rhs->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*rhs, rhs->getFunctionName());
|
||||
|
||||
const auto new_date_filter = std::make_shared<FunctionNode>("or");
|
||||
new_date_filter->getArguments().getNodes() = {lhs, rhs};
|
||||
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
|
||||
|
||||
return new_date_filter;
|
||||
}
|
||||
else if (comparator == "greater")
|
||||
{
|
||||
const auto new_date_filter = std::make_shared<FunctionNode>("greaterOrEquals");
|
||||
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
|
||||
|
||||
return new_date_filter;
|
||||
}
|
||||
else if (comparator == "lessOrEquals")
|
||||
{
|
||||
const auto new_date_filter = std::make_shared<FunctionNode>("less");
|
||||
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(end_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
|
||||
|
||||
return new_date_filter;
|
||||
}
|
||||
else if (comparator == "less" || comparator == "greaterOrEquals")
|
||||
{
|
||||
const auto new_date_filter = std::make_shared<FunctionNode>(comparator);
|
||||
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ColumnNode>(column_node.getColumn(), column_node.getColumnSource()));
|
||||
new_date_filter->getArguments().getNodes().push_back(std::make_shared<ConstantNode>(start_date_or_date_time));
|
||||
resolveOrdinaryFunctionNode(*new_date_filter, new_date_filter->getFunctionName());
|
||||
|
||||
return new_date_filter;
|
||||
}
|
||||
else [[unlikely]]
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Expected equals, notEquals, less, lessOrEquals, greater, greaterOrEquals. Actual {}",
|
||||
comparator);
|
||||
}
|
||||
}
|
||||
|
||||
void resolveOrdinaryFunctionNode(FunctionNode & function_node, const String & function_name) const
|
||||
{
|
||||
auto function = FunctionFactory::instance().get(function_name, getContext());
|
||||
function_node.resolveAsFunction(function->build(function_node.getArgumentColumns()));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
void OptimizeDateOrDateTimeConverterWithPreimagePass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
|
||||
{
|
||||
OptimizeDateOrDateTimeConverterWithPreimageVisitor visitor(std::move(context));
|
||||
visitor.visit(query_tree_node);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
#pragma once
|
||||
|
||||
#include <Analyzer/IQueryTreePass.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Replace predicate having Date/DateTime converters with their preimages to improve performance.
|
||||
* Given a Date column c, toYear(c) = 2023 -> c >= '2023-01-01' AND c < '2024-01-01'
|
||||
* Or if c is a DateTime column, toYear(c) = 2023 -> c >= '2023-01-01 00:00:00' AND c < '2024-01-01 00:00:00'.
|
||||
* The similar optimization also applies to other converters.
|
||||
*/
|
||||
class OptimizeDateOrDateTimeConverterWithPreimagePass final : public IQueryTreePass
|
||||
{
|
||||
public:
|
||||
String getName() override { return "OptimizeDateOrDateTimeConverterWithPreimagePass"; }
|
||||
|
||||
String getDescription() override { return "Replace predicate having Date/DateTime converters with their preimages"; }
|
||||
|
||||
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -6494,55 +6494,69 @@ void QueryAnalyzer::resolveArrayJoin(QueryTreeNodePtr & array_join_node, Identif
|
||||
|
||||
resolveExpressionNode(array_join_expression, scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
|
||||
|
||||
auto result_type = array_join_expression->getResultType();
|
||||
bool is_array_type = isArray(result_type);
|
||||
bool is_map_type = isMap(result_type);
|
||||
|
||||
if (!is_array_type && !is_map_type)
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH,
|
||||
"ARRAY JOIN {} requires expression {} with Array or Map type. Actual {}. In scope {}",
|
||||
array_join_node_typed.formatASTForErrorMessage(),
|
||||
array_join_expression->formatASTForErrorMessage(),
|
||||
result_type->getName(),
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
|
||||
if (is_map_type)
|
||||
result_type = assert_cast<const DataTypeMap &>(*result_type).getNestedType();
|
||||
|
||||
result_type = assert_cast<const DataTypeArray &>(*result_type).getNestedType();
|
||||
|
||||
String array_join_column_name;
|
||||
|
||||
if (!array_join_expression_alias.empty())
|
||||
auto process_array_join_expression = [&](QueryTreeNodePtr & expression)
|
||||
{
|
||||
array_join_column_name = array_join_expression_alias;
|
||||
}
|
||||
else if (auto * array_join_expression_inner_column = array_join_expression->as<ColumnNode>())
|
||||
auto result_type = expression->getResultType();
|
||||
bool is_array_type = isArray(result_type);
|
||||
bool is_map_type = isMap(result_type);
|
||||
|
||||
if (!is_array_type && !is_map_type)
|
||||
throw Exception(ErrorCodes::TYPE_MISMATCH,
|
||||
"ARRAY JOIN {} requires expression {} with Array or Map type. Actual {}. In scope {}",
|
||||
array_join_node_typed.formatASTForErrorMessage(),
|
||||
expression->formatASTForErrorMessage(),
|
||||
result_type->getName(),
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
|
||||
if (is_map_type)
|
||||
result_type = assert_cast<const DataTypeMap &>(*result_type).getNestedType();
|
||||
|
||||
result_type = assert_cast<const DataTypeArray &>(*result_type).getNestedType();
|
||||
|
||||
String array_join_column_name;
|
||||
|
||||
if (!array_join_expression_alias.empty())
|
||||
{
|
||||
array_join_column_name = array_join_expression_alias;
|
||||
}
|
||||
else if (auto * array_join_expression_inner_column = array_join_expression->as<ColumnNode>())
|
||||
{
|
||||
array_join_column_name = array_join_expression_inner_column->getColumnName();
|
||||
}
|
||||
else if (!identifier_full_name.empty())
|
||||
{
|
||||
array_join_column_name = identifier_full_name;
|
||||
}
|
||||
else
|
||||
{
|
||||
array_join_column_name = "__array_join_expression_" + std::to_string(array_join_expressions_counter);
|
||||
++array_join_expressions_counter;
|
||||
}
|
||||
|
||||
if (array_join_column_names.contains(array_join_column_name))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"ARRAY JOIN {} multiple columns with name {}. In scope {}",
|
||||
array_join_node_typed.formatASTForErrorMessage(),
|
||||
array_join_column_name,
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
array_join_column_names.emplace(array_join_column_name);
|
||||
|
||||
NameAndTypePair array_join_column(array_join_column_name, result_type);
|
||||
auto array_join_column_node = std::make_shared<ColumnNode>(std::move(array_join_column), expression, array_join_node);
|
||||
array_join_column_node->setAlias(array_join_expression_alias);
|
||||
array_join_column_expressions.push_back(std::move(array_join_column_node));
|
||||
};
|
||||
|
||||
// Support ARRAY JOIN COLUMNS(...). COLUMNS transformer is resolved to list of columns.
|
||||
if (auto * columns_list = array_join_expression->as<ListNode>())
|
||||
{
|
||||
array_join_column_name = array_join_expression_inner_column->getColumnName();
|
||||
}
|
||||
else if (!identifier_full_name.empty())
|
||||
{
|
||||
array_join_column_name = identifier_full_name;
|
||||
for (auto & array_join_subexpression : columns_list->getNodes())
|
||||
process_array_join_expression(array_join_subexpression);
|
||||
}
|
||||
else
|
||||
{
|
||||
array_join_column_name = "__array_join_expression_" + std::to_string(array_join_expressions_counter);
|
||||
++array_join_expressions_counter;
|
||||
process_array_join_expression(array_join_expression);
|
||||
}
|
||||
|
||||
if (array_join_column_names.contains(array_join_column_name))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"ARRAY JOIN {} multiple columns with name {}. In scope {}",
|
||||
array_join_node_typed.formatASTForErrorMessage(),
|
||||
array_join_column_name,
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
array_join_column_names.emplace(array_join_column_name);
|
||||
|
||||
NameAndTypePair array_join_column(array_join_column_name, result_type);
|
||||
auto array_join_column_node = std::make_shared<ColumnNode>(std::move(array_join_column), array_join_expression, array_join_node);
|
||||
array_join_column_node->setAlias(array_join_expression_alias);
|
||||
array_join_column_expressions.push_back(std::move(array_join_column_node));
|
||||
}
|
||||
|
||||
/** Allow to resolve ARRAY JOIN columns from aliases with types after ARRAY JOIN only after ARRAY JOIN expression list is resolved, because
|
||||
@ -6554,11 +6568,9 @@ void QueryAnalyzer::resolveArrayJoin(QueryTreeNodePtr & array_join_node, Identif
|
||||
* And it is expected that `value_element` inside projection expression list will be resolved as `value_element` expression
|
||||
* with type after ARRAY JOIN.
|
||||
*/
|
||||
for (size_t i = 0; i < array_join_nodes_size; ++i)
|
||||
array_join_nodes = std::move(array_join_column_expressions);
|
||||
for (auto & array_join_column_expression : array_join_nodes)
|
||||
{
|
||||
auto & array_join_column_expression = array_join_nodes[i];
|
||||
array_join_column_expression = std::move(array_join_column_expressions[i]);
|
||||
|
||||
auto it = scope.alias_name_to_expression_node.find(array_join_column_expression->getAlias());
|
||||
if (it != scope.alias_name_to_expression_node.end())
|
||||
{
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
|
||||
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
|
||||
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
|
||||
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -278,6 +279,7 @@ void addQueryTreePasses(QueryTreePassManager & manager)
|
||||
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
|
||||
manager.addPass(std::make_unique<CrossToInnerJoinPass>());
|
||||
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());
|
||||
manager.addPass(std::make_unique<OptimizeDateOrDateTimeConverterWithPreimagePass>());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ public:
|
||||
String compression_method;
|
||||
int compression_level = -1;
|
||||
String password;
|
||||
String s3_storage_class;
|
||||
ContextPtr context;
|
||||
bool is_internal_backup = false;
|
||||
std::shared_ptr<IBackupCoordination> backup_coordination;
|
||||
|
@ -88,7 +88,7 @@ namespace
|
||||
request.SetMaxKeys(1);
|
||||
auto outcome = client.ListObjects(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
return outcome.GetResult().GetContents();
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
|
||||
|
||||
|
||||
BackupWriterS3::BackupWriterS3(
|
||||
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_)
|
||||
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_)
|
||||
: BackupWriterDefault(&Poco::Logger::get("BackupWriterS3"), context_)
|
||||
, s3_uri(s3_uri_)
|
||||
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
|
||||
@ -188,6 +188,7 @@ BackupWriterS3::BackupWriterS3(
|
||||
request_settings.updateFromSettings(context_->getSettingsRef());
|
||||
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
|
||||
request_settings.allow_native_copy = allow_s3_native_copy;
|
||||
request_settings.setStorageClassName(storage_class_name);
|
||||
}
|
||||
|
||||
void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
|
||||
@ -271,7 +272,7 @@ void BackupWriterS3::removeFile(const String & file_name)
|
||||
request.SetKey(fs::path(s3_uri.key) / file_name);
|
||||
auto outcome = client->DeleteObject(request);
|
||||
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
}
|
||||
|
||||
void BackupWriterS3::removeFiles(const Strings & file_names)
|
||||
@ -329,7 +330,7 @@ void BackupWriterS3::removeFilesBatch(const Strings & file_names)
|
||||
|
||||
auto outcome = client->DeleteObjects(request);
|
||||
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,7 +38,7 @@ private:
|
||||
class BackupWriterS3 : public BackupWriterDefault
|
||||
{
|
||||
public:
|
||||
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_);
|
||||
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_);
|
||||
~BackupWriterS3() override;
|
||||
|
||||
bool fileExists(const String & file_name) override;
|
||||
|
@ -21,6 +21,7 @@ namespace ErrorCodes
|
||||
M(String, id) \
|
||||
M(String, compression_method) \
|
||||
M(String, password) \
|
||||
M(String, s3_storage_class) \
|
||||
M(Bool, structure_only) \
|
||||
M(Bool, async) \
|
||||
M(Bool, decrypt_files_from_encrypted_disks) \
|
||||
|
@ -25,6 +25,9 @@ struct BackupSettings
|
||||
/// Password used to encrypt the backup.
|
||||
String password;
|
||||
|
||||
/// S3 storage class.
|
||||
String s3_storage_class = "";
|
||||
|
||||
/// If this is set to true then only create queries will be written to backup,
|
||||
/// without the data of tables.
|
||||
bool structure_only = false;
|
||||
|
@ -344,6 +344,7 @@ void BackupsWorker::doBackup(
|
||||
backup_create_params.compression_method = backup_settings.compression_method;
|
||||
backup_create_params.compression_level = backup_settings.compression_level;
|
||||
backup_create_params.password = backup_settings.password;
|
||||
backup_create_params.s3_storage_class = backup_settings.s3_storage_class;
|
||||
backup_create_params.is_internal_backup = backup_settings.internal;
|
||||
backup_create_params.backup_coordination = backup_coordination;
|
||||
backup_create_params.backup_uuid = backup_settings.backup_uuid;
|
||||
|
@ -112,7 +112,7 @@ void registerBackupEngineS3(BackupFactory & factory)
|
||||
}
|
||||
else
|
||||
{
|
||||
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.context);
|
||||
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.s3_storage_class, params.context);
|
||||
return std::make_unique<BackupImpl>(
|
||||
backup_name_for_logging,
|
||||
archive_params,
|
||||
|
@ -124,6 +124,9 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
|
||||
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
|
||||
continue;
|
||||
|
||||
/// Client can successfully connect to the server and
|
||||
/// get ErrorCodes::USER_SESSION_LIMIT_EXCEEDED for suggestion connection.
|
||||
|
||||
/// We should not use std::cerr here, because this method works concurrently with the main thread.
|
||||
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
|
||||
|
||||
|
@ -564,15 +564,22 @@ void ColumnNullable::updatePermutationImpl(IColumn::PermutationSortDirection dir
|
||||
else
|
||||
getNestedColumn().updatePermutation(direction, stability, limit, null_direction_hint, res, new_ranges);
|
||||
|
||||
equal_ranges = std::move(new_ranges);
|
||||
|
||||
if (unlikely(stability == PermutationSortStability::Stable))
|
||||
{
|
||||
for (auto & null_range : null_ranges)
|
||||
::sort(res.begin() + null_range.first, res.begin() + null_range.second);
|
||||
}
|
||||
|
||||
std::move(null_ranges.begin(), null_ranges.end(), std::back_inserter(equal_ranges));
|
||||
if (is_nulls_last || null_ranges.empty())
|
||||
{
|
||||
equal_ranges = std::move(new_ranges);
|
||||
std::move(null_ranges.begin(), null_ranges.end(), std::back_inserter(equal_ranges));
|
||||
}
|
||||
else
|
||||
{
|
||||
equal_ranges = std::move(null_ranges);
|
||||
std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(equal_ranges));
|
||||
}
|
||||
}
|
||||
|
||||
void ColumnNullable::getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
|
||||
|
@ -439,7 +439,7 @@ void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,
|
||||
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
|
||||
int direction, int nan_direction_hint) const
|
||||
{
|
||||
if (row_indexes)
|
||||
if (row_indexes || !typeid_cast<const ColumnSparse *>(&rhs))
|
||||
{
|
||||
/// TODO: implement without conversion to full column.
|
||||
auto this_full = convertToFullColumnIfSparse();
|
||||
|
@ -582,6 +582,7 @@
|
||||
M(697, CANNOT_RESTORE_TO_NONENCRYPTED_DISK) \
|
||||
M(698, INVALID_REDIS_STORAGE_TYPE) \
|
||||
M(699, INVALID_REDIS_TABLE_STRUCTURE) \
|
||||
M(700, USER_SESSION_LIMIT_EXCEEDED) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
43
src/Common/SettingSource.h
Normal file
43
src/Common/SettingSource.h
Normal file
@ -0,0 +1,43 @@
|
||||
#pragma once
|
||||
|
||||
#include <string_view>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
enum SettingSource
|
||||
{
|
||||
/// Query or session change:
|
||||
/// SET <setting> = <value>
|
||||
/// SELECT ... SETTINGS [<setting> = <value]
|
||||
QUERY,
|
||||
|
||||
/// Profile creation or altering:
|
||||
/// CREATE SETTINGS PROFILE ... SETTINGS [<setting> = <value]
|
||||
/// ALTER SETTINGS PROFILE ... SETTINGS [<setting> = <value]
|
||||
PROFILE,
|
||||
|
||||
/// Role creation or altering:
|
||||
/// CREATE ROLE ... SETTINGS [<setting> = <value>]
|
||||
/// ALTER ROLE ... SETTINGS [<setting> = <value]
|
||||
ROLE,
|
||||
|
||||
/// User creation or altering:
|
||||
/// CREATE USER ... SETTINGS [<setting> = <value>]
|
||||
/// ALTER USER ... SETTINGS [<setting> = <value]
|
||||
USER,
|
||||
|
||||
COUNT,
|
||||
};
|
||||
|
||||
constexpr std::string_view toString(SettingSource source)
|
||||
{
|
||||
switch (source)
|
||||
{
|
||||
case SettingSource::QUERY: return "query";
|
||||
case SettingSource::PROFILE: return "profile";
|
||||
case SettingSource::USER: return "user";
|
||||
case SettingSource::ROLE: return "role";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
}
|
@ -492,8 +492,6 @@ public:
|
||||
/// Useful to check owner of ephemeral node.
|
||||
virtual int64_t getSessionID() const = 0;
|
||||
|
||||
virtual Poco::Net::SocketAddress getConnectedAddress() const = 0;
|
||||
|
||||
/// If the method will throw an exception, callbacks won't be called.
|
||||
///
|
||||
/// After the method is executed successfully, you must wait for callbacks
|
||||
@ -566,6 +564,10 @@ public:
|
||||
|
||||
virtual const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return nullptr; }
|
||||
|
||||
/// A ZooKeeper session can have an optional deadline set on it.
|
||||
/// After it has been reached, the session needs to be finalized.
|
||||
virtual bool hasReachedDeadline() const = 0;
|
||||
|
||||
/// Expire session and finish all pending requests
|
||||
virtual void finalize(const String & reason) = 0;
|
||||
};
|
||||
|
@ -39,8 +39,8 @@ public:
|
||||
~TestKeeper() override;
|
||||
|
||||
bool isExpired() const override { return expired; }
|
||||
bool hasReachedDeadline() const override { return false; }
|
||||
int64_t getSessionID() const override { return 0; }
|
||||
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
|
||||
|
||||
|
||||
void create(
|
||||
@ -135,8 +135,6 @@ private:
|
||||
|
||||
zkutil::ZooKeeperArgs args;
|
||||
|
||||
Poco::Net::SocketAddress connected_zk_address;
|
||||
|
||||
std::mutex push_request_mutex;
|
||||
std::atomic<bool> expired{false};
|
||||
|
||||
|
@ -112,31 +112,17 @@ void ZooKeeper::init(ZooKeeperArgs args_)
|
||||
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZCONNECTIONLOSS);
|
||||
}
|
||||
|
||||
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
|
||||
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log, [this](size_t node_idx, const Coordination::ZooKeeper::Node & node)
|
||||
{
|
||||
connected_zk_host = node.address.host().toString();
|
||||
connected_zk_port = node.address.port();
|
||||
connected_zk_index = node_idx;
|
||||
});
|
||||
|
||||
if (args.chroot.empty())
|
||||
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
|
||||
else
|
||||
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
|
||||
|
||||
Poco::Net::SocketAddress address = impl->getConnectedAddress();
|
||||
|
||||
connected_zk_host = address.host().toString();
|
||||
connected_zk_port = address.port();
|
||||
|
||||
connected_zk_index = 0;
|
||||
|
||||
if (args.hosts.size() > 1)
|
||||
{
|
||||
for (size_t i = 0; i < args.hosts.size(); i++)
|
||||
{
|
||||
if (args.hosts[i] == address.toString())
|
||||
{
|
||||
connected_zk_index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (args.implementation == "testkeeper")
|
||||
{
|
||||
|
@ -521,6 +521,7 @@ public:
|
||||
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
|
||||
|
||||
UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
|
||||
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
|
||||
|
||||
void setServerCompletelyStarted();
|
||||
|
||||
|
@ -204,6 +204,14 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
|
||||
get_priority_load_balancing.load_balancing = *load_balancing;
|
||||
}
|
||||
else if (key == "fallback_session_lifetime")
|
||||
{
|
||||
fallback_session_lifetime = SessionLifetimeConfiguration
|
||||
{
|
||||
.min_sec = config.getUInt(config_name + "." + key + ".min"),
|
||||
.max_sec = config.getUInt(config_name + "." + key + ".max"),
|
||||
};
|
||||
}
|
||||
else
|
||||
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
|
||||
}
|
||||
|
@ -11,8 +11,17 @@ namespace Poco::Util
|
||||
namespace zkutil
|
||||
{
|
||||
|
||||
constexpr UInt32 ZK_MIN_FALLBACK_SESSION_DEADLINE_SEC = 3 * 60 * 60;
|
||||
constexpr UInt32 ZK_MAX_FALLBACK_SESSION_DEADLINE_SEC = 6 * 60 * 60;
|
||||
|
||||
struct ZooKeeperArgs
|
||||
{
|
||||
struct SessionLifetimeConfiguration
|
||||
{
|
||||
UInt32 min_sec = ZK_MIN_FALLBACK_SESSION_DEADLINE_SEC;
|
||||
UInt32 max_sec = ZK_MAX_FALLBACK_SESSION_DEADLINE_SEC;
|
||||
bool operator == (const SessionLifetimeConfiguration &) const = default;
|
||||
};
|
||||
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name);
|
||||
|
||||
/// hosts_string -- comma separated [secure://]host:port list
|
||||
@ -36,6 +45,7 @@ struct ZooKeeperArgs
|
||||
UInt64 send_sleep_ms = 0;
|
||||
UInt64 recv_sleep_ms = 0;
|
||||
|
||||
SessionLifetimeConfiguration fallback_session_lifetime = {};
|
||||
DB::GetPriorityForLoadBalancing get_priority_load_balancing;
|
||||
|
||||
private:
|
||||
|
@ -313,8 +313,8 @@ ZooKeeper::~ZooKeeper()
|
||||
ZooKeeper::ZooKeeper(
|
||||
const Nodes & nodes,
|
||||
const zkutil::ZooKeeperArgs & args_,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_)
|
||||
: args(args_)
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_, std::optional<ConnectedCallback> && connected_callback_)
|
||||
: args(args_), connected_callback(std::move(connected_callback_))
|
||||
{
|
||||
log = &Poco::Logger::get("ZooKeeperClient");
|
||||
std::atomic_store(&zk_log, std::move(zk_log_));
|
||||
@ -395,8 +395,9 @@ void ZooKeeper::connect(
|
||||
WriteBufferFromOwnString fail_reasons;
|
||||
for (size_t try_no = 0; try_no < num_tries; ++try_no)
|
||||
{
|
||||
for (const auto & node : nodes)
|
||||
for (size_t i = 0; i < nodes.size(); ++i)
|
||||
{
|
||||
const auto & node = nodes[i];
|
||||
try
|
||||
{
|
||||
/// Reset the state of previous attempt.
|
||||
@ -443,9 +444,25 @@ void ZooKeeper::connect(
|
||||
e.addMessage("while receiving handshake from ZooKeeper");
|
||||
throw;
|
||||
}
|
||||
|
||||
connected = true;
|
||||
connected_zk_address = node.address;
|
||||
|
||||
if (connected_callback.has_value())
|
||||
(*connected_callback)(i, node);
|
||||
|
||||
if (i != 0)
|
||||
{
|
||||
std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
|
||||
{
|
||||
args.fallback_session_lifetime.min_sec,
|
||||
args.fallback_session_lifetime.max_sec,
|
||||
};
|
||||
UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
|
||||
client_session_deadline = clock::now() + std::chrono::seconds(session_lifetime_seconds);
|
||||
|
||||
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
|
||||
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
|
||||
node.address.toString(), i, session_lifetime_seconds);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
@ -462,7 +479,6 @@ void ZooKeeper::connect(
|
||||
if (!connected)
|
||||
{
|
||||
WriteBufferFromOwnString message;
|
||||
connected_zk_address = Poco::Net::SocketAddress();
|
||||
|
||||
message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
|
||||
bool first = true;
|
||||
@ -1060,6 +1076,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
|
||||
{
|
||||
try
|
||||
{
|
||||
checkSessionDeadline();
|
||||
info.time = clock::now();
|
||||
if (zk_log)
|
||||
{
|
||||
@ -1482,6 +1499,17 @@ void ZooKeeper::setupFaultDistributions()
|
||||
inject_setup.test_and_set();
|
||||
}
|
||||
|
||||
void ZooKeeper::checkSessionDeadline() const
|
||||
{
|
||||
if (unlikely(hasReachedDeadline()))
|
||||
throw Exception(Error::ZSESSIONEXPIRED, "Session expired (force expiry client-side)");
|
||||
}
|
||||
|
||||
bool ZooKeeper::hasReachedDeadline() const
|
||||
{
|
||||
return client_session_deadline.has_value() && clock::now() >= client_session_deadline.value();
|
||||
}
|
||||
|
||||
void ZooKeeper::maybeInjectSendFault()
|
||||
{
|
||||
if (unlikely(inject_setup.test() && send_inject_fault && send_inject_fault.value()(thread_local_rng)))
|
||||
|
@ -107,6 +107,7 @@ public:
|
||||
};
|
||||
|
||||
using Nodes = std::vector<Node>;
|
||||
using ConnectedCallback = std::function<void(size_t, const Node&)>;
|
||||
|
||||
/** Connection to nodes is performed in order. If you want, shuffle them manually.
|
||||
* Operation timeout couldn't be greater than session timeout.
|
||||
@ -115,7 +116,8 @@ public:
|
||||
ZooKeeper(
|
||||
const Nodes & nodes,
|
||||
const zkutil::ZooKeeperArgs & args_,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_);
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_,
|
||||
std::optional<ConnectedCallback> && connected_callback_ = {});
|
||||
|
||||
~ZooKeeper() override;
|
||||
|
||||
@ -123,11 +125,13 @@ public:
|
||||
/// If expired, you can only destroy the object. All other methods will throw exception.
|
||||
bool isExpired() const override { return requests_queue.isFinished(); }
|
||||
|
||||
/// A ZooKeeper session can have an optional deadline set on it.
|
||||
/// After it has been reached, the session needs to be finalized.
|
||||
bool hasReachedDeadline() const override;
|
||||
|
||||
/// Useful to check owner of ephemeral node.
|
||||
int64_t getSessionID() const override { return session_id; }
|
||||
|
||||
Poco::Net::SocketAddress getConnectedAddress() const override { return connected_zk_address; }
|
||||
|
||||
void executeGenericRequest(
|
||||
const ZooKeeperRequestPtr & request,
|
||||
ResponseCallback callback);
|
||||
@ -213,9 +217,9 @@ public:
|
||||
|
||||
private:
|
||||
ACLs default_acls;
|
||||
Poco::Net::SocketAddress connected_zk_address;
|
||||
|
||||
zkutil::ZooKeeperArgs args;
|
||||
std::optional<ConnectedCallback> connected_callback = {};
|
||||
|
||||
/// Fault injection
|
||||
void maybeInjectSendFault();
|
||||
@ -252,6 +256,7 @@ private:
|
||||
clock::time_point time;
|
||||
};
|
||||
|
||||
std::optional<clock::time_point> client_session_deadline {};
|
||||
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
|
||||
|
||||
RequestsQueue requests_queue{1024};
|
||||
@ -324,6 +329,8 @@ private:
|
||||
|
||||
void initFeatureFlags();
|
||||
|
||||
void checkSessionDeadline() const;
|
||||
|
||||
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
|
||||
std::shared_ptr<ZooKeeperLog> zk_log;
|
||||
|
||||
|
@ -153,7 +153,10 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
|
||||
for (auto & connection : connections)
|
||||
{
|
||||
if (connection->ref_count == 0)
|
||||
{
|
||||
logger.test("Found free connection in pool, returning it to the caller");
|
||||
return Entry(connection, this);
|
||||
}
|
||||
}
|
||||
|
||||
logger.trace("(%s): Trying to allocate a new connection.", getDescription());
|
||||
|
@ -26,7 +26,7 @@ namespace mysqlxx
|
||||
*
|
||||
* void thread()
|
||||
* {
|
||||
* mysqlxx::Pool::Entry connection = pool.Get();
|
||||
* mysqlxx::Pool::Entry connection = pool.Get();
|
||||
* std::string s = connection->query("SELECT 'Hello, world!' AS world").use().fetch()["world"].getString();
|
||||
* }
|
||||
* TODO: simplify with PoolBase.
|
||||
|
@ -320,8 +320,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
||||
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
||||
request_info.session_id = session_id;
|
||||
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
return false;
|
||||
|
||||
@ -423,13 +421,10 @@ void KeeperDispatcher::shutdown()
|
||||
try
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
if (shutdown_called.exchange(true))
|
||||
return;
|
||||
|
||||
LOG_DEBUG(log, "Shutting down storage dispatcher");
|
||||
shutdown_called = true;
|
||||
|
||||
if (session_cleaner_thread.joinable())
|
||||
session_cleaner_thread.join();
|
||||
@ -582,12 +577,9 @@ void KeeperDispatcher::sessionCleanerTask()
|
||||
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
||||
.request = std::move(request),
|
||||
};
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
}
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
|
||||
/// Remove session from registered sessions
|
||||
finishSession(dead_session);
|
||||
@ -607,6 +599,10 @@ void KeeperDispatcher::sessionCleanerTask()
|
||||
|
||||
void KeeperDispatcher::finishSession(int64_t session_id)
|
||||
{
|
||||
/// shutdown() method will cleanup sessions if needed
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
auto session_it = session_to_response_callback.find(session_id);
|
||||
@ -698,12 +694,9 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
||||
}
|
||||
|
||||
/// Push new session request to queue
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
}
|
||||
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
|
||||
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot receive session id within session timeout");
|
||||
@ -871,10 +864,7 @@ uint64_t KeeperDispatcher::getSnapDirSize() const
|
||||
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
|
||||
{
|
||||
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
result.outstanding_requests_count = requests_queue->size();
|
||||
}
|
||||
result.outstanding_requests_count = requests_queue->size();
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
result.alive_connections_count = session_to_response_callback.size();
|
||||
|
@ -27,8 +27,6 @@ using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeep
|
||||
class KeeperDispatcher
|
||||
{
|
||||
private:
|
||||
mutable std::mutex push_request_mutex;
|
||||
|
||||
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
|
||||
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
|
||||
using ClusterUpdateQueue = ConcurrentBoundedQueue<ClusterUpdateAction>;
|
||||
|
@ -794,8 +794,14 @@ bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
|
||||
std::lock_guard _{server_write_mutex};
|
||||
|
||||
if (const auto * add = std::get_if<AddRaftServer>(&action))
|
||||
return raft_instance->get_srv_config(add->id) != nullptr
|
||||
|| raft_instance->add_srv(static_cast<nuraft::srv_config>(*add))->get_accepted();
|
||||
{
|
||||
if (raft_instance->get_srv_config(add->id) != nullptr)
|
||||
return true;
|
||||
|
||||
auto resp = raft_instance->add_srv(static_cast<nuraft::srv_config>(*add));
|
||||
resp->get();
|
||||
return resp->get_accepted();
|
||||
}
|
||||
else if (const auto * remove = std::get_if<RemoveRaftServer>(&action))
|
||||
{
|
||||
if (remove->id == raft_instance->get_leader())
|
||||
@ -807,8 +813,12 @@ bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
|
||||
return false;
|
||||
}
|
||||
|
||||
return raft_instance->get_srv_config(remove->id) == nullptr
|
||||
|| raft_instance->remove_srv(remove->id)->get_accepted();
|
||||
if (raft_instance->get_srv_config(remove->id) == nullptr)
|
||||
return true;
|
||||
|
||||
auto resp = raft_instance->remove_srv(remove->id);
|
||||
resp->get();
|
||||
return resp->get_accepted();
|
||||
}
|
||||
else if (const auto * update = std::get_if<UpdateRaftServerPriority>(&action))
|
||||
{
|
||||
|
@ -386,6 +386,8 @@ class IColumn;
|
||||
M(UInt64, max_temporary_columns, 0, "If a query generates more than the specified number of temporary columns in memory as a result of intermediate calculation, exception is thrown. Zero value means unlimited. This setting is useful to prevent too complex queries.", 0) \
|
||||
M(UInt64, max_temporary_non_const_columns, 0, "Similar to the 'max_temporary_columns' setting but applies only to non-constant columns. This makes sense, because constant columns are cheap and it is reasonable to allow more of them.", 0) \
|
||||
\
|
||||
M(UInt64, max_sessions_for_user, 0, "Maximum number of simultaneous sessions for a user.", 0) \
|
||||
\
|
||||
M(UInt64, max_subquery_depth, 100, "If a query has more than specified number of nested subqueries, throw an exception. This allows you to have a sanity check to protect the users of your cluster from going insane with their queries.", 0) \
|
||||
M(UInt64, max_analyze_depth, 5000, "Maximum number of analyses performed by interpreter.", 0) \
|
||||
M(UInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.", 0) \
|
||||
@ -624,7 +626,7 @@ class IColumn;
|
||||
M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \
|
||||
M(Bool, engine_file_skip_empty_files, false, "Allows to skip empty files in file table engine", 0) \
|
||||
M(Bool, engine_url_skip_empty_files, false, "Allows to skip empty files in url table engine", 0) \
|
||||
M(Bool, disable_url_encoding, false, " Allows to disable decoding/encoding path in uri in URL table engine", 0) \
|
||||
M(Bool, enable_url_encoding, true, " Allows to enable/disable decoding/encoding path in uri in URL table engine", 0) \
|
||||
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
|
||||
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
|
||||
M(Bool, database_replicated_enforce_synchronous_settings, false, "Enforces synchronous waiting for some queries (see also database_atomic_wait_for_drop_and_detach_synchronously, mutation_sync, alter_sync). Not recommended to enable these settings.", 0) \
|
||||
|
@ -65,6 +65,7 @@ void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exceptio
|
||||
|
||||
void DatabaseMaterializedMySQL::startupTables(ThreadPool & thread_pool, LoadingStrictnessLevel mode)
|
||||
{
|
||||
LOG_TRACE(log, "Starting MaterializeMySQL tables");
|
||||
DatabaseAtomic::startupTables(thread_pool, mode);
|
||||
|
||||
if (mode < LoadingStrictnessLevel::FORCE_ATTACH)
|
||||
@ -122,6 +123,7 @@ void DatabaseMaterializedMySQL::alterTable(ContextPtr context_, const StorageID
|
||||
|
||||
void DatabaseMaterializedMySQL::drop(ContextPtr context_)
|
||||
{
|
||||
LOG_TRACE(log, "Dropping MaterializeMySQL database");
|
||||
/// Remove metadata info
|
||||
fs::path metadata(getMetadataPath() + "/.metadata");
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Databases/DatabaseAtomic.h>
|
||||
#include <Databases/MySQL/MaterializedMySQLSettings.h>
|
||||
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include "Common/logger_useful.h"
|
||||
#include "config.h"
|
||||
|
||||
#if USE_MYSQL
|
||||
@ -499,7 +500,10 @@ bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & meta
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (const mysqlxx::ConnectionFailed &) {}
|
||||
catch (const mysqlxx::ConnectionFailed & ex)
|
||||
{
|
||||
LOG_TRACE(log, "Connection to MySQL failed {}", ex.displayText());
|
||||
}
|
||||
catch (const mysqlxx::BadQuery & e)
|
||||
{
|
||||
// Lost connection to MySQL server during query
|
||||
|
@ -135,7 +135,7 @@ private:
|
||||
return result;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
throw S3Exception(outcome.GetError().GetErrorType(), "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
|
||||
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ namespace ErrorCodes
|
||||
message.empty() ? "" : ": " + message);
|
||||
}
|
||||
|
||||
Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context)
|
||||
Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const ASTs & disk_args, ContextPtr context)
|
||||
{
|
||||
if (disk_args.empty())
|
||||
throwBadConfiguration("expected non-empty list of arguments");
|
||||
@ -39,8 +39,6 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
|
||||
Poco::AutoPtr<Poco::XML::Document> xml_document(new Poco::XML::Document());
|
||||
Poco::AutoPtr<Poco::XML::Element> root(xml_document->createElement("disk"));
|
||||
xml_document->appendChild(root);
|
||||
Poco::AutoPtr<Poco::XML::Element> disk_configuration(xml_document->createElement(root_name));
|
||||
root->appendChild(disk_configuration);
|
||||
|
||||
for (const auto & arg : disk_args)
|
||||
{
|
||||
@ -62,7 +60,7 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
|
||||
|
||||
const std::string & key = key_identifier->name();
|
||||
Poco::AutoPtr<Poco::XML::Element> key_element(xml_document->createElement(key));
|
||||
disk_configuration->appendChild(key_element);
|
||||
root->appendChild(key_element);
|
||||
|
||||
if (!function_args[1]->as<ASTLiteral>() && !function_args[1]->as<ASTIdentifier>())
|
||||
throwBadConfiguration("expected values to be literals or identifiers");
|
||||
@ -75,9 +73,9 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
|
||||
return xml_document;
|
||||
}
|
||||
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context)
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const ASTs & disk_args, ContextPtr context)
|
||||
{
|
||||
auto xml_document = getDiskConfigurationFromASTImpl(root_name, disk_args, context);
|
||||
auto xml_document = getDiskConfigurationFromASTImpl(disk_args, context);
|
||||
Poco::AutoPtr<Poco::Util::XMLConfiguration> conf(new Poco::Util::XMLConfiguration());
|
||||
conf->load(xml_document);
|
||||
return conf;
|
||||
|
@ -14,19 +14,19 @@ using DiskConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
|
||||
/**
|
||||
* Transform a list of pairs ( key1=value1, key2=value2, ... ), where keys and values are ASTLiteral or ASTIdentifier
|
||||
* into
|
||||
* <root_name>
|
||||
* <disk>
|
||||
* <key1>value1</key1>
|
||||
* <key2>value2</key2>
|
||||
* ...
|
||||
* </root_name>
|
||||
* </disk>
|
||||
*
|
||||
* Used in case disk configuration is passed via AST when creating
|
||||
* a disk object on-the-fly without any configuration file.
|
||||
*/
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const ASTs & disk_args, ContextPtr context);
|
||||
|
||||
/// The same as above function, but return XML::Document for easier modification of result configuration.
|
||||
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
|
||||
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const ASTs & disk_args, ContextPtr context);
|
||||
|
||||
/*
|
||||
* A reverse function.
|
||||
|
@ -26,8 +26,16 @@ namespace
|
||||
{
|
||||
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context)
|
||||
{
|
||||
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
|
||||
const auto & function_args = function_args_expr->children;
|
||||
auto config = getDiskConfigurationFromAST(function_args, context);
|
||||
|
||||
std::string disk_name;
|
||||
if (function.name == "disk")
|
||||
if (config->has("name"))
|
||||
{
|
||||
disk_name = config->getString("name");
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We need a unique name for a created custom disk, but it needs to be the same
|
||||
/// after table is reattached or server is restarted, so take a hash of the disk
|
||||
@ -36,21 +44,9 @@ namespace
|
||||
disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX
|
||||
+ toString(sipHash128(disk_setting_string.data(), disk_setting_string.size()));
|
||||
}
|
||||
else
|
||||
{
|
||||
static constexpr std::string_view custom_disk_prefix = "disk_";
|
||||
|
||||
if (function.name.size() <= custom_disk_prefix.size() || !function.name.starts_with(custom_disk_prefix))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid disk name: {}", function.name);
|
||||
|
||||
disk_name = function.name.substr(custom_disk_prefix.size());
|
||||
}
|
||||
|
||||
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
|
||||
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
|
||||
const auto & function_args = function_args_expr->children;
|
||||
auto config = getDiskConfigurationFromAST(disk_name, function_args, context);
|
||||
auto disk = DiskFactory::instance().create(disk_name, *config, disk_name, context, disks_map);
|
||||
auto disk = DiskFactory::instance().create(disk_name, *config, "", context, disks_map);
|
||||
/// Mark that disk can be used without storage policy.
|
||||
disk->markDiskAsCustom();
|
||||
return disk;
|
||||
|
@ -783,7 +783,7 @@ namespace
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
abortMultipartUpload();
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
}
|
||||
|
||||
return outcome.GetResult().GetCopyPartResult().GetETag();
|
||||
|
@ -85,7 +85,7 @@ ObjectInfo getObjectInfo(
|
||||
}
|
||||
else if (throw_on_error)
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::S3_ERROR,
|
||||
throw S3Exception(error.GetErrorType(),
|
||||
"Failed to get object info: {}. HTTP response code: {}",
|
||||
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ BlockIO InterpreterCreateRoleQuery::execute()
|
||||
settings_from_query = SettingsProfileElements{*query.settings, access_control};
|
||||
|
||||
if (!query.attach)
|
||||
getContext()->checkSettingsConstraints(*settings_from_query);
|
||||
getContext()->checkSettingsConstraints(*settings_from_query, SettingSource::ROLE);
|
||||
}
|
||||
|
||||
if (!query.cluster.empty())
|
||||
|
@ -54,7 +54,7 @@ BlockIO InterpreterCreateSettingsProfileQuery::execute()
|
||||
settings_from_query = SettingsProfileElements{*query.settings, access_control};
|
||||
|
||||
if (!query.attach)
|
||||
getContext()->checkSettingsConstraints(*settings_from_query);
|
||||
getContext()->checkSettingsConstraints(*settings_from_query, SettingSource::PROFILE);
|
||||
}
|
||||
|
||||
if (!query.cluster.empty())
|
||||
|
@ -133,7 +133,7 @@ BlockIO InterpreterCreateUserQuery::execute()
|
||||
settings_from_query = SettingsProfileElements{*query.settings, access_control};
|
||||
|
||||
if (!query.attach)
|
||||
getContext()->checkSettingsConstraints(*settings_from_query);
|
||||
getContext()->checkSettingsConstraints(*settings_from_query, SettingSource::USER);
|
||||
}
|
||||
|
||||
if (!query.cluster.empty())
|
||||
|
@ -45,6 +45,7 @@
|
||||
#include <Interpreters/Cache/QueryCache.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/SessionTracker.h>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Core/Settings.h>
|
||||
@ -158,6 +159,7 @@ namespace CurrentMetrics
|
||||
extern const Metric IOWriterThreadsActive;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -276,6 +278,7 @@ struct ContextSharedPart : boost::noncopyable
|
||||
mutable QueryCachePtr query_cache; /// Cache of query results.
|
||||
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
|
||||
ProcessList process_list; /// Executing queries at the moment.
|
||||
SessionTracker session_tracker;
|
||||
GlobalOvercommitTracker global_overcommit_tracker;
|
||||
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
|
||||
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
|
||||
@ -739,6 +742,9 @@ std::unique_lock<std::recursive_mutex> Context::getLock() const
|
||||
ProcessList & Context::getProcessList() { return shared->process_list; }
|
||||
const ProcessList & Context::getProcessList() const { return shared->process_list; }
|
||||
OvercommitTracker * Context::getGlobalOvercommitTracker() const { return &shared->global_overcommit_tracker; }
|
||||
|
||||
SessionTracker & Context::getSessionTracker() { return shared->session_tracker; }
|
||||
|
||||
MergeList & Context::getMergeList() { return shared->merge_list; }
|
||||
const MergeList & Context::getMergeList() const { return shared->merge_list; }
|
||||
MovesList & Context::getMovesList() { return shared->moves_list; }
|
||||
@ -1094,7 +1100,7 @@ void Context::setUser(const UUID & user_id_, bool set_current_profiles_, bool se
|
||||
std::optional<ContextAccessParams> params;
|
||||
{
|
||||
auto lock = getLock();
|
||||
params.emplace(ContextAccessParams{user_id_, /* full_access= */ false, /* use_default_roles = */ true, {}, settings, current_database, client_info});
|
||||
params.emplace(ContextAccessParams{user_id_, /* full_access= */ false, /* use_default_roles = */ true, {}, settings, current_database, client_info });
|
||||
}
|
||||
/// `temp_access` is used here only to extract information about the user, not to actually check access.
|
||||
/// NOTE: AccessControl::getContextAccess() may require some IO work, so Context::getLock() must be unlocked while we're doing this.
|
||||
@ -1157,13 +1163,6 @@ std::optional<UUID> Context::getUserID() const
|
||||
}
|
||||
|
||||
|
||||
void Context::setQuotaKey(String quota_key_)
|
||||
{
|
||||
auto lock = getLock();
|
||||
client_info.quota_key = std::move(quota_key_);
|
||||
}
|
||||
|
||||
|
||||
void Context::setCurrentRoles(const std::vector<UUID> & current_roles_)
|
||||
{
|
||||
auto lock = getLock();
|
||||
@ -1303,7 +1302,7 @@ void Context::setCurrentProfiles(const SettingsProfilesInfo & profiles_info, boo
|
||||
{
|
||||
auto lock = getLock();
|
||||
if (check_constraints)
|
||||
checkSettingsConstraints(profiles_info.settings);
|
||||
checkSettingsConstraints(profiles_info.settings, SettingSource::PROFILE);
|
||||
applySettingsChanges(profiles_info.settings);
|
||||
settings_constraints_and_current_profiles = profiles_info.getConstraintsAndProfileIDs(settings_constraints_and_current_profiles);
|
||||
}
|
||||
@ -1857,29 +1856,29 @@ void Context::applySettingsChanges(const SettingsChanges & changes)
|
||||
}
|
||||
|
||||
|
||||
void Context::checkSettingsConstraints(const SettingsProfileElements & profile_elements) const
|
||||
void Context::checkSettingsConstraints(const SettingsProfileElements & profile_elements, SettingSource source) const
|
||||
{
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, profile_elements);
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, profile_elements, source);
|
||||
}
|
||||
|
||||
void Context::checkSettingsConstraints(const SettingChange & change) const
|
||||
void Context::checkSettingsConstraints(const SettingChange & change, SettingSource source) const
|
||||
{
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, change);
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, change, source);
|
||||
}
|
||||
|
||||
void Context::checkSettingsConstraints(const SettingsChanges & changes) const
|
||||
void Context::checkSettingsConstraints(const SettingsChanges & changes, SettingSource source) const
|
||||
{
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes);
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes, source);
|
||||
}
|
||||
|
||||
void Context::checkSettingsConstraints(SettingsChanges & changes) const
|
||||
void Context::checkSettingsConstraints(SettingsChanges & changes, SettingSource source) const
|
||||
{
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes);
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes, source);
|
||||
}
|
||||
|
||||
void Context::clampToSettingsConstraints(SettingsChanges & changes) const
|
||||
void Context::clampToSettingsConstraints(SettingsChanges & changes, SettingSource source) const
|
||||
{
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.clamp(settings, changes);
|
||||
getSettingsConstraintsAndCurrentProfiles()->constraints.clamp(settings, changes, source);
|
||||
}
|
||||
|
||||
void Context::checkMergeTreeSettingsConstraints(const MergeTreeSettings & merge_tree_settings, const SettingsChanges & changes) const
|
||||
@ -2711,7 +2710,10 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
|
||||
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
|
||||
if (!shared->zookeeper)
|
||||
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
|
||||
else if (shared->zookeeper->expired())
|
||||
else if (shared->zookeeper->hasReachedDeadline())
|
||||
shared->zookeeper->finalize("ZooKeeper session has reached its deadline");
|
||||
|
||||
if (shared->zookeeper->expired())
|
||||
{
|
||||
Stopwatch watch;
|
||||
LOG_DEBUG(shared->log, "Trying to establish a new connection with ZooKeeper");
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Common/HTTPHeaderFilter.h>
|
||||
#include <Common/ThreadPool_fwd.h>
|
||||
#include <Common/Throttler_fwd.h>
|
||||
#include <Common/SettingSource.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <Core/UUID.h>
|
||||
@ -202,6 +203,8 @@ using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
|
||||
class PreparedSetsCache;
|
||||
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
|
||||
|
||||
class SessionTracker;
|
||||
|
||||
/// An empty interface for an arbitrary object that may be attached by a shared pointer
|
||||
/// to query context, when using ClickHouse as a library.
|
||||
struct IHostContext
|
||||
@ -539,8 +542,6 @@ public:
|
||||
|
||||
String getUserName() const;
|
||||
|
||||
void setQuotaKey(String quota_key_);
|
||||
|
||||
void setCurrentRoles(const std::vector<UUID> & current_roles_);
|
||||
void setCurrentRolesDefault();
|
||||
boost::container::flat_set<UUID> getCurrentRoles() const;
|
||||
@ -735,11 +736,11 @@ public:
|
||||
void applySettingsChanges(const SettingsChanges & changes);
|
||||
|
||||
/// Checks the constraints.
|
||||
void checkSettingsConstraints(const SettingsProfileElements & profile_elements) const;
|
||||
void checkSettingsConstraints(const SettingChange & change) const;
|
||||
void checkSettingsConstraints(const SettingsChanges & changes) const;
|
||||
void checkSettingsConstraints(SettingsChanges & changes) const;
|
||||
void clampToSettingsConstraints(SettingsChanges & changes) const;
|
||||
void checkSettingsConstraints(const SettingsProfileElements & profile_elements, SettingSource source) const;
|
||||
void checkSettingsConstraints(const SettingChange & change, SettingSource source) const;
|
||||
void checkSettingsConstraints(const SettingsChanges & changes, SettingSource source) const;
|
||||
void checkSettingsConstraints(SettingsChanges & changes, SettingSource source) const;
|
||||
void clampToSettingsConstraints(SettingsChanges & changes, SettingSource source) const;
|
||||
void checkMergeTreeSettingsConstraints(const MergeTreeSettings & merge_tree_settings, const SettingsChanges & changes) const;
|
||||
|
||||
/// Reset settings to default value
|
||||
@ -861,6 +862,8 @@ public:
|
||||
|
||||
OvercommitTracker * getGlobalOvercommitTracker() const;
|
||||
|
||||
SessionTracker & getSessionTracker();
|
||||
|
||||
MergeList & getMergeList();
|
||||
const MergeList & getMergeList() const;
|
||||
|
||||
|
@ -764,7 +764,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
|
||||
/// Table function without columns list.
|
||||
auto table_function_ast = create.as_table_function->ptr();
|
||||
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
|
||||
properties.columns = table_function->getActualTableStructure(getContext());
|
||||
properties.columns = table_function->getActualTableStructure(getContext(), /*is_insert_query*/ true);
|
||||
}
|
||||
else if (create.is_dictionary)
|
||||
{
|
||||
|
@ -96,7 +96,7 @@ BlockIO InterpreterDescribeQuery::execute()
|
||||
else if (table_expression.table_function)
|
||||
{
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext());
|
||||
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext());
|
||||
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext(), /*is_insert_query*/ true);
|
||||
for (const auto & table_function_column_description : table_function_column_descriptions)
|
||||
columns.emplace_back(table_function_column_description);
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ namespace DB
|
||||
BlockIO InterpreterSetQuery::execute()
|
||||
{
|
||||
const auto & ast = query_ptr->as<ASTSetQuery &>();
|
||||
getContext()->checkSettingsConstraints(ast.changes);
|
||||
getContext()->checkSettingsConstraints(ast.changes, SettingSource::QUERY);
|
||||
auto session_context = getContext()->getSessionContext();
|
||||
session_context->applySettingsChanges(ast.changes);
|
||||
session_context->addQueryParameters(ast.query_parameters);
|
||||
@ -28,7 +28,7 @@ void InterpreterSetQuery::executeForCurrentContext(bool ignore_setting_constrain
|
||||
{
|
||||
const auto & ast = query_ptr->as<ASTSetQuery &>();
|
||||
if (!ignore_setting_constraints)
|
||||
getContext()->checkSettingsConstraints(ast.changes);
|
||||
getContext()->checkSettingsConstraints(ast.changes, SettingSource::QUERY);
|
||||
getContext()->applySettingsChanges(ast.changes);
|
||||
getContext()->resetSettingsToDefaultValue(ast.default_settings);
|
||||
}
|
||||
|
@ -3,11 +3,13 @@
|
||||
#include <Access/AccessControl.h>
|
||||
#include <Access/Credentials.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Access/SettingsProfilesInfo.h>
|
||||
#include <Access/User.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Interpreters/SessionTracker.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/SessionLog.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
@ -200,7 +202,6 @@ private:
|
||||
|
||||
LOG_TEST(log, "Schedule closing session with session_id: {}, user_id: {}",
|
||||
session.key.second, session.key.first);
|
||||
|
||||
}
|
||||
|
||||
void cleanThread()
|
||||
@ -336,6 +337,9 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
|
||||
if (session_context)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "If there is a session context it must be created after authentication");
|
||||
|
||||
if (session_tracker_handle)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before authentication finish");
|
||||
|
||||
auto address = address_;
|
||||
if ((address == Poco::Net::SocketAddress{}) && (prepared_client_info->interface == ClientInfo::Interface::LOCAL))
|
||||
address = Poco::Net::SocketAddress{"127.0.0.1", 0};
|
||||
@ -490,6 +494,8 @@ ContextMutablePtr Session::makeSessionContext()
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session context must be created before any query context");
|
||||
if (!user_id)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session context must be created after authentication");
|
||||
if (session_tracker_handle)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before making session");
|
||||
|
||||
LOG_DEBUG(log, "{} Creating session context with user_id: {}",
|
||||
toString(auth_id), toString(*user_id));
|
||||
@ -503,13 +509,17 @@ ContextMutablePtr Session::makeSessionContext()
|
||||
prepared_client_info.reset();
|
||||
|
||||
/// Set user information for the new context: current profiles, roles, access rights.
|
||||
if (user_id)
|
||||
new_session_context->setUser(*user_id);
|
||||
new_session_context->setUser(*user_id);
|
||||
|
||||
/// Session context is ready.
|
||||
session_context = new_session_context;
|
||||
user = session_context->getUser();
|
||||
|
||||
session_tracker_handle = session_context->getSessionTracker().trackSession(
|
||||
*user_id,
|
||||
{},
|
||||
session_context->getSettingsRef().max_sessions_for_user);
|
||||
|
||||
return session_context;
|
||||
}
|
||||
|
||||
@ -521,6 +531,8 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session context must be created before any query context");
|
||||
if (!user_id)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session context must be created after authentication");
|
||||
if (session_tracker_handle)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session tracker handle was created before making session");
|
||||
|
||||
LOG_DEBUG(log, "{} Creating named session context with name: {}, user_id: {}",
|
||||
toString(auth_id), session_name_, toString(*user_id));
|
||||
@ -541,9 +553,23 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
|
||||
new_session_context->setClientInfo(*prepared_client_info);
|
||||
prepared_client_info.reset();
|
||||
|
||||
auto access = new_session_context->getAccess();
|
||||
UInt64 max_sessions_for_user = 0;
|
||||
/// Set user information for the new context: current profiles, roles, access rights.
|
||||
if (user_id && !new_session_context->getAccess()->tryGetUser())
|
||||
if (!access->tryGetUser())
|
||||
{
|
||||
new_session_context->setUser(*user_id);
|
||||
max_sessions_for_user = new_session_context->getSettingsRef().max_sessions_for_user;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Always get setting from profile
|
||||
// profile can be changed by ALTER PROFILE during single session
|
||||
auto settings = access->getDefaultSettings();
|
||||
const Field * max_session_for_user_field = settings.tryGet("max_sessions_for_user");
|
||||
if (max_session_for_user_field)
|
||||
max_sessions_for_user = max_session_for_user_field->safeGet<UInt64>();
|
||||
}
|
||||
|
||||
/// Session context is ready.
|
||||
session_context = std::move(new_session_context);
|
||||
@ -551,6 +577,11 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
|
||||
named_session_created = new_named_session_created;
|
||||
user = session_context->getUser();
|
||||
|
||||
session_tracker_handle = session_context->getSessionTracker().trackSession(
|
||||
*user_id,
|
||||
{ session_name_ },
|
||||
max_sessions_for_user);
|
||||
|
||||
return session_context;
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Access/AuthenticationData.h>
|
||||
#include <Interpreters/ClientInfo.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Interpreters/SessionTracker.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
@ -113,6 +114,8 @@ private:
|
||||
std::shared_ptr<NamedSessionData> named_session;
|
||||
bool named_session_created = false;
|
||||
|
||||
SessionTracker::SessionTrackerHandle session_tracker_handle;
|
||||
|
||||
Poco::Logger * log = nullptr;
|
||||
};
|
||||
|
||||
|
62
src/Interpreters/SessionTracker.cpp
Normal file
62
src/Interpreters/SessionTracker.cpp
Normal file
@ -0,0 +1,62 @@
|
||||
#include "SessionTracker.h"
|
||||
|
||||
#include <Common/Exception.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int USER_SESSION_LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
SessionTracker::Session::Session(SessionTracker & tracker_,
|
||||
const UUID& user_id_,
|
||||
SessionInfos::const_iterator session_info_iter_) noexcept
|
||||
: tracker(tracker_), user_id(user_id_), session_info_iter(session_info_iter_)
|
||||
{
|
||||
}
|
||||
|
||||
SessionTracker::Session::~Session()
|
||||
{
|
||||
tracker.stopTracking(user_id, session_info_iter);
|
||||
}
|
||||
|
||||
SessionTracker::SessionTrackerHandle
|
||||
SessionTracker::trackSession(const UUID & user_id,
|
||||
const SessionInfo & session_info,
|
||||
size_t max_sessions_for_user)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto sessions_for_user_iter = sessions_for_user.find(user_id);
|
||||
if (sessions_for_user_iter == sessions_for_user.end())
|
||||
sessions_for_user_iter = sessions_for_user.emplace(user_id, SessionInfos()).first;
|
||||
|
||||
SessionInfos & session_infos = sessions_for_user_iter->second;
|
||||
if (max_sessions_for_user && session_infos.size() >= max_sessions_for_user)
|
||||
{
|
||||
throw Exception(ErrorCodes::USER_SESSION_LIMIT_EXCEEDED,
|
||||
"User {} has overflown session count {}",
|
||||
toString(user_id),
|
||||
max_sessions_for_user);
|
||||
}
|
||||
|
||||
session_infos.emplace_front(session_info);
|
||||
|
||||
return std::make_unique<SessionTracker::Session>(*this, user_id, session_infos.begin());
|
||||
}
|
||||
|
||||
void SessionTracker::stopTracking(const UUID& user_id, SessionInfos::const_iterator session_info_iter)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto sessions_for_user_iter = sessions_for_user.find(user_id);
|
||||
chassert(sessions_for_user_iter != sessions_for_user.end());
|
||||
|
||||
sessions_for_user_iter->second.erase(session_info_iter);
|
||||
if (sessions_for_user_iter->second.empty())
|
||||
sessions_for_user.erase(sessions_for_user_iter);
|
||||
}
|
||||
|
||||
}
|
60
src/Interpreters/SessionTracker.h
Normal file
60
src/Interpreters/SessionTracker.h
Normal file
@ -0,0 +1,60 @@
|
||||
#pragma once
|
||||
|
||||
#include "ClientInfo.h"
|
||||
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct SessionInfo
|
||||
{
|
||||
const String session_id;
|
||||
};
|
||||
|
||||
using SessionInfos = std::list<SessionInfo>;
|
||||
|
||||
using SessionsForUser = std::unordered_map<UUID, SessionInfos>;
|
||||
|
||||
class SessionTracker;
|
||||
|
||||
class SessionTracker
|
||||
{
|
||||
public:
|
||||
class Session : boost::noncopyable
|
||||
{
|
||||
public:
|
||||
explicit Session(SessionTracker & tracker_,
|
||||
const UUID & user_id_,
|
||||
SessionInfos::const_iterator session_info_iter_) noexcept;
|
||||
|
||||
~Session();
|
||||
|
||||
private:
|
||||
friend class SessionTracker;
|
||||
|
||||
SessionTracker & tracker;
|
||||
const UUID user_id;
|
||||
const SessionInfos::const_iterator session_info_iter;
|
||||
};
|
||||
|
||||
using SessionTrackerHandle = std::unique_ptr<SessionTracker::Session>;
|
||||
|
||||
SessionTrackerHandle trackSession(const UUID & user_id,
|
||||
const SessionInfo & session_info,
|
||||
size_t max_sessions_for_user);
|
||||
|
||||
private:
|
||||
/// disallow manual messing with session tracking
|
||||
friend class Session;
|
||||
|
||||
std::mutex mutex;
|
||||
SessionsForUser sessions_for_user TSA_GUARDED_BY(mutex);
|
||||
|
||||
void stopTracking(const UUID& user_id, SessionInfos::const_iterator session_info_iter);
|
||||
};
|
||||
|
||||
}
|
@ -215,7 +215,7 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p
|
||||
else if (ParserKeyword("FALSE").ignore(pos, expected))
|
||||
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
|
||||
/// for SETTINGS disk=disk(type='s3', path='', ...)
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name.starts_with("disk"))
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name == "disk")
|
||||
{
|
||||
tryGetIdentifierNameInto(name, change.name);
|
||||
change.value = createFieldFromAST(function_ast);
|
||||
@ -280,7 +280,7 @@ bool ParserSetQuery::parseNameValuePairWithParameterOrDefault(
|
||||
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(1)));
|
||||
else if (ParserKeyword("FALSE").ignore(pos, expected))
|
||||
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name.starts_with("disk"))
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name == "disk")
|
||||
{
|
||||
change.name = name;
|
||||
change.value = createFieldFromAST(function_ast);
|
||||
|
@ -5,6 +5,7 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int SET_SIZE_LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
@ -126,9 +127,20 @@ bool DistinctSortedChunkTransform::isKey(const size_t key_pos, const size_t row_
|
||||
|
||||
bool DistinctSortedChunkTransform::isLatestKeyFromPrevChunk(const size_t row_pos) const
|
||||
{
|
||||
for (size_t i = 0; i < sorted_columns.size(); ++i)
|
||||
for (size_t i = 0, s = sorted_columns.size(); i < s; ++i)
|
||||
{
|
||||
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction);
|
||||
const auto & sorted_column = *sorted_columns[i];
|
||||
/// temporary hardening due to suspious crashes in sqlancer tests
|
||||
if (unlikely(sorted_column.size() <= row_pos))
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Unexpected size of a sorted column: size {}, row_pos {}, column position {}, type {}",
|
||||
sorted_column.size(),
|
||||
row_pos,
|
||||
i,
|
||||
sorted_column.getFamilyName());
|
||||
|
||||
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, sorted_column, sorted_columns_descr[i].nulls_direction);
|
||||
if (res != 0)
|
||||
return false;
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ void PartialSortingTransform::transform(Chunk & chunk)
|
||||
{
|
||||
MutableColumnPtr sort_description_threshold_column_updated = raw_block_columns[i]->cloneEmpty();
|
||||
sort_description_threshold_column_updated->insertFrom(*raw_block_columns[i], min_row_to_compare);
|
||||
sort_description_threshold_columns_updated[i] = std::move(sort_description_threshold_column_updated);
|
||||
sort_description_threshold_columns_updated[i] = sort_description_threshold_column_updated->convertToFullColumnIfSparse();
|
||||
}
|
||||
|
||||
sort_description_threshold_columns = std::move(sort_description_threshold_columns_updated);
|
||||
|
@ -833,7 +833,7 @@ namespace
|
||||
{
|
||||
settings_changes.push_back({key, value});
|
||||
}
|
||||
query_context->checkSettingsConstraints(settings_changes);
|
||||
query_context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
query_context->applySettingsChanges(settings_changes);
|
||||
|
||||
query_context->setCurrentQueryId(query_info.query_id());
|
||||
@ -1118,7 +1118,7 @@ namespace
|
||||
SettingsChanges settings_changes;
|
||||
for (const auto & [key, value] : external_table.settings())
|
||||
settings_changes.push_back({key, value});
|
||||
external_table_context->checkSettingsConstraints(settings_changes);
|
||||
external_table_context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
external_table_context->applySettingsChanges(settings_changes);
|
||||
}
|
||||
auto in = external_table_context->getInputFormat(
|
||||
|
@ -764,7 +764,7 @@ void HTTPHandler::processQuery(
|
||||
context->setDefaultFormat(default_format);
|
||||
|
||||
/// For external data we also want settings
|
||||
context->checkSettingsConstraints(settings_changes);
|
||||
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
context->applySettingsChanges(settings_changes);
|
||||
|
||||
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
|
||||
|
@ -40,7 +40,7 @@ const char * ServerType::serverTypeToString(ServerType::Type type)
|
||||
return type_name.data();
|
||||
}
|
||||
|
||||
bool ServerType::shouldStart(Type server_type, const std::string & custom_name_) const
|
||||
bool ServerType::shouldStart(Type server_type, const std::string & server_custom_name) const
|
||||
{
|
||||
if (type == Type::QUERIES_ALL)
|
||||
return true;
|
||||
@ -77,13 +77,15 @@ bool ServerType::shouldStart(Type server_type, const std::string & custom_name_)
|
||||
}
|
||||
}
|
||||
|
||||
return type == server_type && custom_name == custom_name_;
|
||||
if (type == Type::CUSTOM)
|
||||
return server_type == type && server_custom_name == "protocols." + custom_name + ".port";
|
||||
|
||||
return server_type == type;
|
||||
}
|
||||
|
||||
bool ServerType::shouldStop(const std::string & port_name) const
|
||||
{
|
||||
Type port_type;
|
||||
std::string port_custom_name;
|
||||
|
||||
if (port_name == "http_port")
|
||||
port_type = Type::HTTP;
|
||||
@ -119,20 +121,12 @@ bool ServerType::shouldStop(const std::string & port_name) const
|
||||
port_type = Type::INTERSERVER_HTTPS;
|
||||
|
||||
else if (port_name.starts_with("protocols.") && port_name.ends_with(".port"))
|
||||
{
|
||||
constexpr size_t protocols_size = std::string_view("protocols.").size();
|
||||
constexpr size_t port_size = std::string_view("protocols.").size();
|
||||
|
||||
port_type = Type::CUSTOM;
|
||||
port_custom_name = port_name.substr(protocols_size, port_name.size() - port_size);
|
||||
}
|
||||
else
|
||||
port_type = Type::UNKNOWN;
|
||||
|
||||
if (port_type == Type::UNKNOWN)
|
||||
else
|
||||
return false;
|
||||
|
||||
return shouldStart(type, port_custom_name);
|
||||
return shouldStart(port_type, port_name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ public:
|
||||
|
||||
enum Type
|
||||
{
|
||||
UNKNOWN,
|
||||
TCP,
|
||||
TCP_WITH_PROXY,
|
||||
TCP_SECURE,
|
||||
@ -34,7 +33,8 @@ public:
|
||||
|
||||
static const char * serverTypeToString(Type type);
|
||||
|
||||
bool shouldStart(Type server_type, const std::string & custom_name_ = "") const;
|
||||
/// Checks whether provided in the arguments type should be started or stopped based on current server type.
|
||||
bool shouldStart(Type server_type, const std::string & server_custom_name = "") const;
|
||||
bool shouldStop(const std::string & port_name) const;
|
||||
|
||||
Type type;
|
||||
|
@ -184,14 +184,17 @@ void TCPHandler::runImpl()
|
||||
try
|
||||
{
|
||||
receiveHello();
|
||||
|
||||
/// In interserver mode queries are executed without a session context.
|
||||
if (!is_interserver_mode)
|
||||
session->makeSessionContext();
|
||||
|
||||
sendHello();
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
|
||||
receiveAddendum();
|
||||
|
||||
if (!is_interserver_mode) /// In interserver mode queries are executed without a session context.
|
||||
if (!is_interserver_mode)
|
||||
{
|
||||
session->makeSessionContext();
|
||||
|
||||
/// If session created, then settings in session context has been updated.
|
||||
/// So it's better to update the connection settings for flexibility.
|
||||
extractConnectionSettingsFromContext(session->sessionContext());
|
||||
@ -1181,7 +1184,6 @@ std::unique_ptr<Session> TCPHandler::makeSession()
|
||||
res->setClientName(client_name);
|
||||
res->setClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
|
||||
res->setConnectionClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
|
||||
res->setQuotaClientKey(quota_key);
|
||||
res->setClientInterface(interface);
|
||||
|
||||
return res;
|
||||
@ -1274,11 +1276,10 @@ void TCPHandler::receiveHello()
|
||||
void TCPHandler::receiveAddendum()
|
||||
{
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
|
||||
{
|
||||
readStringBinary(quota_key, *in);
|
||||
if (!is_interserver_mode)
|
||||
session->setQuotaClientKey(quota_key);
|
||||
}
|
||||
|
||||
if (!is_interserver_mode)
|
||||
session->setQuotaClientKey(quota_key);
|
||||
}
|
||||
|
||||
|
||||
@ -1591,12 +1592,12 @@ void TCPHandler::receiveQuery()
|
||||
if (query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
|
||||
{
|
||||
/// Throw an exception if the passed settings violate the constraints.
|
||||
query_context->checkSettingsConstraints(settings_changes);
|
||||
query_context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Quietly clamp to the constraints if it's not an initial query.
|
||||
query_context->clampToSettingsConstraints(settings_changes);
|
||||
query_context->clampToSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
}
|
||||
query_context->applySettingsChanges(settings_changes);
|
||||
|
||||
|
@ -57,8 +57,8 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
throw S3Exception(
|
||||
outcome.GetError().GetErrorType(),
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(base_configuration.url.key),
|
||||
|
@ -250,15 +250,16 @@ StorageKafka::StorageKafka(
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, kafka_settings(std::move(kafka_settings_))
|
||||
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value)))
|
||||
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value))
|
||||
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value))
|
||||
, macros_info{.table_id = table_id_}
|
||||
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
|
||||
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value, macros_info))
|
||||
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value, macros_info))
|
||||
, client_id(
|
||||
kafka_settings->kafka_client_id.value.empty() ? getDefaultClientId(table_id_)
|
||||
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value))
|
||||
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value, macros_info))
|
||||
, format_name(getContext()->getMacros()->expand(kafka_settings->kafka_format.value))
|
||||
, max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
|
||||
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value))
|
||||
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
|
||||
, num_consumers(kafka_settings->kafka_num_consumers.value)
|
||||
, log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
|
||||
, semaphore(0, static_cast<int>(num_consumers))
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/Macros.h>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/Kafka/KafkaConsumer.h>
|
||||
@ -79,6 +80,7 @@ public:
|
||||
private:
|
||||
// Configuration and state
|
||||
std::unique_ptr<KafkaSettings> kafka_settings;
|
||||
Macros::MacroExpansionInfo macros_info;
|
||||
const Names topics;
|
||||
const String brokers;
|
||||
const String group;
|
||||
|
@ -457,8 +457,11 @@ const ActionsDAG::Node * MergeTreeIndexConditionSet::operatorFromDAG(const Actio
|
||||
if (arguments_size != 1)
|
||||
return nullptr;
|
||||
|
||||
auto bit_wrapper_function = FunctionFactory::instance().get("__bitWrapperFunc", context);
|
||||
const auto & bit_wrapper_func_node = result_dag->addFunction(bit_wrapper_function, {arguments[0]}, {});
|
||||
|
||||
auto bit_swap_last_two_function = FunctionFactory::instance().get("__bitSwapLastTwo", context);
|
||||
return &result_dag->addFunction(bit_swap_last_two_function, {arguments[0]}, {});
|
||||
return &result_dag->addFunction(bit_swap_last_two_function, {&bit_wrapper_func_node}, {});
|
||||
}
|
||||
else if (function_name == "and" || function_name == "indexHint" || function_name == "or")
|
||||
{
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user